Today we will build a complete monitoring and logging system! We'll learn how to track the AI assistant's performance in real time, diagnose problems, and continuously improve how the system runs.
System monitoring is essential in production:
Monitoring system architecture
├── Application-level monitoring
│   ├── API request tracing
│   ├── Response time
│   ├── Error rate
│   └── Business metrics
├── System-level monitoring
│   ├── CPU usage
│   ├── Memory usage
│   ├── Disk I/O
│   └── Network traffic
└── Log management
    ├── Structured logging
    ├── Log aggregation
    ├── Log analysis
    └── Alerting
from prometheus_client import Counter, Histogram, Gauge
from fastapi import Request
import time
import psutil
import logging

logger = logging.getLogger(__name__)
# ═══════════════════════════════════════════════════════════
# Prometheus metric definitions
# ═══════════════════════════════════════════════════════════

# HTTP request counter
http_requests_total = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)

# Request duration histogram
http_request_duration_seconds = Histogram(
    'http_request_duration_seconds',
    'HTTP request latency in seconds',
    ['method', 'endpoint'],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0]
)

# AI model call counter
ai_model_calls_total = Counter(
    'ai_model_calls_total',
    'Total AI model API calls',
    ['model', 'status']
)

# AI model response time
ai_model_duration_seconds = Histogram(
    'ai_model_duration_seconds',
    'AI model response time in seconds',
    ['model'],
    buckets=[0.5, 1.0, 2.0, 5.0, 10.0, 30.0]
)

# Token usage
ai_tokens_used = Counter(
    'ai_tokens_used_total',
    'Total tokens consumed',
    ['model', 'type']  # type: prompt, completion
)

# System resource usage
system_cpu_usage = Gauge('system_cpu_usage_percent', 'CPU usage percentage')
system_memory_usage = Gauge('system_memory_usage_percent', 'Memory usage percentage')
system_disk_usage = Gauge('system_disk_usage_percent', 'Disk usage percentage')

# Number of active sessions
active_sessions = Gauge('active_sessions', 'Number of active user sessions')

# Error counter
errors_total = Counter(
    'errors_total',
    'Total errors',
    ['error_type', 'endpoint']
)
# ═══════════════════════════════════════════════════════════
# Monitoring middleware
# ═══════════════════════════════════════════════════════════
class MonitoringMiddleware:
    """Monitoring middleware (pure ASGI, so it also observes raised exceptions)."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        request = Request(scope, receive)
        start_time = time.time()

        # Wrap send to capture the response status code
        status_code = 200

        async def send_wrapper(message):
            nonlocal status_code
            if message["type"] == "http.response.start":
                status_code = message["status"]
            await send(message)

        try:
            await self.app(scope, receive, send_wrapper)
        except Exception as e:
            status_code = 500
            logger.error(f"Request failed: {e}")
            errors_total.labels(
                error_type=type(e).__name__,
                endpoint=request.url.path
            ).inc()
            raise
        finally:
            # Record request metrics
            duration = time.time() - start_time
            http_requests_total.labels(
                method=request.method,
                endpoint=request.url.path,
                status=status_code
            ).inc()
            http_request_duration_seconds.labels(
                method=request.method,
                endpoint=request.url.path
            ).observe(duration)
# ═══════════════════════════════════════════════════════════
# System resource monitoring
# ═══════════════════════════════════════════════════════════
class SystemMonitor:
    """System resource monitor"""

    @staticmethod
    def update_system_metrics():
        """Refresh the system gauges."""
        try:
            # CPU usage (blocks for one second while sampling)
            cpu_percent = psutil.cpu_percent(interval=1)
            system_cpu_usage.set(cpu_percent)

            # Memory usage
            memory = psutil.virtual_memory()
            system_memory_usage.set(memory.percent)

            # Disk usage
            disk = psutil.disk_usage('/')
            system_disk_usage.set(disk.percent)
        except Exception as e:
            logger.error(f"Failed to update system metrics: {e}")

    @staticmethod
    def get_system_info():
        """Return a snapshot of system information."""
        try:
            memory = psutil.virtual_memory()
            disk = psutil.disk_usage('/')
            return {
                'cpu_percent': psutil.cpu_percent(interval=1),
                'memory': {
                    'total': memory.total,
                    'available': memory.available,
                    'percent': memory.percent
                },
                'disk': {
                    'total': disk.total,
                    'used': disk.used,
                    'percent': disk.percent
                },
                'network': psutil.net_io_counters()._asdict()
            }
        except Exception as e:
            logger.error(f"Failed to get system info: {e}")
            return {}
# ═══════════════════════════════════════════════════════════
# AI model monitoring
# ═══════════════════════════════════════════════════════════
class AIModelMonitor:
    """AI model monitor"""

    @staticmethod
    def track_model_call(model_name: str, duration: float,
                         status: str, tokens: dict = None):
        """Track a model call."""
        # Record the call
        ai_model_calls_total.labels(
            model=model_name,
            status=status
        ).inc()

        # Record the response time
        ai_model_duration_seconds.labels(
            model=model_name
        ).observe(duration)

        # Record token usage
        if tokens:
            if 'prompt' in tokens:
                ai_tokens_used.labels(
                    model=model_name,
                    type='prompt'
                ).inc(tokens['prompt'])
            if 'completion' in tokens:
                ai_tokens_used.labels(
                    model=model_name,
                    type='completion'
                ).inc(tokens['completion'])

    @staticmethod
    def get_model_stats():
        """Return model statistics.

        Note: `_metrics` is a private prometheus_client attribute, used here
        only for a quick in-process snapshot; query Prometheus itself for
        real reporting.
        """
        return {
            'total_calls': ai_model_calls_total._metrics,
            'avg_duration': ai_model_duration_seconds._metrics,
            'total_tokens': ai_tokens_used._metrics
        }
import logging
import json
from datetime import datetime
from typing import Any, Dict
import sys
from pathlib import Path

class StructuredLogger:
    """Structured logger"""

    def __init__(self, name: str, log_file: str = None):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)

        # Avoid adding duplicate handlers
        if not self.logger.handlers:
            # Console handler
            console_handler = logging.StreamHandler(sys.stdout)
            console_handler.setFormatter(StructuredFormatter())
            self.logger.addHandler(console_handler)

            # File handler (if a path was given)
            if log_file:
                Path(log_file).parent.mkdir(parents=True, exist_ok=True)
                file_handler = logging.FileHandler(log_file)
                file_handler.setFormatter(StructuredFormatter())
                self.logger.addHandler(file_handler)

    def info(self, message: str, **kwargs):
        """Log at INFO level."""
        self._log('INFO', message, kwargs)

    def warning(self, message: str, **kwargs):
        """Log at WARNING level."""
        self._log('WARNING', message, kwargs)

    def error(self, message: str, **kwargs):
        """Log at ERROR level."""
        self._log('ERROR', message, kwargs)

    def _log(self, level: str, message: str, context: Dict[str, Any]):
        """Internal logging helper."""
        log_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'level': level,
            'message': message,
            **context
        }
        getattr(self.logger, level.lower())(
            json.dumps(log_entry, ensure_ascii=False)
        )
class StructuredFormatter(logging.Formatter):
    """Structured log formatter"""

    def format(self, record: logging.LogRecord) -> str:
        """Format a log record as a single JSON line."""
        try:
            # If the message is already JSON, pass it through
            log_data = json.loads(record.getMessage())
            return json.dumps(log_data, ensure_ascii=False)
        except json.JSONDecodeError:
            # Otherwise build a structured entry from the record
            log_data = {
                'timestamp': datetime.utcnow().isoformat(),
                'level': record.levelname,
                'message': record.getMessage(),
                'module': record.module,
                'function': record.funcName,
                'line': record.lineno
            }
            return json.dumps(log_data, ensure_ascii=False)
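To see why one-JSON-object-per-line matters, here is a stripped-down, self-contained formatter in the same spirit (fields reduced for brevity): every record becomes a single parseable line, which log shippers such as Promtail can split into fields without any custom parsing.

```python
import json
import logging

# Minimal JSON-line formatter: each record becomes one JSON object per line.
class JsonLineFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            'level': record.levelname,
            'message': record.getMessage(),
            'module': record.module,
        }, ensure_ascii=False)

# Build a record by hand to demonstrate the output
record = logging.LogRecord(
    name='demo', level=logging.ERROR, pathname='demo.py', lineno=1,
    msg='DB timeout after %ss', args=(5,), exc_info=None,
)
line = JsonLineFormatter().format(record)
print(line)

# The line round-trips cleanly back into structured data
parsed = json.loads(line)
print(parsed['level'], '-', parsed['message'])
```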
# ═══════════════════════════════════════════════════════════
# Log analyzer
# ═══════════════════════════════════════════════════════════
class LogAnalyzer:
    """Log analyzer"""

    def __init__(self, log_file: str):
        self.log_file = log_file

    def analyze_errors(self, time_range: int = 3600):
        """Analyze error entries from the last `time_range` seconds."""
        errors = []
        cutoff_time = datetime.utcnow().timestamp() - time_range
        try:
            with open(self.log_file, 'r') as f:
                for line in f:
                    try:
                        log_entry = json.loads(line)
                        if log_entry.get('level') == 'ERROR':
                            timestamp = datetime.fromisoformat(
                                log_entry['timestamp']
                            ).timestamp()
                            if timestamp > cutoff_time:
                                errors.append(log_entry)
                    except (json.JSONDecodeError, KeyError, ValueError):
                        continue  # skip malformed lines
        except FileNotFoundError:
            pass

        return {
            'total_errors': len(errors),
            'error_types': self._count_error_types(errors),
            'recent_errors': errors[-10:]  # last 10 errors
        }

    def _count_error_types(self, errors: list) -> dict:
        """Count errors by type."""
        error_types = {}
        for error in errors:
            error_type = error.get('error_type', 'Unknown')
            error_types[error_type] = error_types.get(error_type, 0) + 1
        return error_types

    def get_request_stats(self, time_range: int = 3600):
        """Summarize request entries from the last `time_range` seconds."""
        requests = []
        cutoff_time = datetime.utcnow().timestamp() - time_range
        try:
            with open(self.log_file, 'r') as f:
                for line in f:
                    try:
                        log_entry = json.loads(line)
                        if 'request' in log_entry.get('message', '').lower():
                            timestamp = datetime.fromisoformat(
                                log_entry['timestamp']
                            ).timestamp()
                            if timestamp > cutoff_time:
                                requests.append(log_entry)
                    except (json.JSONDecodeError, KeyError, ValueError):
                        continue  # skip malformed lines
        except FileNotFoundError:
            pass

        return {
            'total_requests': len(requests),
            'avg_duration': self._calculate_avg_duration(requests),
            'endpoints': self._count_endpoints(requests)
        }

    def _calculate_avg_duration(self, requests: list) -> float:
        """Compute the average request duration."""
        durations = [
            r.get('duration', 0) for r in requests
            if 'duration' in r
        ]
        return sum(durations) / len(durations) if durations else 0

    def _count_endpoints(self, requests: list) -> dict:
        """Count calls per endpoint."""
        endpoints = {}
        for req in requests:
            endpoint = req.get('endpoint', 'unknown')
            endpoints[endpoint] = endpoints.get(endpoint, 0) + 1
        return endpoints
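The core of the error analysis is just "parse each line, skip the unparseable ones, tally by `error_type`". A standalone sketch of that step, runnable against a few sample lines (the sample entries are made up for illustration):

```python
import json

# Count ERROR entries per error_type from newline-delimited JSON log lines,
# skipping lines that are not valid JSON, as LogAnalyzer does.
def count_error_types(lines):
    counts = {}
    for line in lines:
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue
        if entry.get('level') == 'ERROR':
            error_type = entry.get('error_type', 'Unknown')
            counts[error_type] = counts.get(error_type, 0) + 1
    return counts

sample_log = [
    '{"level": "ERROR", "error_type": "TimeoutError"}',
    '{"level": "INFO", "message": "ok"}',
    '{"level": "ERROR", "error_type": "TimeoutError"}',
    '{"level": "ERROR"}',
    'not json at all',
]
print(count_error_types(sample_log))
# {'TimeoutError': 2, 'Unknown': 1}
```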
from fastapi import FastAPI, Request, Response
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
import google.generativeai as genai
from app.monitoring import (
    MonitoringMiddleware, SystemMonitor, AIModelMonitor,
    active_sessions, errors_total
)
from app.logging_config import StructuredLogger, LogAnalyzer
import time
from datetime import datetime

# Initialize logging
logger = StructuredLogger(__name__, log_file='logs/app.log')

# Initialize FastAPI
app = FastAPI(title="AI Assistant API")

# Add the monitoring middleware
app.add_middleware(MonitoringMiddleware)
# ═══════════════════════════════════════════════════════════
# Monitoring endpoints
# ═══════════════════════════════════════════════════════════
@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint."""
    # Refresh system gauges before each scrape
    SystemMonitor.update_system_metrics()
    return Response(
        content=generate_latest(),
        media_type=CONTENT_TYPE_LATEST
    )

@app.get("/health/detailed")
async def detailed_health():
    """Detailed health check."""
    system_info = SystemMonitor.get_system_info()

    health_status = {
        'status': 'healthy',
        'timestamp': datetime.utcnow().isoformat(),
        'system': system_info,
        'uptime': time.time() - app.state.start_time,
        # `_value` is a private prometheus_client attribute; acceptable for a demo
        'active_sessions': active_sessions._value.get()
    }

    logger.info("Health check performed", **health_status)
    return health_status

@app.get("/stats")
async def statistics():
    """Application statistics."""
    log_analyzer = LogAnalyzer('logs/app.log')

    stats = {
        'timestamp': datetime.utcnow().isoformat(),
        'errors': log_analyzer.analyze_errors(time_range=3600),
        'requests': log_analyzer.get_request_stats(time_range=3600),
        'system': SystemMonitor.get_system_info(),
        'model': AIModelMonitor.get_model_stats()
    }

    return stats
# ═══════════════════════════════════════════════════════════
# Wrapping AI calls for monitoring
# ═══════════════════════════════════════════════════════════
async def monitored_ai_call(prompt: str, model_name: str = "gemini-2.0-flash-exp"):
    """AI call with monitoring."""
    start_time = time.time()
    status = 'success'
    tokens = None

    try:
        model = genai.GenerativeModel(model_name)
        response = model.generate_content(prompt)

        # Rough token estimate (simplified word-count heuristic)
        tokens = {
            'prompt': len(prompt.split()) * 1.3,
            'completion': len(response.text.split()) * 1.3
        }

        logger.info(
            "AI model call successful",
            model=model_name,
            prompt_length=len(prompt),
            response_length=len(response.text),
            tokens=tokens
        )

        return response.text

    except Exception as e:
        status = 'error'
        logger.error(
            "AI model call failed",
            model=model_name,
            error=str(e),
            error_type=type(e).__name__
        )
        errors_total.labels(
            error_type=type(e).__name__,
            endpoint='ai_model'
        ).inc()
        raise

    finally:
        duration = time.time() - start_time
        AIModelMonitor.track_model_call(
            model_name=model_name,
            duration=duration,
            status=status,
            tokens=tokens if status == 'success' else None
        )
@app.on_event("startup")
async def startup_event():
    """Application startup event."""
    app.state.start_time = time.time()
    logger.info("Application started")

@app.on_event("shutdown")
async def shutdown_event():
    """Application shutdown event."""
    uptime = time.time() - app.state.start_time
    logger.info("Application shutdown", uptime=uptime)
{
  "dashboard": {
    "title": "AI Assistant Monitoring",
    "panels": [
      {
        "title": "Request Rate",
        "targets": [
          { "expr": "rate(http_requests_total[5m])" }
        ],
        "type": "graph"
      },
      {
        "title": "Response Time (95th percentile)",
        "targets": [
          { "expr": "histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))" }
        ],
        "type": "graph"
      },
      {
        "title": "Error Rate",
        "targets": [
          { "expr": "rate(errors_total[5m])" }
        ],
        "type": "graph"
      },
      {
        "title": "AI Model Response Time",
        "targets": [
          { "expr": "rate(ai_model_duration_seconds_sum[5m]) / rate(ai_model_duration_seconds_count[5m])" }
        ],
        "type": "graph"
      },
      {
        "title": "System CPU Usage",
        "targets": [
          { "expr": "system_cpu_usage_percent" }
        ],
        "type": "gauge"
      },
      {
        "title": "System Memory Usage",
        "targets": [
          { "expr": "system_memory_usage_percent" }
        ],
        "type": "gauge"
      }
    ]
  }
}
version: '3.8'

services:
  # Main application
  ai-assistant:
    build: .
    ports:
      - "8000:8000"
    volumes:
      - ./logs:/app/logs
    environment:
      - GEMINI_API_KEY=${GEMINI_API_KEY}

  # Prometheus
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus-data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
    depends_on:
      - ai-assistant

  # Grafana
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    volumes:
      - grafana-data:/var/lib/grafana
      - ./grafana/dashboard.json:/etc/grafana/provisioning/dashboards/dashboard.json
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
      - GF_USERS_ALLOW_SIGN_UP=false
    depends_on:
      - prometheus

  # Loki (log aggregation)
  loki:
    image: grafana/loki:latest
    ports:
      - "3100:3100"
    volumes:
      - loki-data:/loki
    command: -config.file=/etc/loki/local-config.yaml

  # Promtail (log collection)
  promtail:
    image: grafana/promtail:latest
    volumes:
      - ./logs:/var/log
      - ./promtail/promtail-config.yml:/etc/promtail/config.yml
    command: -config.file=/etc/promtail/config.yml
    depends_on:
      - loki

volumes:
  prometheus-data:
  grafana-data:
  loki-data:
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'ai-assistant'
    static_configs:
      - targets: ['ai-assistant:8000']
    metrics_path: '/metrics'
    scrape_interval: 10s
| Metric name | Type | Description |
|---|---|---|
| http_requests_total | Counter | Total HTTP requests |
| http_request_duration_seconds | Histogram | Request latency |
| ai_model_calls_total | Counter | AI model calls |
| ai_model_duration_seconds | Histogram | AI model response time |
| ai_tokens_used_total | Counter | Tokens consumed |
| errors_total | Counter | Total errors |
| active_sessions | Gauge | Active sessions |

| Metric name | Type | Description |
|---|---|---|
| system_cpu_usage_percent | Gauge | CPU usage |
| system_memory_usage_percent | Gauge | Memory usage |
| system_disk_usage_percent | Gauge | Disk usage |
groups:
  - name: ai_assistant_alerts
    rules:
      # High error rate
      - alert: HighErrorRate
        expr: rate(errors_total[5m]) > 0.05
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value }} errors/sec"

      # Slow responses
      - alert: SlowResponse
        expr: histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m])) > 5
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Slow API responses"
          description: "95th percentile response time is {{ $value }}s"

      # High CPU usage
      - alert: HighCPUUsage
        expr: system_cpu_usage_percent > 80
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High CPU usage"
          description: "CPU usage is {{ $value }}%"

      # High memory usage
      - alert: HighMemoryUsage
        expr: system_memory_usage_percent > 85
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High memory usage"
          description: "Memory usage is {{ $value }}%"
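The `histogram_quantile` function used by the SlowResponse rule estimates a quantile from cumulative bucket counts by linear interpolation inside the bucket that contains the target rank. A rough pure-Python sketch of that idea (Prometheus's real implementation handles edge cases this ignores, and the sample counts below are invented):

```python
def estimate_quantile(q, buckets):
    """Estimate quantile q from (upper_bound, cumulative_count) pairs,
    interpolating linearly inside the bucket holding the target rank."""
    total = buckets[-1][1]
    rank = q * total
    prev_bound, prev_count = 0.0, 0
    for bound, count in buckets:
        if count >= rank:
            # Interpolate between this bucket's lower and upper bound
            fraction = (rank - prev_count) / (count - prev_count)
            return prev_bound + (bound - prev_bound) * fraction
        prev_bound, prev_count = bound, count
    return buckets[-1][0]

# Invented cumulative counts for the buckets [0.1, 0.5, 1.0, 2.0, 5.0, 10.0]:
# 90 of 100 requests finished within 1s, 96 within 2s, so p95 lands in the
# 1.0-2.0 bucket and interpolates to roughly 1.83s.
buckets = [(0.1, 50), (0.5, 80), (1.0, 90), (2.0, 96), (5.0, 99), (10.0, 100)]
print(estimate_quantile(0.95, buckets))
```

This is also why bucket boundaries matter: the estimate can never be more precise than the bucket the quantile falls into.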
# Start the monitoring stack
docker-compose -f docker-compose.monitoring.yml up -d

# View Prometheus metrics
curl http://localhost:8000/metrics

# View application statistics
curl http://localhost:8000/stats | jq

# Open the Grafana dashboard
open http://localhost:3000
# Default login: admin/admin

# Open the Prometheus UI
open http://localhost:9090

# Tail the logs
tail -f logs/app.log | jq

# Analyze error logs
grep ERROR logs/app.log | jq

# Watch system resources
watch -n 1 'curl -s http://localhost:8000/health/detailed | jq'
Optimization strategies based on monitoring data:

# If the 95th percentile latency > 2 seconds
- Implement caching
- Optimize database queries
- Use asynchronous processing
- Consider a CDN

# If the error rate > 1%
- Strengthen input validation
- Improve error handling
- Add a retry mechanism
- Check third-party services

# If CPU/memory usage stays above 80%
- Scale horizontally (add instances)
- Scale vertically (upgrade resources)
- Optimize the code
- Use caching to reduce load
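For the retry mechanism mentioned above, a minimal exponential-backoff sketch (the delays, attempt count, and the blanket `Exception` catch are placeholder choices; in practice you would retry only on transient errors):

```python
import time

def call_with_retry(func, max_attempts=3, base_delay=0.5):
    """Call func, retrying on failure with exponentially growing delays."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the error
            time.sleep(base_delay * (2 ** (attempt - 1)))

# Demo: a flaky function that fails twice and then succeeds
attempts = {'n': 0}
def flaky():
    attempts['n'] += 1
    if attempts['n'] < 3:
        raise ConnectionError("transient failure")
    return "ok"

print(call_with_retry(flaky, max_attempts=5, base_delay=0.01))  # ok
print(attempts['n'])  # 3
```

Retries like this pair naturally with the `errors_total` counter: a spike in retried errors shows up in monitoring even when the end user never sees a failure.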
Today we built a complete monitoring and logging system:
✅ Prometheus metrics: comprehensive performance monitoring
✅ Structured logging: a log format that is easy to analyze
✅ Grafana dashboards: a visual monitoring interface
✅ Alerting: catch problems proactively
✅ Log analysis: deeper insight into system behavior
Tomorrow we will look at security considerations and access control, to keep the system safe and reliable!
Your AI assistant now has a full "health check" system! 📊