今天我們要學習如何在 Django 專案中實作 API 監控 middleware,將 API 請求的效能數據和日誌直接推送到 Loki。透過這個 middleware,我們可以自動收集每個 API 請求的詳細資訊,包括回應時間、狀態碼、錯誤訊息等。
Django middleware 是一個輕量級的框架,用於處理 Django 的請求和回應。它在請求到達 view 之前、以及回應返回給客戶端之前執行,是實作監控功能的理想位置。
請求 → Middleware 1 → Middleware 2 → View → Middleware 2 → Middleware 1 → 回應
每個 middleware 都有機會:在請求到達 view 之前檢查或修改請求、在 view 回傳之後檢查或修改回應,以及在 view 拋出例外時進行處理。
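我們的監控 middleware 將收集以下資訊:請求資訊(HTTP 方法、路徑、查詢參數、來源 IP、User-Agent、使用者 ID)、回應資訊(狀態碼、內容長度、Content-Type)、效能指標(處理時間、資料庫查詢次數、記憶體使用量),以及未處理例外的類型與訊息。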
為了在 Loki 中有效查詢,我們需要設計合適的標籤:
- service: Django 應用名稱
- method: HTTP 方法 (GET, POST, PUT, DELETE)
- endpoint: API 端點路徑
- status_code: HTTP 狀態碼
- level: 日誌等級 (info, warning, error)
# 建立 Django 專案
mkdir django-api-monitoring
cd django-api-monitoring
# 建立虛擬環境
python -m venv venv
source venv/bin/activate # Linux/Mac
# venv\Scripts\activate # Windows
# 安裝必要套件
pip install django djangorestframework requests python-decouple
# settings.py
import os
from decouple import config
# ... 其他設定 ...
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'rest_framework',
    'monitoring',  # monitoring app that provides the Loki middleware
    'api',  # demo API app
]

MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
    # Monitoring middleware. NOTE(review): listed last, so it sits closest to
    # the view and its measured duration does not include the time spent in
    # the middleware listed above it.
    'monitoring.middleware.LokiMonitoringMiddleware',
]

# Loki settings, read from the environment / .env file via python-decouple.
LOKI_URL = config('LOKI_URL', default='http://localhost:3100')
LOKI_ENABLED = config('LOKI_ENABLED', default=True, cast=bool)
SERVICE_NAME = config('SERVICE_NAME', default='django-api')
# The Loki client looks up settings.ENVIRONMENT for its "environment" label;
# without this line the value from .env would never reach it and every log
# would be labelled with the client's fallback value.
ENVIRONMENT = config('ENVIRONMENT', default='development')

# REST Framework settings: render JSON only.
REST_FRAMEWORK = {
    'DEFAULT_RENDERER_CLASSES': [
        'rest_framework.renderers.JSONRenderer',
    ],
}
# monitoring/loki_client.py
import json
import time
import requests
from datetime import datetime
from django.conf import settings
import logging
logger = logging.getLogger(__name__)
class LokiClient:
    """Small HTTP client that pushes JSON-encoded log lines to Loki.

    Configuration is read from Django settings (``LOKI_URL``,
    ``LOKI_ENABLED``, ``SERVICE_NAME``, ``ENVIRONMENT``), each with a
    fallback default. Pushing is best-effort: failures are logged and
    reported through the boolean return value, never raised to the caller.
    """

    def __init__(self, url=None):
        """Create the client.

        Args:
            url: Base URL of the Loki server. Falls back to
                ``settings.LOKI_URL`` and then ``http://localhost:3100``.
        """
        self.url = url or getattr(settings, 'LOKI_URL', 'http://localhost:3100')
        self.enabled = getattr(settings, 'LOKI_ENABLED', True)
        # Strip a trailing slash so a base URL such as "http://loki:3100/"
        # does not produce a double slash in the push path.
        self.push_url = f"{self.url.rstrip('/')}/loki/api/v1/push"
        # One session for all pushes made through this client.
        self.session = requests.Session()

    def push_log(self, message, labels=None, level="info"):
        """Push a single log line to Loki.

        Args:
            message: Log message; embedded in the JSON log line.
            labels: Optional extra labels. They are merged over the default
                stream labels and also copied into the JSON log line.
            level: Log level string, used both as a label and in the line.

        Returns:
            True when pushing is disabled or Loki answered with HTTP 204,
            False on any other status code or when the request failed.
        """
        if not self.enabled:
            return True

        # Local import: the module header only imports ``datetime`` itself.
        from datetime import timezone

        if labels is None:
            labels = {}

        # Default labels; caller-supplied labels win on key collisions.
        stream_labels = {
            "service": getattr(settings, 'SERVICE_NAME', 'django-api'),
            "level": level,
            "environment": getattr(settings, 'ENVIRONMENT', 'development'),
        }
        stream_labels.update(labels)
        # Loki stream label values are strings; coerce so that a None or an
        # int label does not make the whole push fail.
        stream_labels = {key: str(value) for key, value in stream_labels.items()}

        # Integer nanosecond timestamp. time.time() * 1e9 goes through a
        # float, which cannot represent current epoch nanoseconds exactly.
        timestamp = str(time.time_ns())
        # Same "...Z" format as before, without the deprecated utcnow().
        iso_timestamp = (
            datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        )

        log_line = json.dumps(
            {
                "timestamp": iso_timestamp,
                "message": message,
                "level": level,
                **labels,
            },
            ensure_ascii=False,
        )
        payload = {
            "streams": [
                {
                    "stream": stream_labels,
                    "values": [[timestamp, log_line]],
                }
            ]
        }

        try:
            response = self.session.post(
                self.push_url,
                headers={"Content-Type": "application/json"},
                data=json.dumps(payload),
                timeout=2,
            )
        except Exception as exc:
            # Deliberately broad: monitoring must never break the caller.
            logger.error("Failed to push log to Loki: %s", exc)
            return False
        return response.status_code == 204

    @staticmethod
    def _level_for_status(status_code):
        """Map an HTTP status code to a log level: 5xx error, 4xx warning, else info."""
        if status_code >= 500:
            return "error"
        if status_code >= 400:
            return "warning"
        return "info"

    @staticmethod
    def _build_api_labels(request_data, response_data):
        """Build the Loki labels for one API request.

        A missing status code is treated as 500, matching the level
        calculation. A missing or None user id becomes "anonymous".
        """
        user_id = request_data.get("user_id")
        # NOTE(review): user_id and the raw path are potentially
        # high-cardinality label values; confirm this is acceptable for the
        # target Loki deployment.
        return {
            "method": request_data.get("method"),
            "endpoint": request_data.get("path"),
            "status_code": str(response_data.get("status_code", 500)),
            "user_id": "anonymous" if user_id is None else str(user_id),
        }

    def push_api_log(self, request_data, response_data, performance_data):
        """Push one API request log entry.

        Args:
            request_data: Dict describing the request; ``method``, ``path``
                and ``user_id`` are used for labels.
            response_data: Dict describing the response; ``status_code``
                selects the log level and is used as a label.
            performance_data: Dict of performance metrics.

        Returns:
            The result of :meth:`push_log`.
        """
        labels = self._build_api_labels(request_data, response_data)
        level = self._level_for_status(response_data.get("status_code", 500))
        message = {
            "request": request_data,
            "response": response_data,
            "performance": performance_data,
        }
        return self.push_log(
            message=json.dumps(message, ensure_ascii=False),
            labels=labels,
            level=level,
        )
# Module-level shared instance, created when this module is imported. The
# constructor reads django.conf.settings, so settings must be usable by then.
loki_client = LokiClient()
# monitoring/middleware.py
import time
import json
from django.utils.deprecation import MiddlewareMixin
from django.db import connection
from .loki_client import loki_client
import logging
logger = logging.getLogger(__name__)
class LokiMonitoringMiddleware(MiddlewareMixin):
    """Middleware that reports per-request metrics and errors to Loki.

    ``process_request`` stamps the request with a start time and the current
    number of recorded DB queries. ``process_response`` turns those into
    duration / query-count metrics and pushes them together with request and
    response details. ``process_exception`` pushes a separate error entry
    for exceptions raised by the view.
    """

    def __init__(self, get_response=None):
        """Initialise the middleware; ``MiddlewareMixin`` stores ``get_response``."""
        super().__init__(get_response)

    def process_request(self, request):
        """Record the start time and initial query count on the request."""
        # perf_counter() is monotonic, so the measured duration is not
        # affected by system clock adjustments the way time.time() is.
        request._monitoring_start_time = time.perf_counter()
        # NOTE(review): Django only records connection.queries when DEBUG is
        # on, so this count is expected to stay 0 in production.
        request._monitoring_start_queries = len(connection.queries)
        return None

    def process_response(self, request, response):
        """Compute metrics for the finished request and push them to Loki.

        Always returns ``response`` unchanged; monitoring failures are
        logged and never propagated.
        """
        # process_request may not have run for this request; nothing to
        # measure in that case.
        if not hasattr(request, '_monitoring_start_time'):
            return response

        duration = time.perf_counter() - request._monitoring_start_time
        query_count = len(connection.queries) - request._monitoring_start_queries

        user = getattr(request, 'user', None)
        if user is not None and user.is_authenticated:
            user_id = getattr(user, 'id', None)
        else:
            user_id = None

        request_data = {
            "method": request.method,
            "path": request.path,
            "query_params": dict(request.GET),
            "content_type": request.content_type,
            "remote_addr": self._get_client_ip(request),
            "user_agent": request.META.get('HTTP_USER_AGENT', ''),
            "user_id": user_id,
        }
        response_data = {
            "status_code": response.status_code,
            # Responses without a ``content`` attribute are reported as 0.
            "content_length": len(response.content) if hasattr(response, 'content') else 0,
            "content_type": response.get('Content-Type', ''),
        }
        performance_data = {
            "duration_ms": round(duration * 1000, 2),
            "query_count": query_count,
            "memory_usage": self._get_memory_usage(),
        }

        # This push is synchronous: it runs before the response is returned
        # and can delay it by up to the Loki client's request timeout.
        try:
            loki_client.push_api_log(request_data, response_data, performance_data)
        except Exception as exc:
            logger.error("Failed to send monitoring data: %s", exc)
        return response

    def process_exception(self, request, exception):
        """Push an error entry for an exception raised by the view.

        Returns None so Django's normal exception handling continues.
        """
        if not hasattr(request, '_monitoring_start_time'):
            return None

        duration = time.perf_counter() - request._monitoring_start_time
        exception_type = type(exception).__name__
        error_data = {
            "method": request.method,
            "path": request.path,
            "exception_type": exception_type,
            "exception_message": str(exception),
            "duration_ms": round(duration * 1000, 2),
            "remote_addr": self._get_client_ip(request),
        }
        labels = {
            "method": request.method,
            "endpoint": request.path,
            "status_code": "500",
            "exception_type": exception_type,
        }
        try:
            loki_client.push_log(
                message=json.dumps(error_data, ensure_ascii=False),
                labels=labels,
                level="error",
            )
        except Exception as exc:
            logger.error("Failed to send exception data: %s", exc)
        return None

    def _get_client_ip(self, request):
        """Return the client IP, preferring the first X-Forwarded-For entry.

        NOTE(review): X-Forwarded-For is supplied by the client unless a
        trusted proxy overwrites it, so treat this value as informational.
        """
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            return x_forwarded_for.split(',')[0].strip()
        return request.META.get('REMOTE_ADDR')

    def _get_memory_usage(self):
        """Return this process's resident memory in MB, or None without psutil."""
        try:
            import psutil
            import os
        except ImportError:
            # psutil is optional; skip the metric when it is not installed.
            return None
        process = psutil.Process(os.getpid())
        return round(process.memory_info().rss / 1024 / 1024, 2)  # MB
# api/models.py
from django.db import models
class Product(models.Model):
    """A product with a name, price, optional description and creation time."""

    name = models.CharField(max_length=100)
    price = models.DecimalField(decimal_places=2, max_digits=10)
    # May be left empty.
    description = models.TextField(blank=True)
    # Filled in once, when the row is first created.
    created_at = models.DateTimeField(auto_now_add=True)

    def __str__(self):
        """Use the product name as the display string."""
        return self.name
class Order(models.Model):
    """An order for some quantity of a single product."""

    # Deleting a product also deletes its orders.
    product = models.ForeignKey(Product, on_delete=models.CASCADE)
    quantity = models.IntegerField()
    total_price = models.DecimalField(decimal_places=2, max_digits=10)
    # Filled in once, when the row is first created.
    created_at = models.DateTimeField(auto_now_add=True)
# api/serializers.py
from rest_framework import serializers
from .models import Product, Order
class ProductSerializer(serializers.ModelSerializer):
    """Serializer exposing every field of ``Product``."""

    class Meta:
        model = Product
        fields = "__all__"
class OrderSerializer(serializers.ModelSerializer):
    """Serializer exposing every field of ``Order``."""

    class Meta:
        model = Order
        fields = "__all__"
# api/views.py
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from django.db.models import Count, Sum
from .models import Product, Order
from .serializers import ProductSerializer, OrderSerializer
import time
import random
class ProductViewSet(viewsets.ModelViewSet):
    """CRUD endpoints for products, plus ``featured`` and ``purchase`` actions."""

    queryset = Product.objects.all()
    serializer_class = ProductSerializer

    @action(detail=False, methods=['get'])
    def featured(self, request):
        """Return up to five products priced >= 1000, most-ordered first.

        Includes a random 0.1-0.5 s sleep to simulate a slow query for the
        monitoring demo.
        """
        # Simulated latency of a complex query.
        time.sleep(random.uniform(0.1, 0.5))
        featured_products = Product.objects.filter(
            price__gte=1000
        ).annotate(
            order_count=Count('order')
        ).order_by('-order_count')[:5]
        serializer = self.get_serializer(featured_products, many=True)
        return Response(serializer.data)

    @staticmethod
    def _parse_quantity(raw_quantity):
        """Return ``raw_quantity`` as an int, or None if it is not an integer.

        Accepts ints and integer strings (form-encoded bodies deliver
        strings). Booleans, floats, None and other types are rejected.
        """
        if isinstance(raw_quantity, bool) or not isinstance(raw_quantity, (int, str)):
            return None
        try:
            return int(raw_quantity)
        except ValueError:
            return None

    @action(detail=True, methods=['post'])
    def purchase(self, request, pk=None):
        """Create an order for this product.

        Reads ``quantity`` from the request body (default 1). Responds with
        400 when the quantity is not a positive integer, 409 when the
        simulated stock check fails, otherwise with the order summary.
        """
        product = self.get_object()

        # Validate before comparing: an unvalidated string or null would
        # raise TypeError on ``<= 0`` and surface as a 500.
        quantity = self._parse_quantity(request.data.get('quantity', 1))
        if quantity is None:
            return Response(
                {'error': '數量必須為整數'},
                status=status.HTTP_400_BAD_REQUEST
            )
        if quantity <= 0:
            return Response(
                {'error': '數量必須大於 0'},
                status=status.HTTP_400_BAD_REQUEST
            )

        # Simulated stock check: fails for roughly 10% of requests.
        if random.random() < 0.1:
            return Response(
                {'error': '庫存不足'},
                status=status.HTTP_409_CONFLICT
            )

        total_price = product.price * quantity
        order = Order.objects.create(
            product=product,
            quantity=quantity,
            total_price=total_price
        )
        return Response({
            'order_id': order.id,
            'product': product.name,
            'quantity': quantity,
            'total_price': float(total_price)
        })
class OrderViewSet(viewsets.ReadOnlyModelViewSet):
    """Read-only endpoints for orders, plus an aggregate ``stats`` action."""

    queryset = Order.objects.all()
    serializer_class = OrderSerializer

    @action(detail=False, methods=['get'])
    def stats(self, request):
        """Return the total number of orders and the summed revenue."""
        summary = Order.objects.aggregate(
            total_orders=Count('id'),
            total_revenue=Sum('total_price'),
        )
        return Response(summary)
# api/urls.py
from django.urls import path, include
from rest_framework.routers import DefaultRouter
from .views import ProductViewSet, OrderViewSet
# Register both viewsets on one router and expose its generated routes.
router = DefaultRouter()
router.register("products", ProductViewSet)
router.register("orders", OrderViewSet)

urlpatterns = [
    path("", include(router.urls)),
]
# 主要 urls.py
from django.contrib import admin
from django.urls import path, include
# Project-level routes: the Django admin and the demo API under /api/.
urlpatterns = [
    path("admin/", admin.site.urls),
    path("api/", include("api.urls")),
]
# .env
DEBUG=True
SECRET_KEY=your-secret-key-here
LOKI_URL=http://localhost:3100
LOKI_ENABLED=True
SERVICE_NAME=django-api-demo
ENVIRONMENT=development
# requirements.txt
Django==4.2.7
djangorestframework==3.14.0
requests==2.31.0
python-decouple==3.8
psutil==5.9.6
明天我們將學習如何在 FastAPI 中實作類似的監控功能,並比較不同框架的實作差異和效能特點。