在第 26 天,我們將專注於如何優化 Line Bot 的性能,使其運行更加高效、穩定,並能夠承受更高的訪問量和更快地響應用戶的請求。性能優化不僅能提升用戶體驗,還能保證 Bot 的可擴展性,特別是在面對大量用戶時。
使用非同步處理:
aiohttp
或 Quart
這樣的非同步框架來提升性能。from quart import Quart, request
import asyncio
app = Quart(__name__)
@app.route("/callback", methods=["POST"])
async def callback():
body = await request.get_data(as_text=True)
# 處理 Line Bot 的邏輯
asyncio.create_task(handle_event(body))
return "OK"
async def handle_event(body):
# 假設有一些耗時的任務,例如調用第三方API
await asyncio.sleep(2)
# 處理完畢後返回
利用多線程或多進程:
workers
和 threads
配置來增加性能。gunicorn -w 4 -t 4 app:app
-w 4
表示有 4 個 worker,每個 worker 處理一個請求,-t 4
表示每個 worker 使用 4 個線程。緩存結果:
import redis
import json
r = redis.Redis(host='localhost', port=6379, db=0)
def get_weather(city):
cached_weather = r.get(f"weather:{city}")
if cached_weather:
return json.loads(cached_weather)
# 如果緩存中沒有,調用API查詢
weather_data = call_weather_api(city)
r.setex(f"weather:{city}", 3600, json.dumps(weather_data)) # 緩存一小時
return weather_data
限制處理重複請求:
recent_requests = {}
@handler.add(MessageEvent, message=TextMessage)
def handle_text_message(event):
user_id = event.source.user_id
user_message = event.message.text
# 如果用戶的重複查詢發生在 10 秒內,則忽略
if user_id in recent_requests and recent_requests[user_id] == user_message:
return
# 處理正常請求
recent_requests[user_id] = user_message
line_bot_api.reply_message(event.reply_token, TextSendMessage(text="處理您的請求..."))
# 設置延遲清理
threading.Timer(10.0, lambda: recent_requests.pop(user_id, None)).start()
批次推送消息:
user_ids = ["USER_ID_1", "USER_ID_2", "USER_ID_3"]
line_bot_api.multicast(user_ids, TextSendMessage(text="這是批次推送的消息"))
避免阻塞操作:
import concurrent.futures
executor = concurrent.futures.ThreadPoolExecutor()
@handler.add(MessageEvent, message=TextMessage)
def handle_text_message(event):
user_message = event.message.text
# 使用非同步執行來查詢第三方API
executor.submit(call_api, user_message)
line_bot_api.reply_message(event.reply_token, TextSendMessage(text="正在處理您的請求,稍等片刻..."))
使用負載均衡器:
upstream backend {
server server1.example.com;
server server2.example.com;
}
server {
listen 80;
location / {
proxy_pass http://backend;
}
}
橫向擴展:
自動擴展:
垃圾回收和連接管理:
import gc
def perform_heavy_task():
# 一些耗費大量內存的任務
gc.collect() # 手動進行垃圾回收,避免內存泄露
減少不必要的資料庫查詢:
from functools import lru_cache
@lru_cache(maxsize=128)
def get_user_data(user_id):
# 模擬從資料庫中獲取用戶數據
return {"name": "John Doe", "age": 25}