iT邦幫忙

2025 iThome 鐵人賽

DAY 18
0

今天我們要學習如何在 FastAPI 專案中實作 API 監控 middleware。FastAPI 作為現代 Python Web 框架,提供了高效能的異步處理能力和靈活的 middleware 機制,非常適合實作高效率的監控解決方案。

FastAPI Middleware 基礎

FastAPI 的 middleware 基於 ASGI(Asynchronous Server Gateway Interface),支援異步處理,這讓我們能夠實作非阻塞的監控功能。

Middleware 類型

FastAPI 提供多種 middleware 實作方式:

  1. 函數式 Middleware:簡單的函數包裝器
  2. 類別式 Middleware:更複雜的邏輯處理
  3. 內建 Middleware:CORS、GZip、Trusted Host 等

異步優勢

# Synchronous handling (blocking)
def sync_middleware():
    """Illustration only: a middleware whose log shipping happens inline.

    `process_request`, `log_to_loki` and `data` are placeholders that are not
    defined in this snippet; `start_time` is recorded but not used here.
    """
    start_time = time.time()
    # Handle the request
    response = process_request()
    # Ship the log (blocking I/O)
    log_to_loki(data)  # blocks the calling thread (assuming log_to_loki does synchronous network I/O)
    return response

# Asynchronous handling (non-blocking)
async def async_middleware():
    """Illustration only: the same middleware with log shipping deferred.

    `process_request`, `log_to_loki` and `data` are placeholders that are not
    defined in this snippet; `start_time` is recorded but not used here.
    """
    start_time = time.time()
    # Handle the request
    response = await process_request()
    # Non-blocking log shipping: schedule it and return immediately.
    # NOTE(review): per the asyncio.create_task documentation, the event loop
    # keeps only weak references to tasks; real code should keep the returned
    # task (e.g. in a set) until it finishes so it is not garbage-collected.
    asyncio.create_task(log_to_loki(data))  # does not block
    return response

監控架構設計

核心組件

  1. 異步 Loki 客戶端:非阻塞日誌推送
  2. 效能指標收集器:請求時間、記憶體使用等
  3. 錯誤追蹤器:異常捕獲和分析
  4. 批次處理器:優化推送效率(本文範例尚未實作,每次請求各推送一筆日誌;可作為延伸練習)

監控指標

  • 請求指標:方法、路徑、參數、標頭
  • 回應指標:狀態碼、大小、處理時間
  • 系統指標:CPU、記憶體、協程數量
  • 業務指標:使用者 ID、操作類型、業務結果(需由應用程式自行提供;本文範例的 middleware 不會填入 user_id,因此一律記為 anonymous)

實作步驟

1. 專案設定

# Create the project directory
mkdir fastapi-api-monitoring
cd fastapi-api-monitoring

# Create and activate a virtual environment
python -m venv venv
source venv/bin/activate  # Linux/Mac
# On Windows use: venv\Scripts\activate

# Install dependencies (requirements.txt below pins exact versions,
# including uvicorn[standard])
pip install fastapi uvicorn aiohttp pydantic python-decouple psutil

2. 異步 Loki 客戶端

# monitoring/async_loki_client.py
import asyncio
import json
import logging
import time
from datetime import datetime
from typing import Dict, Any, Optional

import aiohttp
from decouple import config

# Module logger for client-side diagnostics (push timeouts and failures).
logger = logging.getLogger(__name__)

class AsyncLokiClient:
    """Minimal asyncio client for Loki's HTTP push API.

    The aiohttp session and its connection pool are created lazily, from inside
    the running event loop, and are shared by every concurrent user of the
    client. ``async with client:`` may be used from many tasks at once: the
    session is only closed when the last active context exits.
    """

    def __init__(self, url: str = "http://localhost:3100", enabled: bool = True,
                 service: str = "fastapi-api", environment: str = "development"):
        """
        Args:
            url: Base URL of the Loki server (a trailing slash is tolerated).
            enabled: When False every push is a no-op that reports success.
            service: Value of the default ``service`` stream label.
            environment: Value of the default ``environment`` stream label.
        """
        self.url = url
        self.enabled = enabled
        self.service = service
        self.environment = environment
        self.push_url = f"{url.rstrip('/')}/loki/api/v1/push"
        self.session: Optional[aiohttp.ClientSession] = None
        # Created together with the session, inside a running event loop.
        # Building it here would run at import time (the module creates a
        # global instance). The connector is also closed with the session
        # that owns it, so it cannot be reused for the next session.
        self.connector: Optional[aiohttp.TCPConnector] = None
        # Number of `async with` blocks currently using the shared session.
        self._active_contexts = 0

    def _ensure_session(self) -> "aiohttp.ClientSession":
        """Return the shared session, creating it and a fresh pool if needed.

        Must be called from code running inside the event loop.
        """
        if self.session is None or self.session.closed:
            self.connector = aiohttp.TCPConnector(
                limit=10,  # max connections in the pool
                limit_per_host=5,  # max connections per host
                keepalive_timeout=30,
                enable_cleanup_closed=True,
            )
            self.session = aiohttp.ClientSession(
                connector=self.connector,
                timeout=aiohttp.ClientTimeout(total=2),
            )
        return self.session

    async def __aenter__(self):
        """Register one more user of the shared session and make sure it exists."""
        self._active_contexts += 1
        self._ensure_session()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Release one user and close the session once nobody is using it.

        Closing unconditionally would pull the session out from under other
        tasks that entered the context concurrently.
        """
        self._active_contexts = max(0, self._active_contexts - 1)
        if self._active_contexts == 0:
            await self.close()

    async def close(self) -> None:
        """Close the shared session and its connector, if open.

        The session is detached before awaiting. A task that needs a session
        during the close therefore gets a fresh one, not the closing one.
        """
        session, self.session = self.session, None
        self.connector = None
        if session is not None and not session.closed:
            await session.close()

    def _build_payload(self, message: str, labels: Dict[str, str], level: str) -> Dict[str, Any]:
        """Build the JSON body for a single log line in Loki's push format."""
        stream_labels = {
            "service": self.service,
            "level": level,
            "environment": self.environment,
        }
        stream_labels.update(labels)

        # One nanosecond clock reading feeds both timestamps, so they always
        # agree and the Loki timestamp keeps full precision.
        now_ns = time.time_ns()
        seconds, remainder_ns = divmod(now_ns, 1_000_000_000)
        iso_timestamp = (
            time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(seconds))
            + f".{remainder_ns // 1000:06d}Z"
        )

        line = {"timestamp": iso_timestamp, "message": message, "level": level}
        # Labels are copied into the line as well, but cannot overwrite the
        # core fields above.
        for key, value in labels.items():
            line.setdefault(key, value)

        return {
            "streams": [
                {
                    "stream": stream_labels,
                    # Each value is [<epoch nanoseconds as string>, <log line>].
                    "values": [[str(now_ns), json.dumps(line, ensure_ascii=False)]],
                }
            ]
        }

    async def push_log(self, message: str, labels: Optional[Dict[str, str]] = None,
                       level: str = "info") -> bool:
        """Push one log line to Loki (best effort).

        Args:
            message: Log message, embedded in a JSON log line.
            labels: Extra stream labels. They override the defaults (service,
                level, environment) on key collisions.
            level: Log level, used both as a label and inside the line.

        Returns:
            True if the client is disabled or Loki answered with a 2xx status.
            False on timeout, on any other error, or on a non-2xx answer.
            Failures are logged, not raised.
        """
        if not self.enabled:
            return True

        try:
            payload = self._build_payload(message, labels or {}, level)
            session = self._ensure_session()
            async with session.post(
                self.push_url,
                headers={"Content-Type": "application/json"},
                data=json.dumps(payload),
            ) as response:
                return 200 <= response.status < 300
        except asyncio.TimeoutError:
            logger.warning("Loki push timeout")
            return False
        except Exception as e:
            logger.error("Failed to push log to Loki: %s", e)
            return False

    @staticmethod
    def _level_for_status(status_code: int) -> str:
        """Map an HTTP status code to a log level: 5xx error, 4xx warning, else info."""
        if status_code >= 500:
            return "error"
        if status_code >= 400:
            return "warning"
        return "info"

    async def push_api_log(self, request_data: Dict[str, Any],
                           response_data: Dict[str, Any],
                           performance_data: Dict[str, Any]) -> bool:
        """Push one structured API access record.

        A missing ``status_code`` in *response_data* is treated as 500, both
        for the ``status_code`` label and for the log level.

        Returns:
            The result of :meth:`push_log`.
        """
        status_code = response_data.get("status_code", 500)
        # NOTE(review): `endpoint` is the raw path and `user_id` is per user.
        # Both can create many Loki streams (high label cardinality); consider
        # moving them into the log line if stream counts grow.
        labels = {
            "method": request_data.get("method", ""),
            "endpoint": request_data.get("path", ""),
            "status_code": str(status_code),
            "user_id": str(request_data.get("user_id", "anonymous")),
        }

        message = {
            "request": request_data,
            "response": response_data,
            "performance": performance_data,
        }

        return await self.push_log(
            message=json.dumps(message, ensure_ascii=False),
            labels=labels,
            level=self._level_for_status(status_code),
        )

# Shared module-level client. Connection settings come from the environment /
# .env file (LOKI_URL, LOKI_ENABLED) via python-decouple, the same mechanism
# main.py uses for SERVICE_NAME.
loki_client = AsyncLokiClient(
    url=config("LOKI_URL", default="http://localhost:3100"),
    enabled=config("LOKI_ENABLED", default=True, cast=bool),
)

3. 效能指標收集器

# monitoring/metrics.py
import time
import psutil
import asyncio
from typing import Dict, Any
import sys
import gc

class PerformanceCollector:
    """Take snapshots of process, Python-runtime and asyncio metrics for this process."""

    def __init__(self):
        self.process = psutil.Process()
        self.start_time = time.time()
        # Non-blocking cpu_percent() measures CPU use since the previous call
        # and returns a meaningless 0.0 the first time. Prime it here so the
        # first real sample is usable. Failure is tolerated on purpose:
        # get_system_metrics() already degrades to {} in the same cases.
        try:
            self.process.cpu_percent()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass

    def get_system_metrics(self) -> Dict[str, Any]:
        """Return CPU, memory, open-file and thread figures for this process.

        Returns an empty dict if psutil cannot read the process.
        """
        try:
            return {
                # CPU usage since the previous call on this collector. All
                # callers share that baseline.
                "cpu_percent": self.process.cpu_percent(),
                "memory_rss_mb": round(self.process.memory_info().rss / 1024 / 1024, 2),
                "memory_percent": self.process.memory_percent(),
                # NOTE(review): open_files() asks the OS for every open file and
                # may be slow. Consider sampling it less often than per request.
                "open_files": len(self.process.open_files()),
                "threads": self.process.num_threads(),
            }
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            return {}

    def get_python_metrics(self) -> Dict[str, Any]:
        """Return garbage-collector counters and the collector's uptime in seconds."""
        return {
            # NOTE(review): gc.get_objects() materialises a list of every
            # tracked object, so this costs time proportional to heap size.
            "gc_objects": len(gc.get_objects()),
            # Total collections over every generation gc reports.
            "gc_collections": sum(stat["collections"] for stat in gc.get_stats()),
            "uptime_seconds": round(time.time() - self.start_time, 2),
        }

    async def get_async_metrics(self) -> Dict[str, Any]:
        """Return asyncio task counts and the event loop's clock.

        Returns an empty dict when no event loop is running.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            return {}

        tasks = asyncio.all_tasks(loop)
        return {
            "active_tasks": len(tasks),
            # all_tasks() only returns unfinished tasks, so this normally
            # equals active_tasks. The key is kept for output compatibility.
            "pending_tasks": sum(1 for task in tasks if not task.done()),
            # The loop's own monotonic clock. The loop has no `_time` attribute
            # to read, so the public time() method is used.
            "loop_time": loop.time(),
        }

    async def get_all_metrics(self) -> Dict[str, Any]:
        """Merge system, Python-runtime and asyncio metrics into one dict."""
        metrics = {}
        metrics.update(self.get_system_metrics())
        metrics.update(self.get_python_metrics())
        metrics.update(await self.get_async_metrics())
        return metrics

# Shared module-level collector. Its start_time, and therefore uptime_seconds,
# counts from the moment this module is first imported.
performance_collector = PerformanceCollector()

4. FastAPI 監控 Middleware

# monitoring/middleware.py
import time
import json
import asyncio
from typing import Callable, Any, Dict
from fastapi import Request, Response
from fastapi.responses import JSONResponse
import logging
from .async_loki_client import loki_client
from .metrics import performance_collector

# Module logger used to report failures while shipping request logs.
logger = logging.getLogger(__name__)

class LokiMonitoringMiddleware:
    """Pure-ASGI middleware that records every HTTP request/response to Loki.

    For each HTTP request it captures:

    * request metadata;
    * the response status, headers and total body size;
    * the elapsed time;
    * before/after performance snapshots.

    The push to Loki runs in a background task, so log shipping does not delay
    the response. Non-HTTP scopes are passed straight through.
    """

    # Header names (lower-case) whose values are masked before logging, so
    # credentials and session tokens never end up in Loki.
    SENSITIVE_HEADERS = frozenset({
        "authorization",
        "proxy-authorization",
        "cookie",
        "set-cookie",
        "x-api-key",
    })
    # JSON request bodies up to this many bytes are copied into the log.
    MAX_LOGGED_BODY_BYTES = 1024

    def __init__(self, app, service_name: str = "fastapi-api"):
        """
        Args:
            app: The downstream ASGI application.
            service_name: Stored on the instance.
        """
        self.app = app
        # NOTE(review): not forwarded to Loki yet. The `service` label comes
        # from the Loki client's own default.
        self.service_name = service_name
        # Strong references to in-flight log tasks. The event loop keeps only
        # weak references, so an unreferenced task could be collected early.
        self._background_tasks = set()

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        request = Request(scope, receive)

        start_time = time.time()
        start_metrics = await performance_collector.get_all_metrics()

        # Reading the body drains ASGI `receive`. Hand the app a receive that
        # replays it, so the endpoint still gets the full body.
        body_bytes = await self._read_json_body(request)
        if body_bytes is not None:
            receive = self._replay_receive(body_bytes, receive)

        request_data = await self._collect_request_data(request, body_bytes)

        response_data: Dict[str, Any] = {}
        exception_data = None
        response_started = False

        async def send_wrapper(message):
            nonlocal response_started
            if message["type"] == "http.response.start":
                response_started = True
                response_data["status_code"] = message["status"]
                # Raw ASGI headers are bytes, which json.dumps cannot encode.
                response_data["headers"] = self._decode_headers(message.get("headers", []))
                response_data["content_length"] = 0
            elif message["type"] == "http.response.body":
                # Streaming responses send several body chunks: accumulate.
                response_data["content_length"] = (
                    response_data.get("content_length", 0) + len(message.get("body", b""))
                )
            await send(message)

        try:
            await self.app(scope, receive, send_wrapper)
        except Exception as exc:
            import traceback

            exception_data = {
                "type": type(exc).__name__,
                "message": str(exc),
                "traceback": traceback.format_exc(),
            }
            if response_started:
                # Headers are already on the wire, and a second response.start
                # would violate the ASGI protocol. Let the server handle it.
                raise

            logger.exception(
                "Unhandled error in %s %s", request_data["method"], request_data["path"]
            )
            error_response = JSONResponse(
                status_code=500,
                content={"error": "Internal Server Error"},
            )
            await send_wrapper({
                "type": "http.response.start",
                "status": 500,
                "headers": error_response.headers.raw,
            })
            await send_wrapper({
                "type": "http.response.body",
                "body": error_response.body,
            })
        finally:
            duration = time.time() - start_time
            end_metrics = await performance_collector.get_all_metrics()

            performance_data = {
                "duration_ms": round(duration * 1000, 2),
                "start_metrics": start_metrics,
                "end_metrics": end_metrics,
            }

            # Ship to Loki in the background so the response is not delayed.
            self._schedule(
                self._log_request(
                    request_data,
                    response_data,
                    performance_data,
                    exception_data,
                )
            )

    def _schedule(self, coro) -> None:
        """Run *coro* as a background task, keeping a reference until it finishes."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    async def _read_json_body(self, request: Request):
        """Read the whole body of a JSON request.

        Returns:
            The body bytes. None for non-JSON requests, or if reading failed
            (e.g. the client went away).
        """
        if not request.headers.get("content-type", "").startswith("application/json"):
            return None
        try:
            return await request.body()
        except Exception:
            # Best effort: logging must never break the request itself.
            return None

    @staticmethod
    def _replay_receive(body: bytes, receive):
        """Wrap *receive* so its first call returns the already-read *body*.

        Later calls go to the original *receive*, e.g. to report a disconnect.
        """
        replayed = False

        async def replaying_receive():
            nonlocal replayed
            if not replayed:
                replayed = True
                return {"type": "http.request", "body": body, "more_body": False}
            return await receive()

        return replaying_receive

    @classmethod
    def _redact_headers(cls, headers) -> Dict[str, str]:
        """Copy (name, value) string pairs into a dict, masking sensitive values."""
        return {
            name: "[REDACTED]" if name.lower() in cls.SENSITIVE_HEADERS else value
            for name, value in headers
        }

    @classmethod
    def _decode_headers(cls, raw_headers) -> Dict[str, str]:
        """Decode raw ASGI (bytes, bytes) header pairs into a JSON-safe dict.

        latin-1 maps every byte to a character, so decoding never fails.
        """
        return cls._redact_headers(
            (name.decode("latin-1"), value.decode("latin-1"))
            for name, value in raw_headers
        )

    async def _collect_request_data(self, request: Request, body_bytes=None) -> Dict[str, Any]:
        """Summarise the request for logging without touching its body stream.

        Args:
            request: The incoming request.
            body_bytes: The already-read body, if any. It is logged only when
                it is at most MAX_LOGGED_BODY_BYTES long and valid UTF-8.
        """
        body = None
        if body_bytes is not None and len(body_bytes) <= self.MAX_LOGGED_BODY_BYTES:
            try:
                body = body_bytes.decode("utf-8")
            except UnicodeDecodeError:
                body = None

        return {
            "method": request.method,
            "path": request.url.path,
            "query_params": dict(request.query_params),
            "headers": self._redact_headers(request.headers.items()),
            "client_ip": request.client.host if request.client else None,
            "user_agent": request.headers.get("user-agent", ""),
            "body": body,
            "content_type": request.headers.get("content-type", ""),
        }

    async def _log_request(self, request_data: Dict[str, Any],
                           response_data: Dict[str, Any],
                           performance_data: Dict[str, Any],
                           exception_data: Dict[str, Any] = None):
        """Send one request record to Loki.

        Records with *exception_data* go through the exception path. Failures
        are logged locally and never propagate.
        """
        try:
            if exception_data:
                await self._log_exception(request_data, exception_data, performance_data)
            else:
                async with loki_client:
                    await loki_client.push_api_log(
                        request_data, response_data, performance_data
                    )
        except Exception as e:
            logger.error("Failed to log request: %s", e)

    async def _log_exception(self, request_data: Dict[str, Any],
                             exception_data: Dict[str, Any],
                             performance_data: Dict[str, Any]):
        """Send an error-level record for a request that raised."""
        labels = {
            "method": request_data.get("method", ""),
            "endpoint": request_data.get("path", ""),
            "status_code": "500",
            "exception_type": exception_data.get("type", ""),
        }

        message = {
            "request": request_data,
            "exception": exception_data,
            "performance": performance_data,
        }

        async with loki_client:
            await loki_client.push_log(
                message=json.dumps(message, ensure_ascii=False),
                labels=labels,
                level="error",
            )

5. FastAPI 應用程式

# main.py
from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import List, Optional
import uvicorn
import asyncio
import random
import time
from datetime import datetime
from decouple import config

# 導入監控中介軟體
from monitoring.middleware import LokiMonitoringMiddleware

# Build the application object, then wrap it with the Loki monitoring
# middleware. The service name is configurable via SERVICE_NAME in .env.
app = FastAPI(
    version="1.0.0",
    title="FastAPI Monitoring Demo",
    description="API 監控示範應用",
)

app.add_middleware(
    LokiMonitoringMiddleware,
    service_name=config("SERVICE_NAME", default="fastapi-api-demo"),
)

# 資料模型
class Product(BaseModel):
    # A catalogue item. `id` and `created_at` are optional and default to None.
    id: Optional[int] = None
    name: str
    price: float
    description: str = ""
    created_at: Optional[datetime] = None

class Order(BaseModel):
    # An order for `quantity` units of product `product_id`. The purchase
    # endpoint fills in id, total_price and created_at.
    id: Optional[int] = None
    product_id: int
    quantity: int
    total_price: Optional[float] = None
    created_at: Optional[datetime] = None

class PurchaseRequest(BaseModel):
    # JSON body of POST /products/{product_id}/purchase. The endpoint rejects
    # quantity <= 0 with HTTP 400.
    quantity: int

# In-memory stand-ins for a database: a fixed product catalogue, the list of
# placed orders, and the id to give the next order. Everything lives in
# module globals of this process.
products_db = [
    Product(id=1, name="MacBook Pro", price=45000, description="高效能筆電"),
    Product(id=2, name="iPhone 15", price=35000, description="最新智慧手機"),
    Product(id=3, name="AirPods Pro", price=8000, description="降噪耳機"),
    Product(id=4, name="iPad Air", price=20000, description="輕薄平板"),
]

orders_db = []
order_id_counter = 1

# API 端點
@app.get("/", tags=["Root"])
async def root():
    """根路徑"""
    # Report an aware UTC timestamp. datetime.utcnow() is deprecated since
    # Python 3.12 and returns a naive value with no UTC offset.
    from datetime import timezone

    return {"message": "FastAPI Monitoring Demo", "timestamp": datetime.now(timezone.utc)}

@app.get("/health", tags=["Health"])
async def health_check():
    """健康檢查"""
    # Liveness probe. The timestamp is aware UTC, because datetime.utcnow()
    # is deprecated since Python 3.12 and returns a naive value.
    from datetime import timezone

    return {"status": "healthy", "timestamp": datetime.now(timezone.utc)}

@app.get("/products", response_model=List[Product], tags=["Products"])
async def get_products():
    """取得所有商品"""
    # Pretend to query a database: wait 10-50 ms, then return the catalogue.
    latency = random.uniform(0.01, 0.05)
    await asyncio.sleep(latency)
    return products_db

@app.get("/products/{product_id}", response_model=Product, tags=["Products"])
async def get_product(product_id: int):
    """取得特定商品"""
    await asyncio.sleep(random.uniform(0.01, 0.03))

    # Linear scan of the in-memory catalogue; unknown ids give a 404.
    for candidate in products_db:
        if candidate.id == product_id:
            return candidate
    raise HTTPException(status_code=404, detail="商品不存在")

@app.get("/products/featured/list", response_model=List[Product], tags=["Products"])
async def get_featured_products():
    """取得精選商品(模擬較慢的查詢)"""
    # Simulate a heavier query (100-500 ms).
    await asyncio.sleep(random.uniform(0.1, 0.5))

    # "Featured" means priced at 20000 or more.
    return list(filter(lambda item: item.price >= 20000, products_db))

@app.post("/products/{product_id}/purchase", tags=["Orders"])
async def purchase_product(product_id: int, request: PurchaseRequest):
    """購買商品"""
    # Create an order for `request.quantity` units of product `product_id`.
    # Errors:
    #   404 - unknown product
    #   400 - non-positive quantity
    #   409 - random 10% "out of stock" failure, to exercise error monitoring
    global order_id_counter
    from datetime import timezone

    # Look up the product
    product = next((p for p in products_db if p.id == product_id), None)
    if not product:
        raise HTTPException(status_code=404, detail="商品不存在")

    if request.quantity <= 0:
        raise HTTPException(status_code=400, detail="數量必須大於 0")

    # Simulated stock check (fails 10% of the time)
    await asyncio.sleep(random.uniform(0.02, 0.1))
    if random.random() < 0.1:
        raise HTTPException(status_code=409, detail="庫存不足")

    # Create the order. No await happens between reading and incrementing the
    # counter, so concurrent requests cannot get the same id.
    total_price = product.price * request.quantity
    order = Order(
        id=order_id_counter,
        product_id=product_id,
        quantity=request.quantity,
        total_price=total_price,
        # Aware UTC timestamp; datetime.utcnow() is deprecated since Python 3.12.
        created_at=datetime.now(timezone.utc),
    )

    orders_db.append(order)
    order_id_counter += 1

    return {
        "order_id": order.id,
        "product": product.name,
        "quantity": request.quantity,
        "total_price": total_price,
        "message": "購買成功"
    }

@app.get("/orders", response_model=List[Order], tags=["Orders"])
async def get_orders():
    """取得所有訂單"""
    # Short artificial delay (10-30 ms), then every order placed so far.
    pause = random.uniform(0.01, 0.03)
    await asyncio.sleep(pause)
    return orders_db

@app.get("/orders/stats", tags=["Orders"])
async def get_order_stats():
    """取得訂單統計"""
    # Order count, revenue and average order value. The average is 0 when
    # there are no orders.
    await asyncio.sleep(random.uniform(0.05, 0.1))

    total_orders = len(orders_db)
    # Order.total_price is Optional. Count a missing price as 0 instead of
    # letting sum() raise TypeError on None.
    total_revenue = sum(order.total_price or 0 for order in orders_db)

    return {
        "total_orders": total_orders,
        "total_revenue": total_revenue,
        "average_order_value": total_revenue / total_orders if total_orders > 0 else 0
    }

@app.post("/simulate-error", tags=["Testing"])
async def simulate_error():
    """模擬伺服器錯誤"""
    # Wait 100-200 ms, then always fail with HTTP 500.
    wait_for = random.uniform(0.1, 0.2)
    await asyncio.sleep(wait_for)
    raise HTTPException(status_code=500, detail="模擬的伺服器錯誤")

@app.get("/slow-endpoint", tags=["Testing"])
async def slow_endpoint():
    """模擬慢速端點"""
    # Sleep 1-3 s and report how long the wait was.
    seconds = random.uniform(1, 3)
    await asyncio.sleep(seconds)
    reply = f"延遲 {seconds:.2f} 秒後回應"
    return {"message": reply}

# 例外處理器
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    # Re-shape HTTPException into {"error": <detail>, "status_code": <code>}.
    # NOTE(review): registered for fastapi.HTTPException only. Errors raised by
    # Starlette's router itself (e.g. 404 for unknown paths) may use
    # starlette.exceptions.HTTPException and bypass this handler. Confirm.
    code = exc.status_code
    payload = {"error": exc.detail, "status_code": code}
    return JSONResponse(status_code=code, content=payload)

@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
    # Catch-all: hide exception details and answer with a generic 500 body.
    internal_error = 500
    body = {"error": "內部伺服器錯誤", "status_code": internal_error}
    return JSONResponse(status_code=internal_error, content=body)

if __name__ == "__main__":
    # Local development server on port 8000 with auto-reload enabled.
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True, log_level="info")

6. 環境配置

# .env
LOKI_URL=http://localhost:3100
LOKI_ENABLED=true
SERVICE_NAME=fastapi-api-demo
ENVIRONMENT=development
# requirements.txt
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
aiohttp==3.9.1
python-decouple==3.8
psutil==5.9.6

7. 非同步測試腳本

# async_test_client.py
import asyncio
import aiohttp
import time
import random
import json

class AsyncAPITester:
    """aiohttp-based driver for smoke- and load-testing the demo API.

    Use it as ``async with AsyncAPITester() as tester:`` so the HTTP session is
    opened before and closed after the test run.
    """

    def __init__(self, base_url: str = "http://localhost:8000"):
        self.base_url = base_url
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def test_endpoint(self, method: str, path: str, **kwargs):
        """Send one request and summarise the outcome.

        Returns:
            ``{"status", "content", "headers"}`` for any HTTP response. If the
            request itself failed (connection error, session not opened, ...),
            returns ``{"error": <message>}`` instead of raising.
        """
        url = f"{self.base_url}{path}"

        try:
            async with self.session.request(method, url, **kwargs) as response:
                content = await response.text()
                return {
                    "status": response.status,
                    "content": content,
                    "headers": dict(response.headers)
                }
        except Exception as e:
            return {"error": str(e)}

    @staticmethod
    def _is_error(result) -> bool:
        """Return True for a failed request.

        A request fails on a transport error, a missing result, or an HTTP
        status >= 400.
        """
        if not result or "error" in result:
            return True
        return result.get("status", 0) >= 400

    async def load_test(self, concurrent_users: int = 10, duration: int = 30):
        """Run *concurrent_users* simulated users for *duration* seconds, then print totals."""
        print(f"開始負載測試:{concurrent_users} 個並發使用者,持續 {duration} 秒")

        start_time = time.time()
        tasks = [
            asyncio.create_task(self.user_simulation(i, start_time, duration))
            for i in range(concurrent_users)
        ]

        results = await asyncio.gather(*tasks, return_exceptions=True)

        # A user whose coroutine crashed comes back as an exception and is skipped.
        per_user = [r for r in results if isinstance(r, dict)]
        total_requests = sum(r["requests"] for r in per_user)
        total_errors = sum(r["errors"] for r in per_user)

        print("測試完成!")
        print(f"總請求數: {total_requests}")
        print(f"錯誤數: {total_errors}")
        if total_requests:
            print(f"成功率: {((total_requests - total_errors) / total_requests * 100):.2f}%")
        else:
            # No request was sent (e.g. duration <= 0), so there is no success
            # rate to divide out.
            print("成功率: N/A")

    async def user_simulation(self, user_id: int, start_time: float, duration: int):
        """Simulate one user issuing random actions until *duration* seconds elapse.

        Every attempted action counts as a request. It also counts as an error
        if it raised or its result indicates failure (see ``_is_error``).
        """
        requests_count = 0
        errors_count = 0
        actions = (
            self.browse_products,
            self.view_product,
            self.purchase_product,
            self.check_orders,
        )

        while time.time() - start_time < duration:
            action = random.choice(actions)
            requests_count += 1
            try:
                result = await action()
            except Exception:
                errors_count += 1
            else:
                if self._is_error(result):
                    errors_count += 1

            # Random think time between actions
            await asyncio.sleep(random.uniform(0.1, 2.0))

        return {"user_id": user_id, "requests": requests_count, "errors": errors_count}

    async def browse_products(self):
        """GET /products; returns the test_endpoint result."""
        return await self.test_endpoint("GET", "/products")

    async def view_product(self):
        """GET a random existing product (ids 1-4); returns the test_endpoint result."""
        product_id = random.randint(1, 4)
        return await self.test_endpoint("GET", f"/products/{product_id}")

    async def purchase_product(self):
        """POST a purchase of 1-3 units of a random product; returns the test_endpoint result."""
        product_id = random.randint(1, 4)
        quantity = random.randint(1, 3)
        return await self.test_endpoint(
            "POST",
            f"/products/{product_id}/purchase",
            json={"quantity": quantity}
        )

    async def check_orders(self):
        """GET /orders; returns the test_endpoint result."""
        return await self.test_endpoint("GET", "/orders")

async def main():
    """Run basic endpoint checks, error-case checks, then a short load test."""

    async def run_cases(tester, cases):
        # Each case is (method, path) or (method, path, request_kwargs).
        for case in cases:
            method, path = case[0], case[1]
            options = case[2] if len(case) > 2 else {}
            outcome = await tester.test_endpoint(method, path, **options)
            print(f"{method} {path}: {outcome.get('status', 'Error')}")
            await asyncio.sleep(0.1)

    happy_path = [
        ("GET", "/"),
        ("GET", "/health"),
        ("GET", "/products"),
        ("GET", "/products/1"),
        ("GET", "/products/featured/list"),
        ("POST", "/products/1/purchase", {"json": {"quantity": 1}}),
        ("GET", "/orders"),
        ("GET", "/orders/stats"),
        ("GET", "/slow-endpoint"),
    ]
    failure_cases = [
        ("GET", "/products/999"),  # unknown product
        ("POST", "/products/1/purchase", {"json": {"quantity": -1}}),  # invalid quantity
        ("POST", "/simulate-error"),  # simulated server error
    ]

    async with AsyncAPITester() as tester:
        print("=== 基本功能測試 ===")
        await run_cases(tester, happy_path)

        print("\n=== 錯誤測試 ===")
        await run_cases(tester, failure_cases)

        print("\n=== 負載測試 ===")
        await tester.load_test(concurrent_users=5, duration=10)


if __name__ == "__main__":
    asyncio.run(main())

上一篇
Day17 - Django Middleware 實作 API 監控
下一篇
Day19 - Flask Middleware 實作 API 監控
系列文
Vibe Coding 後的挑戰:Locust x Loki 負載及監控20
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言