今天我們要為 AI 助理建立完整的安全防護!學習如何保護系統免受攻擊、管理使用者權限、以及確保資料安全,讓你的應用可以安心上線。
AI 助理處理敏感資訊,安全性不容忽視:
安全防護架構
├── 網路層安全
│ ├── HTTPS/TLS
│ ├── DDoS 防護
│ └── 防火牆規則
├── 應用層安全
│ ├── 身份驗證 (Authentication)
│ ├── 授權管理 (Authorization)
│ ├── API 金鑰管理
│ └── 速率限制
├── 資料層安全
│ ├── 資料加密
│ ├── 敏感資料遮罩
│ └── 安全儲存
└── 監控與審計
├── 安全日誌
├── 異常偵測
└── 合規報告
from fastapi import HTTPException, Security, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials, APIKeyHeader
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
import jwt
import secrets
import hashlib
from passlib.context import CryptContext
import redis
from pydantic import BaseModel, EmailStr
# 密碼加密上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# 安全配置
SECRET_KEY = os.getenv("SECRET_KEY", secrets.token_urlsafe(32))
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7
# 安全 Headers
security_bearer = HTTPBearer()
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
# Redis 客戶端(用於 token 黑名單和速率限制)
redis_client = redis.Redis(
host=os.getenv("REDIS_HOST", "localhost"),
port=int(os.getenv("REDIS_PORT", 6379)),
decode_responses=True
)
# ═══════════════════════════════════════════════════════════
# 資料模型
# ═══════════════════════════════════════════════════════════
class User(BaseModel):
"""使用者模型"""
id: str
email: EmailStr
username: str
role: str = "user" # user, admin, service
is_active: bool = True
created_at: datetime
last_login: Optional[datetime] = None
class TokenData(BaseModel):
"""Token 資料"""
user_id: str
email: str
role: str
exp: datetime
class LoginRequest(BaseModel):
"""登入請求"""
email: EmailStr
password: str
class TokenResponse(BaseModel):
"""Token 回應"""
access_token: str
refresh_token: str
token_type: str = "bearer"
expires_in: int
# ═══════════════════════════════════════════════════════════
# 密碼處理
# ═══════════════════════════════════════════════════════════
class PasswordManager:
"""密碼管理器"""
@staticmethod
def hash_password(password: str) -> str:
"""雜湊密碼"""
return pwd_context.hash(password)
@staticmethod
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""驗證密碼"""
return pwd_context.verify(plain_password, hashed_password)
@staticmethod
def validate_password_strength(password: str) -> Dict[str, Any]:
"""驗證密碼強度"""
issues = []
if len(password) < 8:
issues.append("密碼長度至少 8 個字元")
if not any(c.isupper() for c in password):
issues.append("至少包含一個大寫字母")
if not any(c.islower() for c in password):
issues.append("至少包含一個小寫字母")
if not any(c.isdigit() for c in password):
issues.append("至少包含一個數字")
if not any(c in "!@#$%^&*()_+-=[]{}|;:,.<>?" for c in password):
issues.append("至少包含一個特殊字元")
return {
"is_valid": len(issues) == 0,
"issues": issues
}
# ═══════════════════════════════════════════════════════════
# JWT Token 管理
# ═══════════════════════════════════════════════════════════
class TokenManager:
"""Token 管理器"""
@staticmethod
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""創建存取 Token"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire, "type": "access"})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
@staticmethod
def create_refresh_token(data: dict) -> str:
"""創建刷新 Token"""
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
to_encode.update({"exp": expire, "type": "refresh"})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
@staticmethod
def verify_token(token: str, token_type: str = "access") -> TokenData:
"""驗證 Token"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
# 檢查 token 類型
if payload.get("type") != token_type:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token type"
)
# 檢查是否在黑名單中
if TokenManager.is_token_blacklisted(token):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token has been revoked"
)
return TokenData(
user_id=payload.get("sub"),
email=payload.get("email"),
role=payload.get("role"),
exp=datetime.fromtimestamp(payload.get("exp"))
)
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token has expired"
)
except jwt.JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials"
)
@staticmethod
def blacklist_token(token: str, expires_in: int = 3600):
"""將 Token 加入黑名單"""
token_hash = hashlib.sha256(token.encode()).hexdigest()
redis_client.setex(f"blacklist:{token_hash}", expires_in, "1")
@staticmethod
def is_token_blacklisted(token: str) -> bool:
"""檢查 Token 是否在黑名單"""
token_hash = hashlib.sha256(token.encode()).hexdigest()
return redis_client.exists(f"blacklist:{token_hash}") > 0
# ═══════════════════════════════════════════════════════════
# 認證依賴
# ═══════════════════════════════════════════════════════════
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Security(security_bearer)
) -> User:
"""獲取當前使用者"""
token = credentials.credentials
token_data = TokenManager.verify_token(token)
# 從資料庫獲取使用者(這裡簡化)
user = User(
id=token_data.user_id,
email=token_data.email,
username=token_data.email.split('@')[0],
role=token_data.role,
is_active=True,
created_at=datetime.utcnow()
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Inactive user"
)
return user
async def get_current_active_admin(
current_user: User = Depends(get_current_user)
) -> User:
"""獲取當前管理員使用者"""
if current_user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Admin access required"
)
return current_user
# ═══════════════════════════════════════════════════════════
# API 金鑰驗證
# ═══════════════════════════════════════════════════════════
class APIKeyManager:
"""API 金鑰管理器"""
@staticmethod
def generate_api_key() -> str:
"""生成 API 金鑰"""
return f"sk_{secrets.token_urlsafe(32)}"
@staticmethod
def validate_api_key(api_key: str) -> bool:
"""驗證 API 金鑰"""
# 從資料庫或 Redis 驗證
# 這裡簡化為檢查格式
return api_key.startswith("sk_") and len(api_key) > 35
async def verify_api_key(
api_key: Optional[str] = Security(api_key_header)
) -> str:
"""驗證 API 金鑰"""
if not api_key:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="API key required"
)
if not APIKeyManager.validate_api_key(api_key):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid API key"
)
return api_key
from fastapi import HTTPException, Request, status
from typing import Optional
import time
import redis
from functools import wraps
class RateLimiter:
"""速率限制器"""
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
def check_rate_limit(
self,
key: str,
max_requests: int,
window_seconds: int
) -> bool:
"""檢查速率限制"""
current_time = int(time.time())
window_key = f"rate_limit:{key}:{current_time // window_seconds}"
# 獲取當前計數
current_count = self.redis.get(window_key)
if current_count and int(current_count) >= max_requests:
return False
# 增加計數
pipe = self.redis.pipeline()
pipe.incr(window_key)
pipe.expire(window_key, window_seconds * 2) # 設定過期時間
pipe.execute()
return True
def get_rate_limit_info(
self,
key: str,
max_requests: int,
window_seconds: int
) -> dict:
"""獲取速率限制資訊"""
current_time = int(time.time())
window_key = f"rate_limit:{key}:{current_time // window_seconds}"
current_count = self.redis.get(window_key)
remaining = max_requests - int(current_count or 0)
return {
"limit": max_requests,
"remaining": max(0, remaining),
"reset": (current_time // window_seconds + 1) * window_seconds
}
# 初始化速率限制器
rate_limiter = RateLimiter(redis_client)
# 速率限制裝飾器
def rate_limit(max_requests: int = 100, window_seconds: int = 60):
"""速率限制裝飾器"""
def decorator(func):
@wraps(func)
async def wrapper(request: Request, *args, **kwargs):
# 使用 IP 地址作為鍵
client_ip = request.client.host
# 如果有使用者資訊,使用使用者 ID
if hasattr(request.state, "user"):
rate_key = f"user:{request.state.user.id}"
else:
rate_key = f"ip:{client_ip}"
# 檢查速率限制
if not rate_limiter.check_rate_limit(
rate_key,
max_requests,
window_seconds
):
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="Rate limit exceeded"
)
# 添加速率限制資訊到回應 header
rate_info = rate_limiter.get_rate_limit_info(
rate_key,
max_requests,
window_seconds
)
response = await func(request, *args, **kwargs)
if hasattr(response, "headers"):
response.headers["X-RateLimit-Limit"] = str(rate_info["limit"])
response.headers["X-RateLimit-Remaining"] = str(rate_info["remaining"])
response.headers["X-RateLimit-Reset"] = str(rate_info["reset"])
return response
return wrapper
return decorator
import re
from typing import Any, Dict
import bleach
from html import escape
class InputValidator:
"""輸入驗證器"""
@staticmethod
def sanitize_html(text: str) -> str:
"""清理 HTML"""
# 只允許安全的標籤
allowed_tags = ['p', 'br', 'strong', 'em', 'u']
return bleach.clean(text, tags=allowed_tags, strip=True)
@staticmethod
def validate_email(email: str) -> bool:
"""驗證電子郵件格式"""
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return re.match(pattern, email) is not None
@staticmethod
def prevent_sql_injection(text: str) -> str:
"""防止 SQL 注入"""
dangerous_patterns = [
r"(\bOR\b|\bAND\b).+?=.+?",
r"(DROP|DELETE|INSERT|UPDATE)\s+",
r"--",
r"/\*.*?\*/"
]
for pattern in dangerous_patterns:
if re.search(pattern, text, re.IGNORECASE):
raise ValueError("Potentially dangerous input detected")
return text
@staticmethod
def validate_prompt_length(prompt: str, max_length: int = 5000) -> bool:
"""驗證提示詞長度"""
if len(prompt) > max_length:
raise ValueError(f"Prompt exceeds maximum length of {max_length}")
return True
@staticmethod
def mask_sensitive_data(data: Dict[str, Any]) -> Dict[str, Any]:
"""遮罩敏感資料"""
masked_data = data.copy()
sensitive_fields = ['password', 'api_key', 'secret', 'token']
for key in masked_data:
if any(field in key.lower() for field in sensitive_fields):
if isinstance(masked_data[key], str) and len(masked_data[key]) > 4:
masked_data[key] = masked_data[key][:4] + "*" * (len(masked_data[key]) - 4)
return masked_data
class ContentFilter:
"""內容過濾器"""
# 敏感詞庫(示例)
BLOCKED_WORDS = [
# 添加需要過濾的敏感詞
]
@staticmethod
def check_content(text: str) -> Dict[str, Any]:
"""檢查內容是否包含敏感詞"""
found_words = []
text_lower = text.lower()
for word in ContentFilter.BLOCKED_WORDS:
if word.lower() in text_lower:
found_words.append(word)
return {
"is_safe": len(found_words) == 0,
"blocked_words": found_words
}
@staticmethod
def filter_ai_response(response: str) -> str:
"""過濾 AI 回應"""
# 移除可能的敏感資訊
filtered = response
# 遮罩電子郵件
filtered = re.sub(
r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
'[EMAIL]',
filtered
)
# 遮罩電話號碼
filtered = re.sub(
r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
'[PHONE]',
filtered
)
return filtered
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
import time
import hashlib
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
"""安全 Headers 中介軟體"""
async def dispatch(self, request: Request, call_next):
response = await call_next(request)
# 添加安全 Headers
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
response.headers["Content-Security-Policy"] = "default-src 'self'"
return response
class RequestValidationMiddleware(BaseHTTPMiddleware):
"""請求驗證中介軟體"""
async def dispatch(self, request: Request, call_next):
# 檢查 Content-Type
if request.method in ["POST", "PUT", "PATCH"]:
content_type = request.headers.get("content-type", "")
if not content_type.startswith("application/json"):
return JSONResponse(
status_code=415,
content={"detail": "Unsupported media type"}
)
# 檢查請求大小
content_length = request.headers.get("content-length")
if content_length and int(content_length) > 10 * 1024 * 1024: # 10MB
return JSONResponse(
status_code=413,
content={"detail": "Request entity too large"}
)
return await call_next(request)
class AuditLogMiddleware(BaseHTTPMiddleware):
"""審計日誌中介軟體"""
async def dispatch(self, request: Request, call_next):
start_time = time.time()
# 記錄請求資訊
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"method": request.method,
"path": request.url.path,
"client_ip": request.client.host,
"user_agent": request.headers.get("user-agent")
}
# 如果有使用者資訊
if hasattr(request.state, "user"):
log_data["user_id"] = request.state.user.id
log_data["user_email"] = request.state.user.email
response = await call_next(request)
# 記錄回應資訊
log_data["status_code"] = response.status_code
log_data["duration"] = time.time() - start_time
# 記錄到安全日誌
logger.info("Audit log", extra=log_data)
return response
from fastapi import FastAPI, Depends, HTTPException, Request
from app.auth import (
get_current_user, get_current_active_admin, verify_api_key,
TokenManager, PasswordManager, User
)
from app.rate_limit import rate_limit
from app.security_utils import InputValidator, ContentFilter
from app.security_middleware import (
SecurityHeadersMiddleware,
RequestValidationMiddleware,
AuditLogMiddleware
)
app = FastAPI()
# 添加安全中介軟體
app.add_middleware(SecurityHeadersMiddleware)
app.add_middleware(RequestValidationMiddleware)
app.add_middleware(AuditLogMiddleware)
# ═══════════════════════════════════════════════════════════
# 認證端點
# ═══════════════════════════════════════════════════════════
@app.post("/auth/login")
@rate_limit(max_requests=5, window_seconds=300) # 5次/5分鐘
async def login(request: Request, login_data: LoginRequest):
"""使用者登入"""
# 驗證憑證(這裡簡化)
# 實際應該查詢資料庫
# 創建 tokens
access_token = TokenManager.create_access_token(
data={
"sub": "user123",
"email": login_data.email,
"role": "user"
}
)
refresh_token = TokenManager.create_refresh_token(
data={"sub": "user123"}
)
return TokenResponse(
access_token=access_token,
refresh_token=refresh_token,
expires_in=ACCESS_TOKEN_EXPIRE_MINUTES * 60
)
@app.post("/auth/logout")
async def logout(current_user: User = Depends(get_current_user)):
"""使用者登出"""
# 將 token 加入黑名單
# 實際應該從 header 獲取 token
return {"message": "Successfully logged out"}
@app.get("/auth/me")
async def get_me(current_user: User = Depends(get_current_user)):
"""獲取當前使用者資訊"""
return current_user
# ═══════════════════════════════════════════════════════════
# 受保護的 API 端點
# ═══════════════════════════════════════════════════════════
@app.post("/chat/secure")
@rate_limit(max_requests=60, window_seconds=60)
async def secure_chat(
request: Request,
message: str,
current_user: User = Depends(get_current_user)
):
"""受保護的聊天端點"""
# 驗證輸入
InputValidator.validate_prompt_length(message)
# 檢查內容
content_check = ContentFilter.check_content(message)
if not content_check["is_safe"]:
raise HTTPException(
status_code=400,
detail="Content contains blocked words"
)
# 呼叫 AI(帶監控)
response = await monitored_ai_call(message)
# 過濾回應
filtered_response = ContentFilter.filter_ai_response(response)
return {
"response": filtered_response,
"user_id": current_user.id
}
@app.post("/admin/users", dependencies=[Depends(get_current_active_admin)])
async def create_user(user_data: dict):
"""管理員:創建使用者"""
# 驗證密碼強度
password_check = PasswordManager.validate_password_strength(
user_data["password"]
)
if not password_check["is_valid"]:
raise HTTPException(
status_code=400,
detail={"message": "Weak password", "issues": password_check["issues"]}
)
# 創建使用者(簡化)
return {"message": "User created successfully"}
@app.get("/admin/audit-logs", dependencies=[Depends(get_current_active_admin)])
async def get_audit_logs(skip: int = 0, limit: int = 100):
"""管理員:查看審計日誌"""
# 從日誌系統獲取審計記錄
return {"logs": [], "total": 0}
# ═══════════════════════════════════════════════════════════
# API 金鑰保護的端點
# ═══════════════════════════════════════════════════════════
@app.post("/api/chat")
@rate_limit(max_requests=100, window_seconds=60)
async def api_chat(
request: Request,
message: str,
api_key: str = Depends(verify_api_key)
):
"""API 金鑰保護的聊天端點"""
# API 金鑰已在依賴中驗證
response = await monitored_ai_call(message)
return {"response": response}
部署前的安全檢查:
# ✅ 好的做法
- 使用 bcrypt 或 argon2 雜湊
- 要求強密碼(長度、複雜度)
- 實作密碼重置機制
- 定期要求更改密碼
# ❌ 避免的做法
- 明文儲存密碼
- 使用 MD5 或 SHA1
- 允許弱密碼
- 在日誌中記錄密碼
# ✅ 好的做法
- 使用環境變數
- 定期輪換金鑰
- 限制金鑰權限
- 記錄金鑰使用情況
# ❌ 避免的做法
- 硬編碼在程式碼中
- 提交到版本控制
- 在 URL 中傳遞
- 共用同一金鑰
# ✅ 好的做法
- 傳輸層加密 (HTTPS)
- 靜態資料加密
- 敏感資料遮罩
- 定期備份
# ❌ 避免的做法
- HTTP 明文傳輸
- 明文儲存敏感資料
- 在日誌中記錄敏感資訊
- 不加密的備份
# ❌ 危險的做法
query = f"SELECT * FROM users WHERE email = '{user_email}'"
# ✅ 安全的做法
query = "SELECT * FROM users WHERE email = %s"
cursor.execute(query, (user_email,))
from html import escape
# ✅ 安全的做法
def safe_output(user_input: str) -> str:
"""清理使用者輸入以防止 XSS"""
return escape(user_input)
# 或使用 bleach 庫
import bleach
def sanitize_html(html_content: str) -> str:
"""清理 HTML 內容"""
allowed_tags = ['p', 'br', 'strong', 'em']
return bleach.clean(html_content, tags=allowed_tags, strip=True)
from fastapi import Request, HTTPException
import secrets
class CSRFProtection:
"""CSRF 保護"""
@staticmethod
def generate_csrf_token() -> str:
"""生成 CSRF Token"""
return secrets.token_urlsafe(32)
@staticmethod
def validate_csrf_token(request: Request, token: str) -> bool:
"""驗證 CSRF Token"""
session_token = request.session.get("csrf_token")
return session_token == token
# 在端點使用
@app.post("/sensitive-action")
async def sensitive_action(
request: Request,
csrf_token: str,
current_user: User = Depends(get_current_user)
):
if not CSRFProtection.validate_csrf_token(request, csrf_token):
raise HTTPException(status_code=403, detail="Invalid CSRF token")
# 執行操作
return {"message": "Action completed"}
# 在回應 headers 中設定
response.headers["X-Frame-Options"] = "DENY"
response.headers["Content-Security-Policy"] = "frame-ancestors 'none'"
# 使用 Bandit 進行 Python 安全掃描
pip install bandit
bandit -r app/
# 使用 Safety 檢查依賴漏洞
pip install safety
safety check
# 使用 OWASP ZAP 進行滲透測試
docker run -t owasp/zap2docker-stable zap-baseline.py \
-t http://your-api-url
import requests
import json
class SecurityTester:
"""安全測試工具"""
def __init__(self, base_url: str):
self.base_url = base_url
def test_sql_injection(self):
"""測試 SQL 注入"""
payloads = [
"' OR '1'='1",
"'; DROP TABLE users; --",
"1' UNION SELECT NULL--"
]
for payload in payloads:
response = requests.post(
f"{self.base_url}/chat",
json={"message": payload}
)
if response.status_code == 500:
print(f"⚠️ 可能存在 SQL 注入漏洞: {payload}")
def test_xss(self):
"""測試 XSS"""
payloads = [
"<script>alert('XSS')</script>",
"<img src=x onerror=alert('XSS')>",
"javascript:alert('XSS')"
]
for payload in payloads:
response = requests.post(
f"{self.base_url}/chat",
json={"message": payload}
)
if payload in response.text:
print(f"⚠️ 可能存在 XSS 漏洞: {payload}")
def test_rate_limiting(self):
"""測試速率限制"""
print("測試速率限制...")
for i in range(150):
response = requests.get(f"{self.base_url}/health")
if response.status_code == 429:
print(f"✅ 速率限制在 {i+1} 次請求後觸發")
return
print("⚠️ 速率限制可能未正確配置")
def test_authentication(self):
"""測試認證機制"""
# 測試未認證存取
response = requests.post(
f"{self.base_url}/chat/secure",
json={"message": "test"}
)
if response.status_code != 401:
print("⚠️ 認證機制可能未正確實作")
else:
print("✅ 認證保護正常")
# 使用範例
tester = SecurityTester("http://localhost:8000")
tester.test_sql_injection()
tester.test_xss()
tester.test_rate_limiting()
tester.test_authentication()
class GDPRCompliance:
"""GDPR 合規性工具"""
@staticmethod
def export_user_data(user_id: str) -> Dict:
"""使用者資料匯出(資料攜帶權)"""
user_data = {
"personal_info": {},
"conversations": [],
"api_usage": [],
"preferences": {}
}
# 從資料庫收集所有使用者資料
return user_data
@staticmethod
def delete_user_data(user_id: str, retain_logs: bool = True):
"""刪除使用者資料(被遺忘權)"""
# 刪除個人識別資訊
# 可選保留匿名化的日誌用於審計
pass
@staticmethod
def anonymize_data(data: Dict) -> Dict:
"""資料匿名化"""
anonymized = data.copy()
# 移除或雜湊個人識別資訊
if "email" in anonymized:
anonymized["email"] = hashlib.sha256(
anonymized["email"].encode()
).hexdigest()
return anonymized
class PersonalDataProtection:
"""個資保護工具"""
@staticmethod
def get_consent_status(user_id: str) -> Dict:
"""獲取使用者同意狀態"""
return {
"data_collection": True,
"data_usage": True,
"marketing": False,
"third_party_sharing": False
}
@staticmethod
def record_consent(user_id: str, consent_type: str, granted: bool):
"""記錄使用者同意"""
consent_record = {
"user_id": user_id,
"consent_type": consent_type,
"granted": granted,
"timestamp": datetime.utcnow(),
"ip_address": "x.x.x.x"
}
# 儲存到資料庫
pass
.env
檔案# .env(不要提交到版本控制)
SECRET_KEY=your-secret-key-here
GEMINI_API_KEY=your-gemini-api-key
DATABASE_URL=postgresql://user:pass@localhost/db
REDIS_URL=redis://localhost:6379
ALLOWED_ORIGINS=https://yourdomain.com
# 加密敏感設定
ENCRYPTION_KEY=your-encryption-key
# AWS Secrets Manager
import boto3
from botocore.exceptions import ClientError
def get_secret(secret_name: str) -> dict:
"""從 AWS Secrets Manager 獲取密鑰"""
client = boto3.client('secretsmanager', region_name='ap-northeast-1')
try:
response = client.get_secret_value(SecretId=secret_name)
return json.loads(response['SecretString'])
except ClientError as e:
raise e
# Google Secret Manager
from google.cloud import secretmanager
def get_secret_gcp(project_id: str, secret_id: str) -> str:
"""從 GCP Secret Manager 獲取密鑰"""
client = secretmanager.SecretManagerServiceClient()
name = f"projects/{project_id}/secrets/{secret_id}/versions/latest"
response = client.access_secret_version(request={"name": name})
return response.payload.data.decode('UTF-8')
from fastapi import APIRouter
from datetime import datetime, timedelta
security_router = APIRouter(prefix="/security")
@security_router.get("/dashboard")
async def security_dashboard(
current_user: User = Depends(get_current_active_admin)
):
"""安全監控儀表板"""
# 獲取安全指標
now = datetime.utcnow()
last_24h = now - timedelta(hours=24)
dashboard_data = {
"failed_login_attempts": {
"last_24h": 15,
"last_hour": 3,
"top_ips": [
{"ip": "1.2.3.4", "count": 8},
{"ip": "5.6.7.8", "count": 5}
]
},
"rate_limit_violations": {
"last_24h": 42,
"by_endpoint": {
"/chat": 30,
"/api/chat": 12
}
},
"blocked_requests": {
"sql_injection_attempts": 5,
"xss_attempts": 3,
"invalid_tokens": 25
},
"active_sessions": {
"total": 150,
"by_role": {
"user": 140,
"admin": 8,
"service": 2
}
},
"api_key_usage": {
"active_keys": 45,
"revoked_keys": 3,
"expired_keys": 2
},
"security_alerts": [
{
"severity": "high",
"type": "Multiple failed login attempts",
"ip": "1.2.3.4",
"timestamp": "2024-01-15T10:30:00Z"
}
]
}
return dashboard_data
今天我們建立了完整的安全防護系統:
✅ 身份驗證:JWT Token、API 金鑰、多因素認證
✅ 授權管理:角色權限、資源存取控制
✅ 輸入驗證:SQL 注入、XSS、CSRF 防護
✅ 速率限制:防止濫用和 DDoS 攻擊
✅ 安全監控:審計日誌、異常偵測、告警機制
✅ 合規性:GDPR、個資法要求實作
安全不是一次性的工作,而是持續的過程
## 安全事件應變流程
### 1. 發現階段
- 監控系統發現異常
- 使用者回報可疑活動
- 自動告警觸發
### 2. 評估階段
- 確認事件類型和嚴重性
- 評估影響範圍
- 識別受影響的系統和資料
### 3. 隔離階段
- 隔離受影響的系統
- 封鎖惡意 IP 地址
- 撤銷受損的憑證
### 4. 根除階段
- 移除惡意程式碼
- 修補安全漏洞
- 更新安全規則
### 5. 恢復階段
- 從備份恢復資料
- 重新啟動服務
- 監控系統穩定性
### 6. 檢討階段
- 分析事件原因
- 記錄經驗教訓
- 更新安全策略
### 聯絡資訊
- 安全團隊:security@company.com
- 緊急熱線:+886-XXX-XXXX
- 事件報告系統:https://security.company.com
推薦資源:
安全工具:
明天我們將學習成本控制與資源優化,讓系統既安全又經濟!💰
記住:安全是每個人的責任,不要等到被攻擊才開始重視! 🛡️