今天我們要學習如何在 FastAPI 專案中實作 API 監控 middleware。FastAPI 作為現代 Python Web 框架,提供了高效能的異步處理能力和靈活的 middleware 機制,非常適合實作高效率的監控解決方案。
FastAPI 的 middleware 基於 ASGI(Asynchronous Server Gateway Interface),支援異步處理,這讓我們能夠實作非阻塞的監控功能。
FastAPI 提供多種 middleware 實作方式。以下先用示意程式碼比較「同步(阻塞)」與「異步(非阻塞)」兩種處理方式的差異:
# Synchronous handling (blocking)
def sync_middleware():
    """Illustrative sketch: serve a request and log it with a blocking call."""
    start_time = time.time()
    # Handle the request.
    result = process_request()
    # Write the log entry with blocking I/O: the caller waits here until
    # the log call returns, and only then gets the response back.
    log_to_loki(data)
    return result
# Asynchronous handling (non-blocking)
async def async_middleware():
    """Illustrative sketch: serve a request and log it from a background task."""
    start_time = time.time()
    # Handle the request.
    result = await process_request()
    # Schedule the log write as a separate task so the response is returned
    # without waiting for it.
    log_coroutine = log_to_loki(data)
    asyncio.create_task(log_coroutine)
    return result
# Create the project directory and switch into it
mkdir fastapi-api-monitoring
cd fastapi-api-monitoring
# Create a virtual environment and activate it
python -m venv venv
source venv/bin/activate # Linux/Mac
# Install the dependencies
pip install fastapi uvicorn aiohttp pydantic python-decouple psutil
# monitoring/async_loki_client.py
import json
import time
import asyncio
import aiohttp
from datetime import datetime
from typing import Dict, Any, Optional
import logging
logger = logging.getLogger(__name__)
class AsyncLokiClient:
    """Asynchronous client that pushes JSON log lines to Loki's HTTP push API.

    The aiohttp connector and session are created lazily, inside a running
    event loop, the first time they are needed. A fresh connector is built
    for every new session because closing a session also closes the
    connector it owns; reusing one connector across sessions would make
    every push after the first ``async with`` block fail.

    The instance can be used as an async context manager. Contexts may be
    nested or entered concurrently by several tasks: the shared session is
    only closed when the last active context exits.
    """

    def __init__(self, url: str = "http://localhost:3100", enabled: bool = True,
                 service: str = "fastapi-api", environment: str = "development"):
        """Store the configuration; no network resource is created here.

        Args:
            url: Base URL of the Loki server (a trailing slash is tolerated).
            enabled: When False, every push is a no-op that reports success.
            service: Value of the default ``service`` stream label.
            environment: Value of the default ``environment`` stream label.
        """
        self.url = url
        self.enabled = enabled
        self.service = service
        self.environment = environment
        self.push_url = f"{url.rstrip('/')}/loki/api/v1/push"
        self.session: Optional[aiohttp.ClientSession] = None
        self.connector: Optional[aiohttp.TCPConnector] = None
        # Number of ``async with`` blocks currently using the session.
        self._active_contexts = 0

    async def _ensure_session(self) -> "aiohttp.ClientSession":
        """Return an open session, creating connector and session if needed."""
        if self.session is None or self.session.closed:
            self.connector = aiohttp.TCPConnector(
                limit=10,  # maximum size of the connection pool
                limit_per_host=5,  # maximum connections per host
                keepalive_timeout=30,
                enable_cleanup_closed=True,
            )
            self.session = aiohttp.ClientSession(
                connector=self.connector,
                timeout=aiohttp.ClientTimeout(total=2),
            )
        return self.session

    async def __aenter__(self):
        """Enter the async context, making sure a session is open."""
        await self._ensure_session()
        self._active_contexts += 1
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Leave the async context; close the session when nobody uses it."""
        self._active_contexts = max(0, self._active_contexts - 1)
        if self._active_contexts:
            # Another task is still inside its own ``async with`` block.
            return
        # Detach before awaiting so a task entering during the close gets a
        # brand-new session instead of the one being shut down.
        session, self.session = self.session, None
        self.connector = None
        if session is not None and not session.closed:
            await session.close()

    def _build_payload(self, message: str, labels: Dict[str, str], level: str) -> Dict[str, Any]:
        """Build the push-API body holding a single log line."""
        stream_labels = {
            "service": self.service,
            "level": level,
            "environment": self.environment,
        }
        stream_labels.update(labels)
        # One clock reading feeds both timestamps so they always agree, and
        # the integer nanosecond clock avoids float rounding.
        seconds, nanos = divmod(time.time_ns(), 1_000_000_000)
        iso_timestamp = (
            time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(seconds))
            + f".{nanos // 1000:06d}Z"
        )
        line = json.dumps({
            "timestamp": iso_timestamp,
            "message": message,
            "level": level,
            **labels
        }, ensure_ascii=False)
        return {
            "streams": [
                {
                    "stream": stream_labels,
                    "values": [[str(seconds * 1_000_000_000 + nanos), line]],
                }
            ]
        }

    async def push_log(self, message: str, labels: Optional[Dict[str, str]] = None,
                       level: str = "info") -> bool:
        """Push one log line to Loki.

        Args:
            message: Log message text.
            labels: Extra stream labels; they override the default labels and
                are also merged into the JSON log line.
            level: Log level, stored both as a label and in the log line.

        Returns:
            True when the client is disabled or Loki answered 204; False on
            any other status, on timeout, or on any other failure. Failures
            are logged and never raised.
        """
        if not self.enabled:
            return True
        payload = self._build_payload(message, labels or {}, level)
        try:
            session = await self._ensure_session()
            async with session.post(
                self.push_url,
                headers={"Content-Type": "application/json"},
                data=json.dumps(payload)
            ) as response:
                return response.status == 204
        except asyncio.TimeoutError:
            logger.warning("Loki push timeout")
            return False
        except Exception as e:
            logger.error("Failed to push log to Loki: %s", e)
            return False

    async def push_api_log(self, request_data: Dict[str, Any],
                           response_data: Dict[str, Any],
                           performance_data: Dict[str, Any]) -> bool:
        """Push a log line describing one API request.

        The level is derived from the response status code: ``error`` for
        5xx, ``warning`` for 4xx, ``info`` otherwise. A missing status code
        is treated as 500 for the level, while the ``status_code`` label
        falls back to "0".

        Returns:
            The result of ``push_log``.
        """
        labels = {
            "method": request_data.get("method", ""),
            "endpoint": request_data.get("path", ""),
            "status_code": str(response_data.get("status_code", 0)),
            "user_id": str(request_data.get("user_id", "anonymous"))
        }
        status_code = response_data.get("status_code", 500)
        if status_code >= 500:
            level = "error"
        elif status_code >= 400:
            level = "warning"
        else:
            level = "info"
        message = {
            "request": request_data,
            "response": response_data,
            "performance": performance_data
        }
        return await self.push_log(
            message=json.dumps(message, ensure_ascii=False),
            labels=labels,
            level=level
        )
# Module-level async client shared by importers of this module
loki_client = AsyncLokiClient()
# monitoring/metrics.py
import time
import psutil
import asyncio
from typing import Dict, Any
import sys
import gc
class PerformanceCollector:
    """Collects process, Python-runtime and asyncio metrics for the current process."""

    def __init__(self):
        self.process = psutil.Process()
        self.start_time = time.time()

    def get_system_metrics(self) -> Dict[str, Any]:
        """Return process-level metrics read through psutil.

        Returns:
            A dict with CPU percent, RSS in MiB, memory percent, number of
            open files and number of threads, or an empty dict when psutil
            reports that the process is gone or access is denied.
        """
        try:
            return {
                "cpu_percent": self.process.cpu_percent(),
                "memory_rss_mb": round(self.process.memory_info().rss / 1024 / 1024, 2),
                "memory_percent": self.process.memory_percent(),
                "open_files": len(self.process.open_files()),
                "threads": self.process.num_threads(),
            }
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            return {}

    def get_python_metrics(self) -> Dict[str, Any]:
        """Return garbage-collector counters and the collector's uptime."""
        # NOTE(review): gc.get_objects() builds a list of every tracked
        # object, so this call gets slower as the heap grows.
        return {
            "gc_objects": len(gc.get_objects()),
            # Read the statistics once and add up every generation.
            "gc_collections": sum(stats["collections"] for stats in gc.get_stats()),
            "uptime_seconds": round(time.time() - self.start_time, 2)
        }

    async def get_async_metrics(self) -> Dict[str, Any]:
        """Return metrics about the running event loop.

        Returns:
            Task counts and the loop's current clock reading, or an empty
            dict when no event loop is running.
        """
        try:
            loop = asyncio.get_running_loop()
            tasks = asyncio.all_tasks(loop)
            return {
                "active_tasks": len(tasks),
                "pending_tasks": sum(1 for task in tasks if not task.done()),
                # loop.time() is the public accessor for the loop's clock.
                "loop_time": loop.time()
            }
        except RuntimeError:
            return {}

    async def get_all_metrics(self) -> Dict[str, Any]:
        """Return system, Python and asyncio metrics merged into one dict."""
        metrics = {}
        metrics.update(self.get_system_metrics())
        metrics.update(self.get_python_metrics())
        metrics.update(await self.get_async_metrics())
        return metrics
# Module-level performance collector shared by importers of this module
performance_collector = PerformanceCollector()
# monitoring/middleware.py
import time
import json
import asyncio
from typing import Callable, Any, Dict
from fastapi import Request, Response
from fastapi.responses import JSONResponse
import logging
from .async_loki_client import loki_client
from .metrics import performance_collector
logger = logging.getLogger(__name__)
class LokiMonitoringMiddleware:
    """ASGI middleware that records every HTTP request/response pair to Loki.

    For each HTTP request it captures request data, response status,
    headers and body size, timing and performance metrics, then pushes the
    record to Loki from a background task so the response is not delayed.
    """

    # Header names whose values are replaced before anything is logged.
    SENSITIVE_HEADERS = frozenset(
        {"authorization", "proxy-authorization", "cookie", "set-cookie", "x-api-key"}
    )

    def __init__(self, app, service_name: str = "fastapi-api"):
        """Wrap ``app``; ``service_name`` is stored on the instance."""
        self.app = app
        self.service_name = service_name
        # Strong references keep the fire-and-forget log tasks alive until
        # they finish; the event loop itself only holds weak references.
        self._background_tasks = set()

    async def __call__(self, scope, receive, send):
        """Handle one ASGI connection; non-HTTP scopes pass straight through."""
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        # Reading the body for logging consumes messages from ``receive``.
        # Record them so they can be replayed to the wrapped application,
        # which would otherwise never see the request body.
        consumed = []

        async def recording_receive():
            message = await receive()
            consumed.append(message)
            return message

        request = Request(scope, recording_receive)

        # perf_counter is monotonic, so the duration cannot be skewed by
        # wall-clock adjustments.
        started = time.perf_counter()
        start_metrics = await performance_collector.get_all_metrics()
        request_data = await self._collect_request_data(request)

        replay = iter(list(consumed))

        async def replaying_receive():
            message = next(replay, None)
            if message is not None:
                return message
            return await receive()

        response_data = {}
        exception_data = None
        response_started = False

        async def send_wrapper(message):
            nonlocal response_data, response_started
            if message["type"] == "http.response.start":
                response_started = True
                response_data = {
                    "status_code": message["status"],
                    # Raw ASGI headers are byte pairs, which json.dumps
                    # rejects; decode them before they are logged.
                    "headers": self._redact_headers(
                        self._decode_headers(message.get("headers", []))
                    )
                }
            elif message["type"] == "http.response.body":
                # A response may arrive in several chunks; add them up.
                chunk = message.get("body", b"")
                response_data["content_length"] = (
                    response_data.get("content_length", 0) + len(chunk)
                )
            await send(message)

        try:
            await self.app(scope, replaying_receive, send_wrapper)
        except Exception as e:
            exception_data = {
                "type": type(e).__name__,
                "message": str(e),
                "traceback": None  # traceback details could be added here
            }
            if response_started:
                # A response is already on the wire, so a second one cannot
                # be sent; let the server deal with the failure.
                raise
            response_data["status_code"] = 500
            error_response = JSONResponse(
                status_code=500,
                content={"error": "Internal Server Error"}
            )
            await send_wrapper({
                "type": "http.response.start",
                "status": 500,
                "headers": error_response.headers.raw
            })
            await send_wrapper({
                "type": "http.response.body",
                "body": error_response.body
            })
        finally:
            duration = time.perf_counter() - started
            end_metrics = await performance_collector.get_all_metrics()
            performance_data = {
                "duration_ms": round(duration * 1000, 2),
                "start_metrics": start_metrics,
                "end_metrics": end_metrics
            }
            # Log in the background so the client is not kept waiting.
            task = asyncio.create_task(
                self._log_request(
                    request_data,
                    response_data,
                    performance_data,
                    exception_data
                )
            )
            self._background_tasks.add(task)
            task.add_done_callback(self._background_tasks.discard)

    @staticmethod
    def _decode_headers(raw_headers) -> Dict[str, str]:
        """Turn (name, value) header pairs into a str-to-str dict."""
        decoded = {}
        for name, value in raw_headers:
            if isinstance(name, bytes):
                name = name.decode("latin-1")
            if isinstance(value, bytes):
                value = value.decode("latin-1")
            decoded[name] = value
        return decoded

    @classmethod
    def _redact_headers(cls, headers: Dict[str, str]) -> Dict[str, str]:
        """Return a copy of ``headers`` with sensitive values masked."""
        return {
            name: "[REDACTED]" if name.lower() in cls.SENSITIVE_HEADERS else value
            for name, value in headers.items()
        }

    async def _collect_request_data(self, request: "Request") -> Dict[str, Any]:
        """Collect loggable data about the incoming request.

        The body is recorded only for JSON requests whose whole body is at
        most 1 KiB; larger, non-JSON or unreadable bodies are logged as None.
        """
        body = None
        try:
            if request.headers.get("content-type", "").startswith("application/json"):
                body_bytes = await request.body()
                if len(body_bytes) <= 1024:
                    body = body_bytes.decode("utf-8")
        except Exception:
            # Best effort: a failed body read must not break the request.
            body = None
        return {
            "method": request.method,
            "path": request.url.path,
            "query_params": dict(request.query_params),
            "headers": self._redact_headers(dict(request.headers)),
            "client_ip": request.client.host if request.client else None,
            "user_agent": request.headers.get("user-agent", ""),
            "body": body,
            "content_type": request.headers.get("content-type", ""),
        }

    async def _log_request(self, request_data: Dict[str, Any],
                           response_data: Dict[str, Any],
                           performance_data: Dict[str, Any],
                           exception_data: Dict[str, Any] = None):
        """Push the request record to Loki; failures are logged, not raised."""
        try:
            if exception_data:
                await self._log_exception(request_data, exception_data, performance_data)
            else:
                async with loki_client:
                    await loki_client.push_api_log(
                        request_data, response_data, performance_data
                    )
        except Exception as e:
            logger.error("Failed to log request: %s", e)

    async def _log_exception(self, request_data: Dict[str, Any],
                             exception_data: Dict[str, Any],
                             performance_data: Dict[str, Any]):
        """Push an error-level record for a request that raised an exception."""
        labels = {
            "method": request_data.get("method", ""),
            "endpoint": request_data.get("path", ""),
            "status_code": "500",
            "exception_type": exception_data.get("type", "")
        }
        message = {
            "request": request_data,
            "exception": exception_data,
            "performance": performance_data
        }
        async with loki_client:
            await loki_client.push_log(
                message=json.dumps(message, ensure_ascii=False),
                labels=labels,
                level="error"
            )
# main.py
from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import List, Optional
import uvicorn
import asyncio
import random
import time
from datetime import datetime
from decouple import config
# 導入監控中介軟體
from monitoring.middleware import LokiMonitoringMiddleware
# Create the FastAPI application
app = FastAPI(
    title="FastAPI Monitoring Demo",
    description="API 監控示範應用",
    version="1.0.0"
)
# Register the monitoring middleware; the service name is read from the
# SERVICE_NAME setting and falls back to 'fastapi-api-demo' when unset.
app.add_middleware(
    LokiMonitoringMiddleware,
    service_name=config('SERVICE_NAME', default='fastapi-api-demo')
)
# Data models
# A product offered for sale; `id` and `created_at` are optional.
class Product(BaseModel):
    id: Optional[int] = None
    name: str
    price: float
    description: str = ""
    created_at: Optional[datetime] = None
# An order for `quantity` units of the product identified by `product_id`.
# `total_price` is optional, so consumers must be prepared for None.
class Order(BaseModel):
    id: Optional[int] = None
    product_id: int
    quantity: int
    total_price: Optional[float] = None
    created_at: Optional[datetime] = None
# Request body of the purchase endpoint: how many units to buy.
class PurchaseRequest(BaseModel):
    quantity: int
# Simulated database: in-memory lists that live only as long as the process
products_db = [
    Product(id=1, name="MacBook Pro", price=45000, description="高效能筆電"),
    Product(id=2, name="iPhone 15", price=35000, description="最新智慧手機"),
    Product(id=3, name="AirPods Pro", price=8000, description="降噪耳機"),
    Product(id=4, name="iPad Air", price=20000, description="輕薄平板"),
]
orders_db = []
# Id that will be given to the next order created
order_id_counter = 1
# API endpoints
@app.get("/", tags=["Root"])
async def root():
    """根路徑"""
    # Answer with the service banner and the current UTC time.
    now = datetime.utcnow()
    payload = {"message": "FastAPI Monitoring Demo", "timestamp": now}
    return payload
@app.get("/health", tags=["Health"])
async def health_check():
    """健康檢查"""
    # Liveness probe: a fixed status plus the current UTC time.
    checked_at = datetime.utcnow()
    return {"status": "healthy", "timestamp": checked_at}
@app.get("/products", response_model=List[Product], tags=["Products"])
async def get_products():
    """取得所有商品"""
    # Pause for a random 10-50 ms to imitate a database query.
    query_delay = random.uniform(0.01, 0.05)
    await asyncio.sleep(query_delay)
    return products_db
@app.get("/products/{product_id}", response_model=Product, tags=["Products"])
async def get_product(product_id: int):
    """取得特定商品"""
    # Pause for a random 10-30 ms to imitate a database query.
    await asyncio.sleep(random.uniform(0.01, 0.03))
    # Return the first product with a matching id; answer 404 when none does.
    for candidate in products_db:
        if candidate.id == product_id:
            return candidate
    raise HTTPException(status_code=404, detail="商品不存在")
@app.get("/products/featured/list", response_model=List[Product], tags=["Products"])
async def get_featured_products():
    """取得精選商品(模擬較慢的查詢)"""
    # Pause for a random 100-500 ms to imitate a complex query.
    await asyncio.sleep(random.uniform(0.1, 0.5))
    # Featured products are those priced at 20000 or more.
    featured = []
    for candidate in products_db:
        if candidate.price >= 20000:
            featured.append(candidate)
    return featured
@app.post("/products/{product_id}/purchase", tags=["Orders"])
async def purchase_product(product_id: int, request: PurchaseRequest):
    """購買商品"""
    global order_id_counter

    # Look the product up; an unknown id answers 404.
    product = None
    for candidate in products_db:
        if candidate.id == product_id:
            product = candidate
            break
    if product is None:
        raise HTTPException(status_code=404, detail="商品不存在")

    # Reject non-positive quantities with 400.
    quantity = request.quantity
    if quantity <= 0:
        raise HTTPException(status_code=400, detail="數量必須大於 0")

    # Simulated stock check: a short random delay, then a 10% chance of 409.
    await asyncio.sleep(random.uniform(0.02, 0.1))
    out_of_stock = random.random() < 0.1
    if out_of_stock:
        raise HTTPException(status_code=409, detail="庫存不足")

    # Create and store the order, then advance the id counter.
    total_price = product.price * quantity
    order = Order(
        id=order_id_counter,
        product_id=product_id,
        quantity=quantity,
        total_price=total_price,
        created_at=datetime.utcnow(),
    )
    orders_db.append(order)
    order_id_counter += 1

    receipt = {
        "order_id": order.id,
        "product": product.name,
        "quantity": quantity,
        "total_price": total_price,
        "message": "購買成功",
    }
    return receipt
@app.get("/orders", response_model=List[Order], tags=["Orders"])
async def get_orders():
    """取得所有訂單"""
    # Pause for a random 10-30 ms to imitate a database query.
    query_delay = random.uniform(0.01, 0.03)
    await asyncio.sleep(query_delay)
    return orders_db
@app.get("/orders/stats", tags=["Orders"])
async def get_order_stats():
    """取得訂單統計"""
    # Pause for a random 50-100 ms to imitate an aggregate query.
    await asyncio.sleep(random.uniform(0.05, 0.1))
    total_orders = len(orders_db)
    # Order.total_price is Optional; count a missing price as 0 rather than
    # letting sum() fail on None.
    total_revenue = sum(order.total_price or 0 for order in orders_db)
    return {
        "total_orders": total_orders,
        "total_revenue": total_revenue,
        # Avoid dividing by zero when there are no orders yet.
        "average_order_value": total_revenue / total_orders if total_orders > 0 else 0
    }
@app.post("/simulate-error", tags=["Testing"])
async def simulate_error():
    """模擬伺服器錯誤"""
    # Wait a random 100-200 ms, then always fail with a 500.
    pause = random.uniform(0.1, 0.2)
    await asyncio.sleep(pause)
    raise HTTPException(status_code=500, detail="模擬的伺服器錯誤")
@app.get("/slow-endpoint", tags=["Testing"])
async def slow_endpoint():
    """模擬慢速端點"""
    # Sleep for a random 1-3 seconds and report how long the wait was.
    delay = random.uniform(1, 3)
    await asyncio.sleep(delay)
    text = f"延遲 {delay:.2f} 秒後回應"
    return {"message": text}
# Exception handlers
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Render an HTTPException as a JSON error body.

    Headers attached to the exception (for example ``WWW-Authenticate``)
    are forwarded to the response so they are not silently dropped.
    """
    return JSONResponse(
        status_code=exc.status_code,
        content={"error": exc.detail, "status_code": exc.status_code},
        headers=getattr(exc, "headers", None)
    )
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
    """Answer any otherwise unhandled exception with a generic JSON 500."""
    # The body is generic on purpose: no exception detail reaches the client.
    body = {"error": "內部伺服器錯誤", "status_code": 500}
    return JSONResponse(status_code=500, content=body)
# Start the uvicorn server only when this file is executed as a script.
if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host="0.0.0.0",  # listen on all network interfaces
        port=8000,
        reload=True,  # auto-reload on code changes
        log_level="info"
    )
# .env
LOKI_URL=http://localhost:3100
LOKI_ENABLED=true
SERVICE_NAME=fastapi-api-demo
ENVIRONMENT=development
# requirements.txt
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
aiohttp==3.9.1
python-decouple==3.8
psutil==5.9.6
# async_test_client.py
import asyncio
import aiohttp
import time
import random
import json
class AsyncAPITester:
    """Small asynchronous HTTP client for smoke- and load-testing the demo API.

    Use it as an async context manager so the aiohttp session is opened and
    closed around the test run.
    """

    def __init__(self, base_url: str = "http://localhost:8000"):
        self.base_url = base_url
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def test_endpoint(self, method: str, path: str, **kwargs):
        """Call one endpoint and describe the outcome.

        Returns:
            ``{"status", "content", "headers"}`` when a response was
            received, or ``{"error": <text>}`` when the request failed.
            Exceptions are never propagated.
        """
        url = f"{self.base_url}{path}"
        try:
            async with self.session.request(method, url, **kwargs) as response:
                content = await response.text()
                return {
                    "status": response.status,
                    "content": content,
                    "headers": dict(response.headers)
                }
        except Exception as e:
            return {"error": str(e)}

    @staticmethod
    def _is_failure(result) -> bool:
        """Tell whether a ``test_endpoint`` result counts as an error.

        A result is a failure when the request itself failed (it carries an
        ``error`` key) or the server answered with a status of 400 or above.
        """
        return "error" in result or result.get("status", 0) >= 400

    async def load_test(self, concurrent_users: int = 10, duration: int = 30):
        """Run ``concurrent_users`` simulated users for ``duration`` seconds.

        Prints a summary and returns it as a dict with ``total_requests``,
        ``total_errors`` and ``crashed_users`` (simulations that raised).
        """
        print(f"開始負載測試:{concurrent_users} 個並發使用者,持續 {duration} 秒")
        start_time = time.time()
        tasks = [
            asyncio.create_task(self.user_simulation(i, start_time, duration))
            for i in range(concurrent_users)
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        finished = [r for r in results if isinstance(r, dict)]
        total_requests = sum(r['requests'] for r in finished)
        total_errors = sum(r['errors'] for r in finished)
        print("測試完成!")
        print(f"總請求數: {total_requests}")
        print(f"錯誤數: {total_errors}")
        if total_requests:
            print(f"成功率: {((total_requests - total_errors) / total_requests * 100):.2f}%")
        else:
            # Nothing was sent, so a success rate cannot be computed.
            print("成功率: N/A")
        return {
            "total_requests": total_requests,
            "total_errors": total_errors,
            "crashed_users": len(results) - len(finished),
        }

    async def user_simulation(self, user_id: int, start_time: float, duration: int):
        """Simulate one user issuing random requests until time runs out.

        Returns:
            ``{"user_id", "requests", "errors"}`` where ``requests`` counts
            every attempt and ``errors`` the attempts that failed.
        """
        requests_count = 0
        errors_count = 0
        actions = [
            self.browse_products,
            self.view_product,
            self.purchase_product,
            self.check_orders
        ]
        while time.time() - start_time < duration:
            action = random.choice(actions)
            # Every attempt counts as a request, so the success rate is
            # computed against the real number of calls made.
            requests_count += 1
            try:
                result = await action()
            except Exception:
                errors_count += 1
            else:
                # test_endpoint reports failures in its result instead of
                # raising, so the result has to be inspected.
                if isinstance(result, dict) and self._is_failure(result):
                    errors_count += 1
            # Random think time between two requests.
            await asyncio.sleep(random.uniform(0.1, 2.0))
        return {"user_id": user_id, "requests": requests_count, "errors": errors_count}

    async def browse_products(self):
        """Fetch the product list and return the endpoint result."""
        return await self.test_endpoint("GET", "/products")

    async def view_product(self):
        """Fetch a random product (id 1-4) and return the endpoint result."""
        product_id = random.randint(1, 4)
        return await self.test_endpoint("GET", f"/products/{product_id}")

    async def purchase_product(self):
        """Buy 1-3 units of a random product and return the endpoint result."""
        product_id = random.randint(1, 4)
        quantity = random.randint(1, 3)
        return await self.test_endpoint(
            "POST",
            f"/products/{product_id}/purchase",
            json={"quantity": quantity}
        )

    async def check_orders(self):
        """Fetch the order list and return the endpoint result."""
        return await self.test_endpoint("GET", "/orders")
async def main():
    """Run the basic checks, the error checks and a short load test."""

    async def run_cases(tester, cases):
        # Each case is (method, path) with an optional dict of request
        # keyword arguments as a third element.
        for case in cases:
            method, path = case[0], case[1]
            options = case[2] if len(case) > 2 else {}
            outcome = await tester.test_endpoint(method, path, **options)
            print(f"{method} {path}: {outcome.get('status', 'Error')}")
            await asyncio.sleep(0.1)

    # Regular endpoints that are expected to respond normally.
    basic_cases = [
        ("GET", "/"),
        ("GET", "/health"),
        ("GET", "/products"),
        ("GET", "/products/1"),
        ("GET", "/products/featured/list"),
        ("POST", "/products/1/purchase", {"json": {"quantity": 1}}),
        ("GET", "/orders"),
        ("GET", "/orders/stats"),
        ("GET", "/slow-endpoint"),
    ]
    # Requests that exercise the error paths.
    error_cases = [
        ("GET", "/products/999"),  # product that does not exist
        ("POST", "/products/1/purchase", {"json": {"quantity": -1}}),  # invalid quantity
        ("POST", "/simulate-error"),  # simulated server error
    ]

    async with AsyncAPITester() as tester:
        print("=== 基本功能測試 ===")
        await run_cases(tester, basic_cases)
        print("\n=== 錯誤測試 ===")
        await run_cases(tester, error_cases)
        print("\n=== 負載測試 ===")
        await tester.load_test(concurrent_users=5, duration=10)
# Run the test client on a fresh event loop only when executed as a script.
if __name__ == "__main__":
    asyncio.run(main())