iT邦幫忙

2025 iThome Ironman Contest

DAY 17

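Today we will implement an API monitoring middleware in a Django project that pushes performance data and logs for each API request directly to Loki. With this middleware in place, the details of every API request are collected automatically, including response time, status code, and error messages.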

Django Middleware Basics

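Django middleware is a lightweight framework of hooks into Django's request and response processing. Each middleware runs before the request reaches the view and again after the view returns a response, before that response is sent back to the client, which makes it a natural place to implement monitoring.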

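Middleware Execution Order

Request → Middleware 1 → Middleware 2 → View → Middleware 2 → Middleware 1 → Response

Each middleware has a chance to:

  • Modify the request before the view runs
  • Modify the response after the view runs
  • Record and monitor how the request is processed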
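The order above can be reproduced without Django. The following is a minimal sketch in plain Python; `make_middleware`, `build_chain`, and the string-based request are hypothetical stand-ins for illustration, not Django APIs. It assumes, as Django does, that the first entry in the list becomes the outermost layer.

```python
from typing import Callable, List

Handler = Callable[[str], str]


def make_middleware(name: str, trace: List[str]) -> Callable[[Handler], Handler]:
    """Return a middleware factory that records when it runs (hypothetical helper)."""
    def factory(get_response: Handler) -> Handler:
        def middleware(request: str) -> str:
            trace.append(f"{name}: before view")
            response = get_response(request)  # call the next layer (or the view)
            trace.append(f"{name}: after view")
            return response
        return middleware
    return factory


def build_chain(view: Handler, factories: List[Callable[[Handler], Handler]]) -> Handler:
    """Wrap the view so the first factory in the list is the outermost layer."""
    handler = view
    for factory in reversed(factories):
        handler = factory(handler)
    return handler


trace: List[str] = []


def view(request: str) -> str:
    trace.append("view")
    return f"response to {request}"


chain = build_chain(
    view,
    [make_middleware("Middleware 1", trace), make_middleware("Middleware 2", trace)],
)
result = chain("GET /api/products/")
```

After the call, `trace` lists Middleware 1 first on the way in and last on the way out, which is why a timing middleware placed at the end of the list measures only the layers below it.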

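Monitoring Middleware Design

Core Features

Our monitoring middleware will collect the following information:

  • Request information: HTTP method, URL path, IP address
  • Performance metrics: request processing time, number of database queries
  • Response information: status code, response size
  • Error tracking: exception type and error message
  • User information: authenticated user ID (if applicable)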
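One way to keep these fields together is a single record type. The sketch below is only an illustration: `ApiLogRecord` and its field names are assumptions chosen to mirror the list above, not part of Django or Loki.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class ApiLogRecord:
    """Hypothetical container for the data collected per request."""
    # Request information
    method: str
    path: str
    remote_addr: str
    # Performance metrics
    duration_ms: float
    query_count: int
    # Response information
    status_code: int
    content_length: int
    # Error tracking (only set when an exception occurred)
    exception_type: Optional[str] = None
    exception_message: Optional[str] = None
    # User information (None for anonymous requests)
    user_id: Optional[int] = None

    def to_json(self) -> str:
        """Serialize the record as one JSON log line."""
        return json.dumps(asdict(self), ensure_ascii=False)


record = ApiLogRecord(
    method="GET",
    path="/api/products/",
    remote_addr="127.0.0.1",
    duration_ms=12.5,
    query_count=2,
    status_code=200,
    content_length=512,
)
payload = json.loads(record.to_json())
```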

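Label Strategy

Loki indexes labels rather than log content, so we need a small, well-chosen set of labels to query effectively:

  • service: Django application name
  • method: HTTP method (GET, POST, PUT, DELETE)
  • endpoint: API endpoint path
  • status_code: HTTP status code
  • level: log level (info, warning, error)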
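The label set above can be sketched as a small pure function. Everything here is an assumption for illustration: `normalize_endpoint`, `level_for_status`, and `build_labels` are hypothetical helpers, and replacing numeric path segments with `:id` is one possible way to keep the number of distinct `endpoint` values low, not something the original article does.

```python
import re
from typing import Dict

# Matches a purely numeric path segment such as "/42" in "/api/products/42/"
_ID_SEGMENT = re.compile(r"/\d+(?=/|$)")


def normalize_endpoint(path: str) -> str:
    """Replace numeric path segments with ':id' to limit distinct label values."""
    return _ID_SEGMENT.sub("/:id", path)


def level_for_status(status_code: int) -> str:
    """Map an HTTP status code to a log level: 5xx error, 4xx warning, else info."""
    if status_code >= 500:
        return "error"
    if status_code >= 400:
        return "warning"
    return "info"


def build_labels(service: str, method: str, path: str, status_code: int) -> Dict[str, str]:
    """Build the label set for one request; all values are strings."""
    return {
        "service": service,
        "method": method,
        "endpoint": normalize_endpoint(path),
        "status_code": str(status_code),
        "level": level_for_status(status_code),
    }


labels = build_labels("django-api", "POST", "/api/products/42/purchase/", 409)
```

With this sketch, requests for different product IDs share the same `endpoint` label, so they land in the same Loki stream.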

Implementation Steps

1. Create the Project Structure

# Create the project directory
mkdir django-api-monitoring
cd django-api-monitoring

# Create a virtual environment
python -m venv venv
source venv/bin/activate  # Linux/Mac
# venv\Scripts\activate     # Windows

# Install the required packages
pip install django djangorestframework requests python-decouple

2. Django Settings File

# settings.py
import os
from decouple import config

# ... other settings ...

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'rest_framework',
    'monitoring',  # our monitoring app
    'api',  # demo API app
]

MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
    'monitoring.middleware.LokiMonitoringMiddleware',  # our monitoring middleware (last, so it times only the view)
]

# Loki settings
LOKI_URL = config('LOKI_URL', default='http://localhost:3100')
LOKI_ENABLED = config('LOKI_ENABLED', default=True, cast=bool)
SERVICE_NAME = config('SERVICE_NAME', default='django-api')

# REST Framework settings
REST_FRAMEWORK = {
    'DEFAULT_RENDERER_CLASSES': [
        'rest_framework.renderers.JSONRenderer',
    ],
}

3. Loki Client Implementation

# monitoring/loki_client.py
import json
import time
import requests
from datetime import datetime
from django.conf import settings
import logging

logger = logging.getLogger(__name__)

class LokiClient:
    def __init__(self, url=None):
        self.url = url or getattr(settings, 'LOKI_URL', 'http://localhost:3100')
        self.enabled = getattr(settings, 'LOKI_ENABLED', True)
        self.push_url = f"{self.url}/loki/api/v1/push"
        self.session = requests.Session()
    
    def push_log(self, message, labels=None, level="info"):
        """Push a log entry to Loki."""
        if not self.enabled:
            return True
            
        if labels is None:
            labels = {}
        
        # Default labels
        default_labels = {
            "service": getattr(settings, 'SERVICE_NAME', 'django-api'),
            "level": level,
            "environment": getattr(settings, 'ENVIRONMENT', 'development')
        }
        default_labels.update(labels)
        
        # Prepare the log data
        timestamp = str(time.time_ns())  # nanosecond timestamp (integer, avoids float rounding)
        
        log_entry = {
            "streams": [
                {
                    "stream": default_labels,
                    "values": [
                        [
                            timestamp,
                            json.dumps({
                                "timestamp": datetime.utcnow().isoformat() + "Z",
                                "message": message,
                                "level": level,
                                **labels
                            }, ensure_ascii=False)
                        ]
                    ]
                }
            ]
        }
        
        try:
            response = self.session.post(
                self.push_url,
                headers={"Content-Type": "application/json"},
                data=json.dumps(log_entry),
                timeout=2
            )
            return response.status_code == 204
        except Exception as e:
            logger.error(f"Failed to push log to Loki: {e}")
            return False
    
    def push_api_log(self, request_data, response_data, performance_data):
        """Push an API request log entry."""
        labels = {
            "method": request_data.get("method"),
            "endpoint": request_data.get("path"),
            "status_code": str(response_data.get("status_code")),
            # user_id is present but None for anonymous requests, so fall back explicitly
            "user_id": str(request_data.get("user_id") or "anonymous")
        }
        
        # Choose the log level from the status code
        status_code = response_data.get("status_code", 500)
        if status_code >= 500:
            level = "error"
        elif status_code >= 400:
            level = "warning"
        else:
            level = "info"
        
        message = {
            "request": request_data,
            "response": response_data,
            "performance": performance_data
        }
        
        return self.push_log(
            message=json.dumps(message, ensure_ascii=False),
            labels=labels,
            level=level
        )

# Module-level instance
loki_client = LokiClient()
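The request body built inside `push_log` can also be looked at in isolation. The sketch below is a simplified, standalone version: `build_push_payload` is a hypothetical helper, and it assumes the JSON shape used above for `/loki/api/v1/push`, where each entry in `values` is a pair of a nanosecond timestamp string and the log line.

```python
import json
import time
from typing import Dict, Optional


def build_push_payload(message: str, labels: Dict[str, object],
                       timestamp_ns: Optional[int] = None) -> dict:
    """Build a JSON-serializable body for Loki's push endpoint (hypothetical helper)."""
    if timestamp_ns is None:
        timestamp_ns = time.time_ns()  # integer nanoseconds since the epoch
    return {
        "streams": [
            {
                # Label values are sent as strings
                "stream": {key: str(value) for key, value in labels.items()},
                # Each value is [timestamp as string, log line]
                "values": [[str(timestamp_ns), message]],
            }
        ]
    }


payload = build_push_payload(
    "hello",
    {"service": "django-api", "status_code": 200},
    timestamp_ns=1700000000000000000,
)
body = json.dumps(payload)
```

Keeping payload construction separate from the HTTP call makes it possible to check the structure in a unit test without a running Loki instance.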

4. Monitoring Middleware Implementation

# monitoring/middleware.py
import time
import json
from django.utils.deprecation import MiddlewareMixin
from django.db import connection
from .loki_client import loki_client
import logging

logger = logging.getLogger(__name__)

class LokiMonitoringMiddleware(MiddlewareMixin):
    def __init__(self, get_response=None):
        super().__init__(get_response)
        self.get_response = get_response
    
    def process_request(self, request):
        """Record the start time and initial state when the request begins."""
        request._monitoring_start_time = time.time()
        request._monitoring_start_queries = len(connection.queries)
        return None
    
    def process_response(self, request, response):
        """Compute the metrics when the request finishes and push them to Loki."""
        if not hasattr(request, '_monitoring_start_time'):
            return response
        
        # Compute the processing time
        end_time = time.time()
        duration = end_time - request._monitoring_start_time
        
        # Count database queries (connection.queries is only populated when DEBUG=True)
        end_queries = len(connection.queries)
        query_count = end_queries - request._monitoring_start_queries
        
        # Collect request information
        request_data = {
            "method": request.method,
            "path": request.path,
            "query_params": dict(request.GET),
            "content_type": request.content_type,
            "remote_addr": self._get_client_ip(request),
            "user_agent": request.META.get('HTTP_USER_AGENT', ''),
            "user_id": getattr(request.user, 'id', None) if hasattr(request, 'user') and request.user.is_authenticated else None
        }
        
        # Collect response information
        response_data = {
            "status_code": response.status_code,
            "content_length": len(response.content) if hasattr(response, 'content') else 0,
            "content_type": response.get('Content-Type', '')
        }
        
        # Collect performance metrics
        performance_data = {
            "duration_ms": round(duration * 1000, 2),
            "query_count": query_count,
            "memory_usage": self._get_memory_usage()
        }
        
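        # Push to Loki. Note: this call is synchronous, so a slow Loki can delay the response by up to the client's 2-second timeout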
        try:
            loki_client.push_api_log(request_data, response_data, performance_data)
        except Exception as e:
            logger.error(f"Failed to send monitoring data: {e}")
        
        return response
    
    def process_exception(self, request, exception):
        """Record error information when the view raises an exception."""
        if not hasattr(request, '_monitoring_start_time'):
            return None
        
        duration = time.time() - request._monitoring_start_time
        
        error_data = {
            "method": request.method,
            "path": request.path,
            "exception_type": type(exception).__name__,
            "exception_message": str(exception),
            "duration_ms": round(duration * 1000, 2),
            "remote_addr": self._get_client_ip(request)
        }
        
        labels = {
            "method": request.method,
            "endpoint": request.path,
            "status_code": "500",
            "exception_type": type(exception).__name__
        }
        
        try:
            loki_client.push_log(
                message=json.dumps(error_data, ensure_ascii=False),
                labels=labels,
                level="error"
            )
        except Exception as e:
            logger.error(f"Failed to send exception data: {e}")
        
        return None
    
    def _get_client_ip(self, request):
        """Get the client IP, preferring the first X-Forwarded-For entry (only reliable behind a trusted proxy)."""
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            return x_forwarded_for.split(',')[0].strip()
        return request.META.get('REMOTE_ADDR')
    
    def _get_memory_usage(self):
        """Get the process memory usage in MB (optional, requires psutil)."""
        try:
            import psutil
            import os
            process = psutil.Process(os.getpid())
            return round(process.memory_info().rss / 1024 / 1024, 2)  # MB
        except ImportError:
            return None
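Because `push_api_log` is called inline, the response waits for the HTTP request to Loki. One possible way to take that work off the request path is a queue drained by a background thread. The sketch below is an assumption, not part of the middleware above: `BackgroundPusher` is a hypothetical class, and the `send` callable stands in for something like `loki_client.push_api_log`.

```python
import logging
import queue
import threading
from typing import Callable

logger = logging.getLogger(__name__)

_STOP = object()  # sentinel that tells the worker to exit


class BackgroundPusher:
    """Hand records to a worker thread so the caller never waits on the network."""

    def __init__(self, send: Callable[[dict], object], maxsize: int = 1000):
        self._send = send
        self._queue = queue.Queue(maxsize=maxsize)
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def submit(self, record: dict) -> bool:
        """Enqueue a record without blocking; return False if the queue is full."""
        try:
            self._queue.put_nowait(record)
            return True
        except queue.Full:
            return False

    def _run(self) -> None:
        while True:
            record = self._queue.get()
            if record is _STOP:
                break
            try:
                self._send(record)
            except Exception as exc:  # never let a failed push kill the worker
                logger.error("Failed to push record: %s", exc)

    def close(self) -> None:
        """Process everything already queued, then stop the worker."""
        self._queue.put(_STOP)
        self._thread.join()


sent = []
pusher = BackgroundPusher(sent.append)
accepted = pusher.submit({"path": "/api/products/", "status_code": 200})
pusher.close()
```

The trade-off in this design is that records are dropped when the queue is full, and anything still queued is lost if the process exits without calling `close()`.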

5. Demo API Application

# api/models.py
from django.db import models

class Product(models.Model):
    name = models.CharField(max_length=100)
    price = models.DecimalField(max_digits=10, decimal_places=2)
    description = models.TextField(blank=True)
    created_at = models.DateTimeField(auto_now_add=True)
    
    def __str__(self):
        return self.name

class Order(models.Model):
    product = models.ForeignKey(Product, on_delete=models.CASCADE)
    quantity = models.IntegerField()
    total_price = models.DecimalField(max_digits=10, decimal_places=2)
    created_at = models.DateTimeField(auto_now_add=True)

# api/serializers.py
from rest_framework import serializers
from .models import Product, Order

class ProductSerializer(serializers.ModelSerializer):
    class Meta:
        model = Product
        fields = '__all__'

class OrderSerializer(serializers.ModelSerializer):
    class Meta:
        model = Order
        fields = '__all__'

# api/views.py
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from django.db.models import Count, Sum
from .models import Product, Order
from .serializers import ProductSerializer, OrderSerializer
import time
import random

class ProductViewSet(viewsets.ModelViewSet):
    queryset = Product.objects.all()
    serializer_class = ProductSerializer
    
    @action(detail=False, methods=['get'])
    def featured(self, request):
        """Return featured products (simulates a slow query)."""
        # Simulate the latency of a complex query
        time.sleep(random.uniform(0.1, 0.5))
        
        featured_products = Product.objects.filter(
            price__gte=1000
        ).annotate(
            order_count=Count('order')
        ).order_by('-order_count')[:5]
        
        serializer = self.get_serializer(featured_products, many=True)
        return Response(serializer.data)
    
    @action(detail=True, methods=['post'])
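    def purchase(self, request, pk=None):
        """Purchase a product."""
        product = self.get_object()
        # request.data values may arrive as strings (e.g. form data), so coerce to int
        try:
            quantity = int(request.data.get('quantity', 1))
        except (TypeError, ValueError):
            return Response(
                {'error': 'Quantity must be an integer'},
                status=status.HTTP_400_BAD_REQUEST
            )
        
        if quantity <= 0:
            return Response(
                {'error': 'Quantity must be greater than 0'},
                status=status.HTTP_400_BAD_REQUEST
            )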
        
        # Simulate a stock check
        if random.random() < 0.1:  # 10% chance of being out of stock
            return Response(
                {'error': 'Out of stock'},
                status=status.HTTP_409_CONFLICT
            )
        
        total_price = product.price * quantity
        order = Order.objects.create(
            product=product,
            quantity=quantity,
            total_price=total_price
        )
        
        return Response({
            'order_id': order.id,
            'product': product.name,
            'quantity': quantity,
            'total_price': float(total_price)
        })

class OrderViewSet(viewsets.ReadOnlyModelViewSet):
    queryset = Order.objects.all()
    serializer_class = OrderSerializer
    
    @action(detail=False, methods=['get'])
    def stats(self, request):
        """Return order statistics."""
        stats = Order.objects.aggregate(
            total_orders=Count('id'),
            total_revenue=Sum('total_price')
        )
        return Response(stats)

6. URL Configuration

# api/urls.py
from django.urls import path, include
from rest_framework.routers import DefaultRouter
from .views import ProductViewSet, OrderViewSet

router = DefaultRouter()
router.register(r'products', ProductViewSet)
router.register(r'orders', OrderViewSet)

urlpatterns = [
    path('', include(router.urls)),
]

# Project-level urls.py
from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('api/', include('api.urls')),
]

7. Environment File

# .env
DEBUG=True
SECRET_KEY=your-secret-key-here
LOKI_URL=http://localhost:3100
LOKI_ENABLED=True
SERVICE_NAME=django-api-demo
ENVIRONMENT=development
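Values in `.env` are plain strings, and in Python `bool("False")` is `True`, which is why `settings.py` passes `cast=bool` to `config` instead of calling `bool()` directly. The sketch below shows the general idea with the standard library only; `env_bool` and the accepted spellings are assumptions for illustration and may differ from what python-decouple accepts.

```python
import os
from typing import Mapping, Optional

_TRUE_VALUES = {"1", "true", "yes", "on"}
_FALSE_VALUES = {"0", "false", "no", "off", ""}


def env_bool(name: str, default: bool,
             environ: Optional[Mapping[str, str]] = None) -> bool:
    """Read a boolean setting from environment-style strings (hypothetical helper)."""
    environ = os.environ if environ is None else environ
    raw = environ.get(name)
    if raw is None:
        return default  # variable not set at all
    value = raw.strip().lower()
    if value in _TRUE_VALUES:
        return True
    if value in _FALSE_VALUES:
        return False
    raise ValueError(f"{name} is not a valid boolean: {raw!r}")


# A dict stands in for the parsed .env file
fake_env = {"LOKI_ENABLED": "False", "DEBUG": "True"}
loki_enabled = env_bool("LOKI_ENABLED", default=True, environ=fake_env)
debug = env_bool("DEBUG", default=False, environ=fake_env)
```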

8. requirements.txt

Django==4.2.7
djangorestframework==3.14.0
requests==2.31.0
python-decouple==3.8
psutil==5.9.6

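Tomorrow we will look at how to implement similar monitoring in FastAPI and compare the implementation differences and performance characteristics of the two frameworks.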


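Previous: Day16 - Introduction to and Deployment of Grafana & Loki
Next: Day18 - Implementing API Monitoring with FastAPI Middleware
Series: The Challenges After Vibe Coding: Load Testing and Monitoring with Locust x Loki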