iT邦幫忙

2024 iThome 鐵人賽

DAY 29
0
Kubernetes

K8s 資料庫管理系統系列 第 29

day 29 K8s串接line bot分析颱風影響資料庫管理系統

  • 分享至 

  • xImage
  •  

今天是第二十九天我們可以寫一個 K8s串接line bot分析颱風影響資料庫管理系統,以下是我的程式碼

  1. Kubernetes 部署管理: 確保整個系統可以在 K8s 上運行和擴展。
  2. LINE Bot: 讓使用者可以透過 LINE 互動,並輸入或查詢颱風數據。
  3. 資料庫: 儲存和管理颱風的影響數據。
  4. 颱風資料分析: 從各種來源獲取颱風資料,進行處理和分析。

這裡我將為你提供一個高層次的架構,並且說明如何一步一步實現這個系統。

Step 1: 建立 LINE Bot 伺服器

使用 Python 和 Flask 來建立一個 LINE Bot 伺服器,接收來自使用者的訊息,並回應分析結果。這部分我們會先實作最基本的 LINE Bot 功能。

app.py (LINE Bot 伺服器)

from flask import Flask, request, abort
from linebot import LineBotApi, WebhookHandler
from linebot.exceptions import InvalidSignatureError
from linebot.models import MessageEvent, TextMessage, TextSendMessage

app = Flask(__name__)

# Line Bot API Token 和 Channel Secret
line_bot_api = LineBotApi('YOUR_CHANNEL_ACCESS_TOKEN')
handler = WebhookHandler('YOUR_CHANNEL_SECRET')

@app.route("/callback", methods=['POST'])
def callback():
    signature = request.headers['X-Line-Signature']
    body = request.get_data(as_text=True)
    
    try:
        handler.handle(body, signature)
    except InvalidSignatureError:
        abort(400)
    
    return 'OK'

# 處理收到的文字訊息
@handler.add(MessageEvent, message=TextMessage)
def handle_message(event):
    user_message = event.message.text
    
    # 回應訊息範例,可以將此處替換為資料分析結果
    reply_message = f"你剛剛說的是: {user_message}"
    line_bot_api.reply_message(
        event.reply_token,
        TextSendMessage(text=reply_message)
    )

if __name__ == "__main__":
    app.run(port=8000)

執行說明:

  • 將這個伺服器上傳到你的 Kubernetes 叢集後,將 /callback 路徑設定為 LINE Developers 平台上的 webhook URL。
  • 記得使用 Kubernetes 的 IngressLoadBalancer 來暴露這個服務。

Step 2: 使用 Kubernetes 部署 LINE Bot

我需要將這個 LINE Bot 伺服器部署在 Kubernetes 上。

deployment.yaml (K8s 部署檔案)

apiVersion: apps/v1
kind: Deployment
metadata:
  name: line-bot-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      app: line-bot
  template:
    metadata:
      labels:
        app: line-bot
    spec:
      containers:
      - name: line-bot-container
        image: your-docker-image:latest
        ports:
        - containerPort: 8000
---
apiVersion: v1
kind: Service
metadata:
  name: line-bot-service
spec:
  type: LoadBalancer
  ports:
  - port: 80
    targetPort: 8000
  selector:
    app: line-bot

步驟:

  1. 將 LINE Bot 伺服器包成 Docker 映像檔並推送到 Docker registry。
  2. 使用 kubectl apply -f deployment.yaml 將其部署到 Kubernetes 叢集。
  3. 使用 Kubernetes 的 LoadBalancer 服務暴露你的 LINE Bot 給外部使用者。

Step 3: 資料庫設置

使用 MySQL 或 PostgreSQL 資料庫來儲存與颱風影響有關的數據,例如颱風路徑、風速、降雨量等。

mysql-deployment.yaml (MySQL 部署檔案)

apiVersion: apps/v1
kind: Deployment
metadata:
  name: mysql-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mysql
  template:
    metadata:
      labels:
        app: mysql
    spec:
      containers:
      - name: mysql
        image: mysql:5.7
        env:
        - name: MYSQL_ROOT_PASSWORD
          value: "rootpassword"
        ports:
        - containerPort: 3306
---
apiVersion: v1
kind: Service
metadata:
  name: mysql-service
spec:
  ports:
  - port: 3306
    targetPort: 3306
  selector:
    app: mysql

這樣可以啟動 MySQL 服務,並確保你的應用程式可以連接到這個資料庫。

Step 4: 颱風數據分析

可以從氣象局 API 獲取颱風的實時數據,將其儲存到資料庫,並在用戶通過 LINE Bot 詢問時回應相關數據。

颱風數據 API 整合

這裡是一個簡單的範例,從氣象資料 API 獲取颱風數據並存入資料庫。

import requests
import mysql.connector

def fetch_typhoon_data():
    # 呼叫氣象局 API,假設這是你的 API 端點
    api_url = "https://api.typhoon.example.com/data"
    response = requests.get(api_url)
    
    if response.status_code == 200:
        typhoon_data = response.json()
        store_data_in_db(typhoon_data)

def store_data_in_db(data):
    connection = mysql.connector.connect(
        host="mysql-service",
        user="root",
        password="rootpassword",
        database="typhoon_db"
    )
    cursor = connection.cursor()
    
    for record in data['records']:
        cursor.execute("""
            INSERT INTO typhoon_info (name, wind_speed, pressure, path)
            VALUES (%s, %s, %s, %s)
        """, (record['name'], record['wind_speed'], record['pressure'], record['path']))
    
    connection.commit()
    cursor.close()
    connection.close()

Step 5: 用戶查詢颱風影響

當用戶透過 LINE Bot 查詢颱風數據時,系統會從資料庫中提取數據並回應。

@handler.add(MessageEvent, message=TextMessage)
def handle_message(event):
    user_message = event.message.text.lower()
    
    if "颱風" in user_message:
        typhoon_data = get_latest_typhoon_data()
        reply_message = f"颱風名稱: {typhoon_data['name']}, 風速: {typhoon_data['wind_speed']} km/h"
    else:
        reply_message = "請輸入有關颱風的查詢"
    
    line_bot_api.reply_message(
        event.reply_token,
        TextSendMessage(text=reply_message)
    )

def get_latest_typhoon_data():
    connection = mysql.connector.connect(
        host="mysql-service",
        user="root",
        password="rootpassword",
        database="typhoon_db"
    )
    cursor = connection.cursor(dictionary=True)
    cursor.execute("SELECT * FROM typhoon_info ORDER BY id DESC LIMIT 1")
    result = cursor.fetchone()
    
    cursor.close()
    connection.close()
    
    return result

總結

  • LINE Bot:用於與使用者互動並提供颱風數據查詢功能。
  • Kubernetes:部署整個系統,包括 LINE Bot 伺服器和資料庫。
  • 資料庫管理:儲存颱風相關數據並進行分析查詢。
  • 颱風數據來源:透過 API 獲取實時颱風數據,並將其存入資料庫以供查詢。

1. app.py (LINE Bot 伺服器)

這段程式碼是用來建立一個 Flask 伺服器,並與 LINE Messaging API 結合,處理來自使用者的訊息。

主要概念:

  • Flask:是一個 Python 的 Web 框架,用來建立 Web 伺服器。
  • LINE Messaging API:LINE 提供的 API 讓開發者可以與 LINE 使用者進行互動。

主要程式碼說明:

from flask import Flask, request, abort
from linebot import LineBotApi, WebhookHandler
from linebot.exceptions import InvalidSignatureError
from linebot.models import MessageEvent, TextMessage, TextSendMessage

app = Flask(__name__)
  • Flask():這是用來建立 Flask 應用的指令。app 是我們的 Flask 伺服器實例。
  • LineBotApiWebhookHandler:這是由 LINE 提供的 API,用來處理使用者訊息和回應。
line_bot_api = LineBotApi('YOUR_CHANNEL_ACCESS_TOKEN')
handler = WebhookHandler('YOUR_CHANNEL_SECRET')
  • LineBotApi:使用你的 CHANNEL_ACCESS_TOKEN 來與 LINE API 互動。這個 token 是 LINE 開發者平台上創建的 Bot 給你的授權。
  • WebhookHandler:這個 handler 用來驗證請求是否來自 LINE,並處理來自 LINE 的 webhook 事件。
@app.route("/callback", methods=['POST'])
def callback():
    signature = request.headers['X-Line-Signature']
    body = request.get_data(as_text=True)
    
    try:
        handler.handle(body, signature)
    except InvalidSignatureError:
        abort(400)
    
    return 'OK'
  • /callback 路徑:這個路徑是 LINE webhook 請求發送的 URL。我們在此處接收請求,並通過 handler 進行處理。
  • signature:這是為了驗證 LINE webhook 請求的有效性。每個 LINE webhook 請求都有這個 X-Line-Signature 標頭。
  • handler.handle:將 webhook 請求交給 handler,如果簽名不正確,會觸發 InvalidSignatureError,然後返回 400 錯誤。
@handler.add(MessageEvent, message=TextMessage)
def handle_message(event):
    user_message = event.message.text
    
    # 回應訊息範例,可以將此處替換為資料分析結果
    reply_message = f"你剛剛說的是: {user_message}"
    line_bot_api.reply_message(
        event.reply_token,
        TextSendMessage(text=reply_message)
    )
  • @handler.add(MessageEvent, message=TextMessage):這是一個事件處理器,當 LINE 收到來自使用者的文字訊息時,就會觸發這個方法。
  • event.message.text:取得使用者發送的訊息內容。
  • reply_message:回應給使用者的訊息,這裡是簡單回應一個鏡像訊息,回覆使用者剛剛發送的文字。
  • line_bot_api.reply_message:用來回應使用者的訊息,reply_token 是來自事件的 token,TextSendMessage 是我們要回應的訊息。

2. deployment.yaml (K8s 部署檔案)

這段程式碼是用來在 Kubernetes 中部署你的 LINE Bot 伺服器。

apiVersion: apps/v1
kind: Deployment
metadata:
  name: line-bot-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      app: line-bot
  template:
    metadata:
      labels:
        app: line-bot
    spec:
      containers:
      - name: line-bot-container
        image: your-docker-image:latest
        ports:
        - containerPort: 8000
  • apiVersion: apps/v1:指定 Kubernetes API 版本,這是用來管理應用程式的版本。
  • kind: Deployment:這表示我們要在 Kubernetes 中部署一個 Deployment,這是一個 Kubernetes 資源,用來確保應用有正確的版本和複本數。
  • replicas: 1:這表示應用只有 1 個副本在運行,若需要更高的可用性,可以增加副本數。
  • template:這裡定義了應用程式的容器,將使用 Docker 映像檔 your-docker-image:latest 來啟動。
  • containerPort: 8000:指定容器的內部端口。
---
apiVersion: v1
kind: Service
metadata:
  name: line-bot-service
spec:
  type: LoadBalancer
  ports:
  - port: 80
    targetPort: 8000
  selector:
    app: line-bot
  • kind: Service:這表示我們要建立一個 Kubernetes Service,用來暴露應用程式給外部訪問。
  • type: LoadBalancer:這表示我們會使用負載均衡器來暴露這個服務,這是 Kubernetes 提供的方式之一。
  • port: 80targetPort: 8000:這表示外部訪問應用的端口是 80,而內部的容器運行在 8000 端口。
  • selector: app: line-bot:這表示這個 Service 會選擇帶有 app: line-bot 標籤的 Pod。

3. mysql-deployment.yaml (MySQL 部署檔案)

這段程式碼用來在 Kubernetes 中部署 MySQL 資料庫。

apiVersion: apps/v1
kind: Deployment
metadata:
  name: mysql-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mysql
  template:
    metadata:
      labels:
        app: mysql
    spec:
      containers:
      - name: mysql
        image: mysql:5.7
        env:
        - name: MYSQL_ROOT_PASSWORD
          value: "rootpassword"
        ports:
        - containerPort: 3306
  • MYSQL_ROOT_PASSWORD:設定 MySQL 資料庫的 root 密碼,這是資料庫的基本配置。
  • containerPort: 3306:這是 MySQL 預設的連接埠號。
---
apiVersion: v1
kind: Service
metadata:
  name: mysql-service
spec:
  ports:
  - port: 3306
    targetPort: 3306
  selector:
    app: mysql
  • port: 3306targetPort: 3306:暴露 MySQL 的 3306 端口,讓其他服務能夠連接到它。

4. 颱風數據的 API 整合

這段程式碼展示了如何從颱風 API 獲取數據並將其儲存到 MySQL 資料庫中。

import requests
import mysql.connector

def fetch_typhoon_data():
    api_url = "https://api.typhoon.example.com/data"
    response = requests.get(api_url)
    
    if response.status_code == 200:
        typhoon_data = response.json()
        store_data_in_db(typhoon_data)
  • requests.get(api_url):這裡我們從 API URL 獲取颱風數據。response 會包含 API 返回的數據。
  • response.json():將 API 返回的 JSON 數據轉換成 Python 字典,便於後續處理。
def store_data_in_db(data):
    connection = mysql.connector.connect(
        host="mysql-service",
        user="root",
        password="rootpassword",
        database="typhoon_db"
    )
    cursor = connection.cursor()

    for record in data['records']:
        cursor.execute("""
            INSERT INTO typhoon_info (name, wind_speed, pressure, path)
            VALUES (%s, %s, %s, %s)
        """, (record['name'], record['wind_speed'], record['pressure'], record['path']))
    
    connection.commit()
    cursor.close()
    connection.close()
  • mysql.connector.connect():連接到 MySQL 資料庫。這裡會使用 MySQL 的服務位址、root 帳號和

密碼進行連接。

  • cursor.execute():執行 SQL 插入語句,將 API 的數據存入 typhoon_info 資料表中。
  • connection.commit():提交事務,確保資料存入資料庫中。

整體流程:

  1. LINE Bot 接收來自使用者的訊息。
  2. Flask 伺服器解析這些訊息,並通過 LINE Messaging API 做出回應。
  3. 伺服器會與颱風數據的 API 互動,獲取數據並儲存在 MySQL 資料庫中。
  4. 系統使用 Kubernetes 進行部署,確保應用的可擴展性和高可用性。

上一篇
day 28 k8s 電力分配資料庫管理系統
下一篇
day 30 k8s分析社群軟體threads進而分析族群愛好資料庫管理系統
系列文
K8s 資料庫管理系統30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言