在第 26 天,我們將專注於如何優化 Line Bot 的性能,使其運行更加高效、穩定,並能夠承受更高的訪問量和更快地響應用戶的請求。性能優化不僅能提升用戶體驗,還能保證 Bot 的可擴展性,特別是在面對大量用戶時。
使用非同步處理:
aiohttp 或 Quart 這樣的非同步框架來提升性能。from quart import Quart, request
import asyncio
app = Quart(__name__)
@app.route("/callback", methods=["POST"])
async def callback():
    body = await request.get_data(as_text=True)
    # 處理 Line Bot 的邏輯
    asyncio.create_task(handle_event(body))
    return "OK"
async def handle_event(body):
    # 假設有一些耗時的任務,例如調用第三方API
    await asyncio.sleep(2)
    # 處理完畢後返回
利用多線程或多進程:
workers 和 threads 配置來增加性能。gunicorn -w 4 -t 4 app:app
-w 4 表示有 4 個 worker,每個 worker 處理一個請求,-t 4 表示每個 worker 使用 4 個線程。緩存結果:
import redis
import json
r = redis.Redis(host='localhost', port=6379, db=0)
def get_weather(city):
    cached_weather = r.get(f"weather:{city}")
    if cached_weather:
        return json.loads(cached_weather)
    # 如果緩存中沒有,調用API查詢
    weather_data = call_weather_api(city)
    r.setex(f"weather:{city}", 3600, json.dumps(weather_data))  # 緩存一小時
    return weather_data
限制處理重複請求:
recent_requests = {}
@handler.add(MessageEvent, message=TextMessage)
def handle_text_message(event):
    user_id = event.source.user_id
    user_message = event.message.text
    # 如果用戶的重複查詢發生在 10 秒內,則忽略
    if user_id in recent_requests and recent_requests[user_id] == user_message:
        return
    # 處理正常請求
    recent_requests[user_id] = user_message
    line_bot_api.reply_message(event.reply_token, TextSendMessage(text="處理您的請求..."))
    # 設置延遲清理
    threading.Timer(10.0, lambda: recent_requests.pop(user_id, None)).start()
批次推送消息:
user_ids = ["USER_ID_1", "USER_ID_2", "USER_ID_3"]
line_bot_api.multicast(user_ids, TextSendMessage(text="這是批次推送的消息"))
避免阻塞操作:
import concurrent.futures
executor = concurrent.futures.ThreadPoolExecutor()
@handler.add(MessageEvent, message=TextMessage)
def handle_text_message(event):
    user_message = event.message.text
    # 使用非同步執行來查詢第三方API
    executor.submit(call_api, user_message)
    line_bot_api.reply_message(event.reply_token, TextSendMessage(text="正在處理您的請求,稍等片刻..."))
使用負載均衡器:
upstream backend {
    server server1.example.com;
    server server2.example.com;
}
server {
    listen 80;
    location / {
        proxy_pass http://backend;
    }
}
橫向擴展:
自動擴展:
垃圾回收和連接管理:
import gc
def perform_heavy_task():
    # 一些耗費大量內存的任務
    gc.collect()  # 手動進行垃圾回收,避免內存泄露
減少不必要的資料庫查詢:
from functools import lru_cache
@lru_cache(maxsize=128)
def get_user_data(user_id):
    # 模擬從資料庫中獲取用戶數據
    return {"name": "John Doe", "age": 25}