今天是第二十九天我們可以寫一個 K8s串接line bot分析颱風影響資料庫管理系統,以下是我的程式碼
這裡我將為你提供一個高層次的架構,並且說明如何一步一步實現這個系統。
使用 Python 和 Flask 來建立一個 LINE Bot 伺服器,接收來自使用者的訊息,並回應分析結果。這部分我們會先實作最基本的 LINE Bot 功能。
app.py
(LINE Bot 伺服器)from flask import Flask, request, abort
from linebot import LineBotApi, WebhookHandler
from linebot.exceptions import InvalidSignatureError
from linebot.models import MessageEvent, TextMessage, TextSendMessage
app = Flask(__name__)
# Line Bot API Token 和 Channel Secret
line_bot_api = LineBotApi('YOUR_CHANNEL_ACCESS_TOKEN')
handler = WebhookHandler('YOUR_CHANNEL_SECRET')
@app.route("/callback", methods=['POST'])
def callback():
signature = request.headers['X-Line-Signature']
body = request.get_data(as_text=True)
try:
handler.handle(body, signature)
except InvalidSignatureError:
abort(400)
return 'OK'
# 處理收到的文字訊息
@handler.add(MessageEvent, message=TextMessage)
def handle_message(event):
user_message = event.message.text
# 回應訊息範例,可以將此處替換為資料分析結果
reply_message = f"你剛剛說的是: {user_message}"
line_bot_api.reply_message(
event.reply_token,
TextSendMessage(text=reply_message)
)
if __name__ == "__main__":
app.run(port=8000)
執行說明:
/callback
路徑設定為 LINE Developers 平台上的 webhook URL。我需要將這個 LINE Bot 伺服器部署在 Kubernetes 上。
deployment.yaml
(K8s 部署檔案)apiVersion: apps/v1
kind: Deployment
metadata:
name: line-bot-deployment
spec:
replicas: 1
selector:
matchLabels:
app: line-bot
template:
metadata:
labels:
app: line-bot
spec:
containers:
- name: line-bot-container
image: your-docker-image:latest
ports:
- containerPort: 8000
---
apiVersion: v1
kind: Service
metadata:
name: line-bot-service
spec:
type: LoadBalancer
ports:
- port: 80
targetPort: 8000
selector:
app: line-bot
步驟:
kubectl apply -f deployment.yaml
將其部署到 Kubernetes 叢集。使用 MySQL 或 PostgreSQL 資料庫來儲存與颱風影響有關的數據,例如颱風路徑、風速、降雨量等。
mysql-deployment.yaml
(MySQL 部署檔案)apiVersion: apps/v1
kind: Deployment
metadata:
name: mysql-deployment
spec:
replicas: 1
selector:
matchLabels:
app: mysql
template:
metadata:
labels:
app: mysql
spec:
containers:
- name: mysql
image: mysql:5.7
env:
- name: MYSQL_ROOT_PASSWORD
value: "rootpassword"
ports:
- containerPort: 3306
---
apiVersion: v1
kind: Service
metadata:
name: mysql-service
spec:
ports:
- port: 3306
targetPort: 3306
selector:
app: mysql
這樣可以啟動 MySQL 服務,並確保你的應用程式可以連接到這個資料庫。
可以從氣象局 API 獲取颱風的實時數據,將其儲存到資料庫,並在用戶通過 LINE Bot 詢問時回應相關數據。
這裡是一個簡單的範例,從氣象資料 API 獲取颱風數據並存入資料庫。
import requests
import mysql.connector
def fetch_typhoon_data():
# 呼叫氣象局 API,假設這是你的 API 端點
api_url = "https://api.typhoon.example.com/data"
response = requests.get(api_url)
if response.status_code == 200:
typhoon_data = response.json()
store_data_in_db(typhoon_data)
def store_data_in_db(data):
connection = mysql.connector.connect(
host="mysql-service",
user="root",
password="rootpassword",
database="typhoon_db"
)
cursor = connection.cursor()
for record in data['records']:
cursor.execute("""
INSERT INTO typhoon_info (name, wind_speed, pressure, path)
VALUES (%s, %s, %s, %s)
""", (record['name'], record['wind_speed'], record['pressure'], record['path']))
connection.commit()
cursor.close()
connection.close()
當用戶透過 LINE Bot 查詢颱風數據時,系統會從資料庫中提取數據並回應。
@handler.add(MessageEvent, message=TextMessage)
def handle_message(event):
user_message = event.message.text.lower()
if "颱風" in user_message:
typhoon_data = get_latest_typhoon_data()
reply_message = f"颱風名稱: {typhoon_data['name']}, 風速: {typhoon_data['wind_speed']} km/h"
else:
reply_message = "請輸入有關颱風的查詢"
line_bot_api.reply_message(
event.reply_token,
TextSendMessage(text=reply_message)
)
def get_latest_typhoon_data():
connection = mysql.connector.connect(
host="mysql-service",
user="root",
password="rootpassword",
database="typhoon_db"
)
cursor = connection.cursor(dictionary=True)
cursor.execute("SELECT * FROM typhoon_info ORDER BY id DESC LIMIT 1")
result = cursor.fetchone()
cursor.close()
connection.close()
return result
app.py
(LINE Bot 伺服器)這段程式碼是用來建立一個 Flask 伺服器,並與 LINE Messaging API 結合,處理來自使用者的訊息。
from flask import Flask, request, abort
from linebot import LineBotApi, WebhookHandler
from linebot.exceptions import InvalidSignatureError
from linebot.models import MessageEvent, TextMessage, TextSendMessage
app = Flask(__name__)
Flask()
:這是用來建立 Flask 應用的指令。app
是我們的 Flask 伺服器實例。LineBotApi
和 WebhookHandler
:這是由 LINE 提供的 API,用來處理使用者訊息和回應。line_bot_api = LineBotApi('YOUR_CHANNEL_ACCESS_TOKEN')
handler = WebhookHandler('YOUR_CHANNEL_SECRET')
LineBotApi
:使用你的 CHANNEL_ACCESS_TOKEN
來與 LINE API 互動。這個 token 是 LINE 開發者平台上創建的 Bot 給你的授權。WebhookHandler
:這個 handler 用來驗證請求是否來自 LINE,並處理來自 LINE 的 webhook 事件。@app.route("/callback", methods=['POST'])
def callback():
signature = request.headers['X-Line-Signature']
body = request.get_data(as_text=True)
try:
handler.handle(body, signature)
except InvalidSignatureError:
abort(400)
return 'OK'
/callback
路徑:這個路徑是 LINE webhook 請求發送的 URL。我們在此處接收請求,並通過 handler
進行處理。signature
:這是為了驗證 LINE webhook 請求的有效性。每個 LINE webhook 請求都有這個 X-Line-Signature
標頭。handler.handle
:將 webhook 請求交給 handler
,如果簽名不正確,會觸發 InvalidSignatureError
,然後返回 400 錯誤。@handler.add(MessageEvent, message=TextMessage)
def handle_message(event):
user_message = event.message.text
# 回應訊息範例,可以將此處替換為資料分析結果
reply_message = f"你剛剛說的是: {user_message}"
line_bot_api.reply_message(
event.reply_token,
TextSendMessage(text=reply_message)
)
@handler.add(MessageEvent, message=TextMessage)
:這是一個事件處理器,當 LINE 收到來自使用者的文字訊息時,就會觸發這個方法。event.message.text
:取得使用者發送的訊息內容。reply_message
:回應給使用者的訊息,這裡是簡單回應一個鏡像訊息,回覆使用者剛剛發送的文字。line_bot_api.reply_message
:用來回應使用者的訊息,reply_token
是來自事件的 token,TextSendMessage
是我們要回應的訊息。deployment.yaml
(K8s 部署檔案)這段程式碼是用來在 Kubernetes 中部署你的 LINE Bot 伺服器。
apiVersion: apps/v1
kind: Deployment
metadata:
name: line-bot-deployment
spec:
replicas: 1
selector:
matchLabels:
app: line-bot
template:
metadata:
labels:
app: line-bot
spec:
containers:
- name: line-bot-container
image: your-docker-image:latest
ports:
- containerPort: 8000
apiVersion: apps/v1
:指定 Kubernetes API 版本,這是用來管理應用程式的版本。kind: Deployment
:這表示我們要在 Kubernetes 中部署一個 Deployment,這是一個 Kubernetes 資源,用來確保應用有正確的版本和複本數。replicas: 1
:這表示應用只有 1 個副本在運行,若需要更高的可用性,可以增加副本數。template
:這裡定義了應用程式的容器,將使用 Docker 映像檔 your-docker-image:latest
來啟動。containerPort: 8000
:指定容器的內部端口。---
apiVersion: v1
kind: Service
metadata:
name: line-bot-service
spec:
type: LoadBalancer
ports:
- port: 80
targetPort: 8000
selector:
app: line-bot
kind: Service
:這表示我們要建立一個 Kubernetes Service,用來暴露應用程式給外部訪問。type: LoadBalancer
:這表示我們會使用負載均衡器來暴露這個服務,這是 Kubernetes 提供的方式之一。port: 80
和 targetPort: 8000
:這表示外部訪問應用的端口是 80,而內部的容器運行在 8000 端口。selector: app: line-bot
:這表示這個 Service 會選擇帶有 app: line-bot
標籤的 Pod。mysql-deployment.yaml
(MySQL 部署檔案)這段程式碼用來在 Kubernetes 中部署 MySQL 資料庫。
apiVersion: apps/v1
kind: Deployment
metadata:
name: mysql-deployment
spec:
replicas: 1
selector:
matchLabels:
app: mysql
template:
metadata:
labels:
app: mysql
spec:
containers:
- name: mysql
image: mysql:5.7
env:
- name: MYSQL_ROOT_PASSWORD
value: "rootpassword"
ports:
- containerPort: 3306
MYSQL_ROOT_PASSWORD
:設定 MySQL 資料庫的 root 密碼,這是資料庫的基本配置。containerPort: 3306
:這是 MySQL 預設的連接埠號。---
apiVersion: v1
kind: Service
metadata:
name: mysql-service
spec:
ports:
- port: 3306
targetPort: 3306
selector:
app: mysql
port: 3306
和 targetPort: 3306
:暴露 MySQL 的 3306 端口,讓其他服務能夠連接到它。這段程式碼展示了如何從颱風 API 獲取數據並將其儲存到 MySQL 資料庫中。
import requests
import mysql.connector
def fetch_typhoon_data():
api_url = "https://api.typhoon.example.com/data"
response = requests.get(api_url)
if response.status_code == 200:
typhoon_data = response.json()
store_data_in_db(typhoon_data)
requests.get(api_url)
:這裡我們從 API URL 獲取颱風數據。response
會包含 API 返回的數據。response.json()
:將 API 返回的 JSON 數據轉換成 Python 字典,便於後續處理。def store_data_in_db(data):
connection = mysql.connector.connect(
host="mysql-service",
user="root",
password="rootpassword",
database="typhoon_db"
)
cursor = connection.cursor()
for record in data['records']:
cursor.execute("""
INSERT INTO typhoon_info (name, wind_speed, pressure, path)
VALUES (%s, %s, %s, %s)
""", (record['name'], record['wind_speed'], record['pressure'], record['path']))
connection.commit()
cursor.close()
connection.close()
mysql.connector.connect()
:連接到 MySQL 資料庫。這裡會使用 MySQL 的服務位址、root 帳號和密碼進行連接。
cursor.execute()
:執行 SQL 插入語句,將 API 的數據存入 typhoon_info
資料表中。connection.commit()
:提交事務,確保資料存入資料庫中。