iT邦幫忙

2024 iThome 鐵人賽

DAY 20
0
Kubernetes

K8s 資料庫管理系統系列 第 20

day 20 k8s空氣品質資料庫管理系統

  • 分享至 

  • xImage
  •  

今天是第二十天我們可以寫一個k8s空氣品質資料庫管理系統,以下是我的程式碼

系統架構:

  1. 數據採集層:從空氣品質感測器或 API 收集數據。
  2. 資料庫層:儲存空氣品質數據的資料庫(如 MySQL、PostgreSQL)。
  3. API 層:提供 RESTful API,用於與前端進行溝通和數據處理。
  4. 前端層:查詢和顯示空氣品質數據的管理介面。
  5. Kubernetes 集群:管理應用的可擴展性、服務間的通訊和資源調度。

主要技術棧:

  • 後端:Flask/Django 或 FastAPI 來實現 RESTful API。
  • 資料庫:MySQL/PostgreSQL
  • 前端:React/Vue 或其他前端框架來查詢和顯示數據。
  • 容器化:Docker,以便將應用部署到 Kubernetes 上。
  • Kubernetes,用來管理應用和資料庫的服務。

步驟一:後端 API 設計

你可以使用 Python 的 FastAPI 或 Flask 來編寫後端 API,用於接收、存取空氣品質數據。以下是一個簡單的 FastAPI 範例。

from fastapi import FastAPI
from pydantic import BaseModel
import sqlite3
from datetime import datetime

app = FastAPI()

# 建立數據模型
class AirQualityData(BaseModel):
    location: str
    pm25: float
    pm10: float
    o3: float
    no2: float
    so2: float
    co: float
    timestamp: datetime

# 連接 SQLite 資料庫(可以換成 MySQL/PostgreSQL)
def get_db():
    conn = sqlite3.connect('air_quality.db')
    cursor = conn.cursor()
    return conn, cursor

@app.on_event("startup")
def startup():
    conn, cursor = get_db()
    cursor.execute('''CREATE TABLE IF NOT EXISTS air_quality
                      (location TEXT, pm25 REAL, pm10 REAL, o3 REAL, no2 REAL,
                      so2 REAL, co REAL, timestamp TEXT)''')
    conn.commit()
    conn.close()

# 上傳空氣品質數據
@app.post("/air_quality/")
def create_air_quality(data: AirQualityData):
    conn, cursor = get_db()
    cursor.execute('''INSERT INTO air_quality (location, pm25, pm10, o3, no2, so2, co, timestamp)
                      VALUES (?, ?, ?, ?, ?, ?, ?, ?)''', 
                   (data.location, data.pm25, data.pm10, data.o3, data.no2, data.so2, data.co, data.timestamp))
    conn.commit()
    conn.close()
    return {"message": "Data inserted successfully"}

# 查詢空氣品質數據
@app.get("/air_quality/{location}")
def get_air_quality(location: str):
    conn, cursor = get_db()
    cursor.execute("SELECT * FROM air_quality WHERE location = ?", (location,))
    data = cursor.fetchall()
    conn.close()
    return {"data": data}

步驟二:Docker 容器化

將應用容器化,以便能部署在 Kubernetes 上。

  • 建立 Dockerfile:
Dockerfile
FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt

COPY . .

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
  • 建立 requirements.txt:
fastapi
uvicorn
pydantic
sqlite3
  • 使用 Docker 建立並運行容器:
docker build -t air_quality_api .
docker run -p 8000:8000 air_quality_api

步驟三:Kubernetes 部署

接下來,你可以在 Kubernetes 上進行部署。建立一個 Deployment 和 Service。

  • Deployment YAML
apiVersion: apps/v1
kind: Deployment
metadata:
  name: air-quality-deployment
spec:
  replicas: 2
  selector:
    matchLabels:
      app: air-quality
  template:
    metadata:
      labels:
        app: air-quality
    spec:
      containers:
      - name: air-quality-container
        image: air_quality_api:latest
        ports:
        - containerPort: 8000
  • Service YAML
apiVersion: v1
kind: Service
metadata:
  name: air-quality-service
spec:
  selector:
    app: air-quality
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer

部署到 Kubernetes:

kubectl apply -f deployment.yaml
kubectl apply -f service.yaml

步驟四:前端設計

可以使用 Vue 或 React 設計簡單的查詢介面,通過 API 查詢並顯示數據。

import React, { useState, useEffect } from "react";
import axios from "axios";

function AirQualityDashboard() {
  const [data, setData] = useState([]);
  const [location, setLocation] = useState("");

  const fetchData = async () => {
    const result = await axios.get(`http://<API_URL>/air_quality/${location}`);
    setData(result.data.data);
  };

  return (
    <div>
      <h1>Air Quality Dashboard</h1>
      <input
        type="text"
        value={location}
        onChange={(e) => setLocation(e.target.value)}
        placeholder="Enter location"
      />
      <button onClick={fetchData}>Get Data</button>
      <div>
        {data.map((item, index) => (
          <div key={index}>
            <p>Location: {item[0]}</p>
            <p>PM2.5: {item[1]}</p>
            <p>PM10: {item[2]}</p>
            <p>O3: {item[3]}</p>
            <p>NO2: {item[4]}</p>
            <p>SO2: {item[5]}</p>
            <p>CO: {item[6]}</p>
            <p>Timestamp: {item[7]}</p>
          </div>
        ))}
      </div>
    </div>
  );
}

export default AirQualityDashboard;

1. 後端程式碼解釋 (使用 FastAPI)

這段程式碼使用 FastAPI 框架來處理空氣品質數據的 REST API,包含數據的上傳和查詢功能。

from fastapi import FastAPI
from pydantic import BaseModel
import sqlite3
from datetime import datetime
  • FastAPI 是一個用於建立快速 Web API 的框架。
  • Pydantic 是用來驗證 API 請求中的數據。
  • sqlite3 是 Python 自帶的資料庫模組,用於本地儲存數據。
  • datetime 用來處理時間戳。

1.1 數據模型定義

class AirQualityData(BaseModel):
    location: str
    pm25: float
    pm10: float
    o3: float
    no2: float
    so2: float
    co: float
    timestamp: datetime
  • BaseModel 是 Pydantic 提供的類,用來驗證 API 請求的資料格式。這裡定義了一個 AirQualityData 類,描述了每次上傳的空氣品質數據,包含不同的污染物濃度和時間戳。

1.2 資料庫連接函數

def get_db():
    conn = sqlite3.connect('air_quality.db')
    cursor = conn.cursor()
    return conn, cursor

這個函數用來建立與 SQLite 資料庫的連接,並返回資料庫連接物件和資料庫操作的游標。

1.3 應用啟動時創建表格

@app.on_event("startup")
def startup():
    conn, cursor = get_db()
    cursor.execute('''CREATE TABLE IF NOT EXISTS air_quality
                      (location TEXT, pm25 REAL, pm10 REAL, o3 REAL, no2 REAL,
                      so2 REAL, co REAL, timestamp TEXT)''')
    conn.commit()
    conn.close()
  • @app.on_event("startup"):FastAPI 提供的事件掛鉤,這個函數會在應用啟動時執行。
  • CREATE TABLE IF NOT EXISTS:建立資料庫中的 air_quality 表格,儲存不同污染物的數據和時間戳。這個表只會在不存在時創建。

1.4 數據上傳 API

@app.post("/air_quality/")
def create_air_quality(data: AirQualityData):
    conn, cursor = get_db()
    cursor.execute('''INSERT INTO air_quality (location, pm25, pm10, o3, no2, so2, co, timestamp)
                      VALUES (?, ?, ?, ?, ?, ?, ?, ?)''', 
                   (data.location, data.pm25, data.pm10, data.o3, data.no2, data.so2, data.co, data.timestamp))
    conn.commit()
    conn.close()
    return {"message": "Data inserted successfully"}
  • @app.post("/air_quality/"):這是一個 HTTP POST 路由,用來接收空氣品質數據。
  • data: AirQualityData:這個參數代表來自用戶的 JSON 數據,Pydantic 會自動驗證其格式。
  • cursor.execute(...):這一行插入數據到資料庫中。插入的數據來自 POST 請求的數據。

1.5 數據查詢 API

@app.get("/air_quality/{location}")
def get_air_quality(location: str):
    conn, cursor = get_db()
    cursor.execute("SELECT * FROM air_quality WHERE location = ?", (location,))
    data = cursor.fetchall()
    conn.close()
    return {"data": data}
  • @app.get("/air_quality/{location}"):這是一個 GET 路由,用來查詢某個地點的空氣品質數據。
  • location: str:用戶請求時的地點參數。
  • cursor.execute(...):查詢資料庫中對應地點的數據,並返回數據作為回應。

2. Docker 容器化解釋

要在 Kubernetes 上部署應用,首先需要將它容器化,這樣能使應用環境一致並便於管理。

2.1 Dockerfile

Dockerfile
FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt

COPY . .

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
  • FROM python:3.9-slim:這裡選擇了 Python 3.9 的精簡版作為基礎鏡像。
  • WORKDIR /app:設定容器內的工作目錄為 /app
  • COPY requirements.txt:將當前目錄中的 requirements.txt 複製到容器中。
  • RUN pip install -r requirements.txt:在容器中安裝程式所需的依賴包。
  • COPY . .:將當前目錄中的所有檔案複製到容器內 /app 目錄。
  • CMD ["uvicorn", ...]:這是啟動容器時執行的命令,啟動 FastAPI 應用,並指定應用在 8000 端口運行。

2.2 requirements.txt

fastapi
uvicorn
pydantic
sqlite3

這裡列出了應用所需的 Python 套件。

2.3 使用 Docker 建立容器

  • docker build -t air_quality_api .:這會根據 Dockerfile 建立一個名為 air_quality_api 的映像檔。
  • docker run -p 8000:8000 air_quality_api:這會啟動該容器,並將本地的 8000 端口映射到容器內的 8000 端口。

3. Kubernetes 部署解釋

這部分展示如何將容器化的應用部署到 Kubernetes 上。

3.1 Deployment YAML

apiVersion: apps/v1
kind: Deployment
metadata:
  name: air-quality-deployment
spec:
  replicas: 2
  selector:
    matchLabels:
      app: air-quality
  template:
    metadata:
      labels:
        app: air-quality
    spec:
      containers:
      - name: air-quality-container
        image: air_quality_api:latest
        ports:
        - containerPort: 8000
  • replicas: 2:設定兩個副本,表示會有兩個應用容器執行,增加應用的可用性。
  • image: air_quality_api:latest:使用 air_quality_api 的 Docker 映像檔來建立容器。
  • containerPort: 8000:容器內的應用運行在 8000 端口。

3.2 Service YAML

apiVersion: v1
kind: Service
metadata:
  name: air-quality-service
spec:
  selector:
    app: air-quality
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer
  • type: LoadBalancer:這會建立一個負載均衡器,將外部的請求分配給後端的多個應用副本。
  • targetPort: 8000:這表示容器內的應用端口是 8000,而外部訪問的端口是 80。

4. 前端查詢介面解釋

這是簡單的前端程式碼,使用 React 建立一個介面來查詢空氣品質數據。

import React, { useState, useEffect } from "react";
import axios from "axios";

function AirQualityDashboard() {
  const [data, setData] = useState([]);
  const [location, setLocation] = useState("");

  const fetchData = async () => {
    const result = await axios.get(`http://<API_URL>/air_quality/${location}`);
    setData(result.data.data);
  };

  return (
    <div>
      <h1>Air Quality Dashboard</h1>
      <input
        type="text"
        value={location}
        onChange={(e) => setLocation(e.target.value)}
        placeholder="Enter location"
      />
      <button onClick={fetchData}>Get Data</button>
      <div>
        {data.map((item, index) => (
          <div key={index}>
            <p>Location: {item[0]}</p>
            <p>PM2.5: {item[1]}</

p>
            <p>PM10: {item[2]}</p>
            <p>O3: {item[3]}</p>
            <p>NO2: {item[4]}</p>
            <p>SO2: {item[5]}</p>
            <p>CO: {item[6]}</p>
            <p>Timestamp: {item[7]}</p>
          </div>
        ))}
      </div>
    </div>
  );
}

export default AirQualityDashboard;

總結

這個應用由 FastAPI 後端、Docker 容器化、Kubernetes 部署和簡單的 React 前端組成。後端提供空氣品質數據的儲存和查詢功能,容器化允許應用在不同環境中保持一致性,而 Kubernetes 提供了高可用性和可擴展性。


上一篇
day 19 k8s交通分析資料庫管理系統
下一篇
day 21 K8s 微服務客戶喜好資料庫管理系統
系列文
K8s 資料庫管理系統30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言