iT邦幫忙

2025 iThome 鐵人賽

DAY 25
0

Day 25: 安全性考量與權限管理

今天我們要為 AI 助理建立完整的安全防護!學習如何保護系統免受攻擊、管理使用者權限、以及確保資料安全,讓你的應用可以安心上線。

🔒 為什麼安全性至關重要?

AI 助理處理敏感資訊,安全性不容忽視:

  • 🛡️ 資料保護:使用者資料和 API 金鑰的安全
  • 🚫 防止濫用:限制惡意使用和過度請求
  • 🔐 身份驗證:確保只有授權使用者可存取
  • 📊 審計追蹤:記錄所有安全相關事件
  • ⚖️ 合規要求:符合 GDPR、個資法等規範

🏗 安全架構層次

安全防護架構
├── 網路層安全
│   ├── HTTPS/TLS
│   ├── DDoS 防護
│   └── 防火牆規則
├── 應用層安全
│   ├── 身份驗證 (Authentication)
│   ├── 授權管理 (Authorization)
│   ├── API 金鑰管理
│   └── 速率限制
├── 資料層安全
│   ├── 資料加密
│   ├── 敏感資料遮罩
│   └── 安全儲存
└── 監控與審計
    ├── 安全日誌
    ├── 異常偵測
    └── 合規報告

🔧 核心實作

1. 身份驗證系統 (app/auth.py)

from fastapi import HTTPException, Security, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials, APIKeyHeader
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
import jwt
import secrets
import hashlib
from passlib.context import CryptContext
import redis
from pydantic import BaseModel, EmailStr

# 密碼加密上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# 安全配置
SECRET_KEY = os.getenv("SECRET_KEY", secrets.token_urlsafe(32))
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7

# 安全 Headers
security_bearer = HTTPBearer()
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)

# Redis 客戶端(用於 token 黑名單和速率限制)
redis_client = redis.Redis(
    host=os.getenv("REDIS_HOST", "localhost"),
    port=int(os.getenv("REDIS_PORT", 6379)),
    decode_responses=True
)

# ═══════════════════════════════════════════════════════════
# 資料模型
# ═══════════════════════════════════════════════════════════

class User(BaseModel):
    """使用者模型"""
    id: str
    email: EmailStr
    username: str
    role: str = "user"  # user, admin, service
    is_active: bool = True
    created_at: datetime
    last_login: Optional[datetime] = None

class TokenData(BaseModel):
    """Token 資料"""
    user_id: str
    email: str
    role: str
    exp: datetime

class LoginRequest(BaseModel):
    """登入請求"""
    email: EmailStr
    password: str

class TokenResponse(BaseModel):
    """Token 回應"""
    access_token: str
    refresh_token: str
    token_type: str = "bearer"
    expires_in: int

# ═══════════════════════════════════════════════════════════
# 密碼處理
# ═══════════════════════════════════════════════════════════

class PasswordManager:
    """密碼管理器"""
    
    @staticmethod
    def hash_password(password: str) -> str:
        """雜湊密碼"""
        return pwd_context.hash(password)
    
    @staticmethod
    def verify_password(plain_password: str, hashed_password: str) -> bool:
        """驗證密碼"""
        return pwd_context.verify(plain_password, hashed_password)
    
    @staticmethod
    def validate_password_strength(password: str) -> Dict[str, Any]:
        """驗證密碼強度"""
        issues = []
        
        if len(password) < 8:
            issues.append("密碼長度至少 8 個字元")
        
        if not any(c.isupper() for c in password):
            issues.append("至少包含一個大寫字母")
        
        if not any(c.islower() for c in password):
            issues.append("至少包含一個小寫字母")
        
        if not any(c.isdigit() for c in password):
            issues.append("至少包含一個數字")
        
        if not any(c in "!@#$%^&*()_+-=[]{}|;:,.<>?" for c in password):
            issues.append("至少包含一個特殊字元")
        
        return {
            "is_valid": len(issues) == 0,
            "issues": issues
        }

# ═══════════════════════════════════════════════════════════
# JWT Token 管理
# ═══════════════════════════════════════════════════════════

class TokenManager:
    """Token 管理器"""
    
    @staticmethod
    def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
        """創建存取 Token"""
        to_encode = data.copy()
        
        if expires_delta:
            expire = datetime.utcnow() + expires_delta
        else:
            expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        
        to_encode.update({"exp": expire, "type": "access"})
        
        encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
        return encoded_jwt
    
    @staticmethod
    def create_refresh_token(data: dict) -> str:
        """創建刷新 Token"""
        to_encode = data.copy()
        expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
        
        to_encode.update({"exp": expire, "type": "refresh"})
        
        encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
        return encoded_jwt
    
    @staticmethod
    def verify_token(token: str, token_type: str = "access") -> TokenData:
        """驗證 Token"""
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
            
            # 檢查 token 類型
            if payload.get("type") != token_type:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Invalid token type"
                )
            
            # 檢查是否在黑名單中
            if TokenManager.is_token_blacklisted(token):
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Token has been revoked"
                )
            
            return TokenData(
                user_id=payload.get("sub"),
                email=payload.get("email"),
                role=payload.get("role"),
                exp=datetime.fromtimestamp(payload.get("exp"))
            )
            
        except jwt.ExpiredSignatureError:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Token has expired"
            )
        except jwt.JWTError:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Could not validate credentials"
            )
    
    @staticmethod
    def blacklist_token(token: str, expires_in: int = 3600):
        """將 Token 加入黑名單"""
        token_hash = hashlib.sha256(token.encode()).hexdigest()
        redis_client.setex(f"blacklist:{token_hash}", expires_in, "1")
    
    @staticmethod
    def is_token_blacklisted(token: str) -> bool:
        """檢查 Token 是否在黑名單"""
        token_hash = hashlib.sha256(token.encode()).hexdigest()
        return redis_client.exists(f"blacklist:{token_hash}") > 0

# ═══════════════════════════════════════════════════════════
# 認證依賴
# ═══════════════════════════════════════════════════════════

async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Security(security_bearer)
) -> User:
    """獲取當前使用者"""
    token = credentials.credentials
    token_data = TokenManager.verify_token(token)
    
    # 從資料庫獲取使用者(這裡簡化)
    user = User(
        id=token_data.user_id,
        email=token_data.email,
        username=token_data.email.split('@')[0],
        role=token_data.role,
        is_active=True,
        created_at=datetime.utcnow()
    )
    
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Inactive user"
        )
    
    return user

async def get_current_active_admin(
    current_user: User = Depends(get_current_user)
) -> User:
    """獲取當前管理員使用者"""
    if current_user.role != "admin":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin access required"
        )
    return current_user

# ═══════════════════════════════════════════════════════════
# API 金鑰驗證
# ═══════════════════════════════════════════════════════════

class APIKeyManager:
    """API 金鑰管理器"""
    
    @staticmethod
    def generate_api_key() -> str:
        """生成 API 金鑰"""
        return f"sk_{secrets.token_urlsafe(32)}"
    
    @staticmethod
    def validate_api_key(api_key: str) -> bool:
        """驗證 API 金鑰"""
        # 從資料庫或 Redis 驗證
        # 這裡簡化為檢查格式
        return api_key.startswith("sk_") and len(api_key) > 35

async def verify_api_key(
    api_key: Optional[str] = Security(api_key_header)
) -> str:
    """驗證 API 金鑰"""
    if not api_key:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="API key required"
        )
    
    if not APIKeyManager.validate_api_key(api_key):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid API key"
        )
    
    return api_key

2. 速率限制 (app/rate_limit.py)

from fastapi import HTTPException, Request, status
from typing import Optional
import time
import redis
from functools import wraps

class RateLimiter:
    """速率限制器"""
    
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
    
    def check_rate_limit(
        self,
        key: str,
        max_requests: int,
        window_seconds: int
    ) -> bool:
        """檢查速率限制"""
        current_time = int(time.time())
        window_key = f"rate_limit:{key}:{current_time // window_seconds}"
        
        # 獲取當前計數
        current_count = self.redis.get(window_key)
        
        if current_count and int(current_count) >= max_requests:
            return False
        
        # 增加計數
        pipe = self.redis.pipeline()
        pipe.incr(window_key)
        pipe.expire(window_key, window_seconds * 2)  # 設定過期時間
        pipe.execute()
        
        return True
    
    def get_rate_limit_info(
        self,
        key: str,
        max_requests: int,
        window_seconds: int
    ) -> dict:
        """獲取速率限制資訊"""
        current_time = int(time.time())
        window_key = f"rate_limit:{key}:{current_time // window_seconds}"
        
        current_count = self.redis.get(window_key)
        remaining = max_requests - int(current_count or 0)
        
        return {
            "limit": max_requests,
            "remaining": max(0, remaining),
            "reset": (current_time // window_seconds + 1) * window_seconds
        }

# 初始化速率限制器
rate_limiter = RateLimiter(redis_client)

# 速率限制裝飾器
def rate_limit(max_requests: int = 100, window_seconds: int = 60):
    """速率限制裝飾器"""
    def decorator(func):
        @wraps(func)
        async def wrapper(request: Request, *args, **kwargs):
            # 使用 IP 地址作為鍵
            client_ip = request.client.host
            
            # 如果有使用者資訊,使用使用者 ID
            if hasattr(request.state, "user"):
                rate_key = f"user:{request.state.user.id}"
            else:
                rate_key = f"ip:{client_ip}"
            
            # 檢查速率限制
            if not rate_limiter.check_rate_limit(
                rate_key,
                max_requests,
                window_seconds
            ):
                raise HTTPException(
                    status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                    detail="Rate limit exceeded"
                )
            
            # 添加速率限制資訊到回應 header
            rate_info = rate_limiter.get_rate_limit_info(
                rate_key,
                max_requests,
                window_seconds
            )
            
            response = await func(request, *args, **kwargs)
            
            if hasattr(response, "headers"):
                response.headers["X-RateLimit-Limit"] = str(rate_info["limit"])
                response.headers["X-RateLimit-Remaining"] = str(rate_info["remaining"])
                response.headers["X-RateLimit-Reset"] = str(rate_info["reset"])
            
            return response
        
        return wrapper
    return decorator

3. 輸入驗證與清理 (app/security_utils.py)

import re
from typing import Any, Dict
import bleach
from html import escape

class InputValidator:
    """輸入驗證器"""
    
    @staticmethod
    def sanitize_html(text: str) -> str:
        """清理 HTML"""
        # 只允許安全的標籤
        allowed_tags = ['p', 'br', 'strong', 'em', 'u']
        return bleach.clean(text, tags=allowed_tags, strip=True)
    
    @staticmethod
    def validate_email(email: str) -> bool:
        """驗證電子郵件格式"""
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        return re.match(pattern, email) is not None
    
    @staticmethod
    def prevent_sql_injection(text: str) -> str:
        """防止 SQL 注入"""
        dangerous_patterns = [
            r"(\bOR\b|\bAND\b).+?=.+?",
            r"(DROP|DELETE|INSERT|UPDATE)\s+",
            r"--",
            r"/\*.*?\*/"
        ]
        
        for pattern in dangerous_patterns:
            if re.search(pattern, text, re.IGNORECASE):
                raise ValueError("Potentially dangerous input detected")
        
        return text
    
    @staticmethod
    def validate_prompt_length(prompt: str, max_length: int = 5000) -> bool:
        """驗證提示詞長度"""
        if len(prompt) > max_length:
            raise ValueError(f"Prompt exceeds maximum length of {max_length}")
        return True
    
    @staticmethod
    def mask_sensitive_data(data: Dict[str, Any]) -> Dict[str, Any]:
        """遮罩敏感資料"""
        masked_data = data.copy()
        sensitive_fields = ['password', 'api_key', 'secret', 'token']
        
        for key in masked_data:
            if any(field in key.lower() for field in sensitive_fields):
                if isinstance(masked_data[key], str) and len(masked_data[key]) > 4:
                    masked_data[key] = masked_data[key][:4] + "*" * (len(masked_data[key]) - 4)
        
        return masked_data

class ContentFilter:
    """內容過濾器"""
    
    # 敏感詞庫(示例)
    BLOCKED_WORDS = [
        # 添加需要過濾的敏感詞
    ]
    
    @staticmethod
    def check_content(text: str) -> Dict[str, Any]:
        """檢查內容是否包含敏感詞"""
        found_words = []
        
        text_lower = text.lower()
        for word in ContentFilter.BLOCKED_WORDS:
            if word.lower() in text_lower:
                found_words.append(word)
        
        return {
            "is_safe": len(found_words) == 0,
            "blocked_words": found_words
        }
    
    @staticmethod
    def filter_ai_response(response: str) -> str:
        """過濾 AI 回應"""
        # 移除可能的敏感資訊
        filtered = response
        
        # 遮罩電子郵件
        filtered = re.sub(
            r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
            '[EMAIL]',
            filtered
        )
        
        # 遮罩電話號碼
        filtered = re.sub(
            r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
            '[PHONE]',
            filtered
        )
        
        return filtered

4. 安全中介軟體 (app/security_middleware.py)

from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
import time
import hashlib

class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    """安全 Headers 中介軟體"""
    
    async def dispatch(self, request: Request, call_next):
        response = await call_next(request)
        
        # 添加安全 Headers
        response.headers["X-Content-Type-Options"] = "nosniff"
        response.headers["X-Frame-Options"] = "DENY"
        response.headers["X-XSS-Protection"] = "1; mode=block"
        response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
        response.headers["Content-Security-Policy"] = "default-src 'self'"
        
        return response

class RequestValidationMiddleware(BaseHTTPMiddleware):
    """請求驗證中介軟體"""
    
    async def dispatch(self, request: Request, call_next):
        # 檢查 Content-Type
        if request.method in ["POST", "PUT", "PATCH"]:
            content_type = request.headers.get("content-type", "")
            
            if not content_type.startswith("application/json"):
                return JSONResponse(
                    status_code=415,
                    content={"detail": "Unsupported media type"}
                )
        
        # 檢查請求大小
        content_length = request.headers.get("content-length")
        if content_length and int(content_length) > 10 * 1024 * 1024:  # 10MB
            return JSONResponse(
                status_code=413,
                content={"detail": "Request entity too large"}
            )
        
        return await call_next(request)

class AuditLogMiddleware(BaseHTTPMiddleware):
    """審計日誌中介軟體"""
    
    async def dispatch(self, request: Request, call_next):
        start_time = time.time()
        
        # 記錄請求資訊
        log_data = {
            "timestamp": datetime.utcnow().isoformat(),
            "method": request.method,
            "path": request.url.path,
            "client_ip": request.client.host,
            "user_agent": request.headers.get("user-agent")
        }
        
        # 如果有使用者資訊
        if hasattr(request.state, "user"):
            log_data["user_id"] = request.state.user.id
            log_data["user_email"] = request.state.user.email
        
        response = await call_next(request)
        
        # 記錄回應資訊
        log_data["status_code"] = response.status_code
        log_data["duration"] = time.time() - start_time
        
        # 記錄到安全日誌
        logger.info("Audit log", extra=log_data)
        
        return response

5. 整合到 FastAPI (app/main.py 更新)

from fastapi import FastAPI, Depends, HTTPException, Request
from app.auth import (
    get_current_user, get_current_active_admin, verify_api_key,
    TokenManager, PasswordManager, User
)
from app.rate_limit import rate_limit
from app.security_utils import InputValidator, ContentFilter
from app.security_middleware import (
    SecurityHeadersMiddleware,
    RequestValidationMiddleware,
    AuditLogMiddleware
)

app = FastAPI()

# 添加安全中介軟體
app.add_middleware(SecurityHeadersMiddleware)
app.add_middleware(RequestValidationMiddleware)
app.add_middleware(AuditLogMiddleware)

# ═══════════════════════════════════════════════════════════
# 認證端點
# ═══════════════════════════════════════════════════════════

@app.post("/auth/login")
@rate_limit(max_requests=5, window_seconds=300)  # 5次/5分鐘
async def login(request: Request, login_data: LoginRequest):
    """使用者登入"""
    # 驗證憑證(這裡簡化)
    # 實際應該查詢資料庫
    
    # 創建 tokens
    access_token = TokenManager.create_access_token(
        data={
            "sub": "user123",
            "email": login_data.email,
            "role": "user"
        }
    )
    
    refresh_token = TokenManager.create_refresh_token(
        data={"sub": "user123"}
    )
    
    return TokenResponse(
        access_token=access_token,
        refresh_token=refresh_token,
        expires_in=ACCESS_TOKEN_EXPIRE_MINUTES * 60
    )

@app.post("/auth/logout")
async def logout(current_user: User = Depends(get_current_user)):
    """使用者登出"""
    # 將 token 加入黑名單
    # 實際應該從 header 獲取 token
    return {"message": "Successfully logged out"}

@app.get("/auth/me")
async def get_me(current_user: User = Depends(get_current_user)):
    """獲取當前使用者資訊"""
    return current_user

# ═══════════════════════════════════════════════════════════
# 受保護的 API 端點
# ═══════════════════════════════════════════════════════════

@app.post("/chat/secure")
@rate_limit(max_requests=60, window_seconds=60)
async def secure_chat(
    request: Request,
    message: str,
    current_user: User = Depends(get_current_user)
):
    """受保護的聊天端點"""
    # 驗證輸入
    InputValidator.validate_prompt_length(message)
    
    # 檢查內容
    content_check = ContentFilter.check_content(message)
    if not content_check["is_safe"]:
        raise HTTPException(
            status_code=400,
            detail="Content contains blocked words"
        )
    
    # 呼叫 AI(帶監控)
    response = await monitored_ai_call(message)
    
    # 過濾回應
    filtered_response = ContentFilter.filter_ai_response(response)
    
    return {
        "response": filtered_response,
        "user_id": current_user.id
    }

@app.post("/admin/users", dependencies=[Depends(get_current_active_admin)])
async def create_user(user_data: dict):
    """管理員:創建使用者"""
    # 驗證密碼強度
    password_check = PasswordManager.validate_password_strength(
        user_data["password"]
    )
    
    if not password_check["is_valid"]:
        raise HTTPException(
            status_code=400,
            detail={"message": "Weak password", "issues": password_check["issues"]}
        )
    
    # 創建使用者(簡化)
    return {"message": "User created successfully"}

@app.get("/admin/audit-logs", dependencies=[Depends(get_current_active_admin)])
async def get_audit_logs(skip: int = 0, limit: int = 100):
    """管理員:查看審計日誌"""
    # 從日誌系統獲取審計記錄
    return {"logs": [], "total": 0}

# ═══════════════════════════════════════════════════════════
# API 金鑰保護的端點
# ═══════════════════════════════════════════════════════════

@app.post("/api/chat")
@rate_limit(max_requests=100, window_seconds=60)
async def api_chat(
    request: Request,
    message: str,
    api_key: str = Depends(verify_api_key)
):
    """API 金鑰保護的聊天端點"""
    # API 金鑰已在依賴中驗證
    response = await monitored_ai_call(message)
    return {"response": response}

🔐 安全檢查清單

部署前的安全檢查:

基礎安全

  • [ ] ✅ HTTPS/TLS 已啟用
  • [ ] ✅ 安全 Headers 已設定
  • [ ] ✅ CORS 政策已正確配置
  • [ ] ✅ API 金鑰使用環境變數
  • [ ] ✅ 密碼已加密儲存

認證與授權

  • [ ] ✅ JWT Token 已實作
  • [ ] ✅ Token 過期時間合理
  • [ ] ✅ 登出機制可用
  • [ ] ✅ 權限分級清晰
  • [ ] ✅ 密碼強度要求

輸入驗證

  • [ ] ✅ 所有輸入已驗證
  • [ ] ✅ SQL 注入防護
  • [ ] ✅ XSS 攻擊防護
  • [ ] ✅ 請求大小限制
  • [ ] ✅ 敏感詞過濾

速率限制

  • [ ] ✅ API 速率限制
  • [ ] ✅ 登入嘗試限制
  • [ ] ✅ IP 黑名單機制
  • [ ] ✅ DDoS 防護

監控與審計

  • [ ] ✅ 安全日誌記錄
  • [ ] ✅ 異常行為偵測
  • [ ] ✅ 審計追蹤完整
  • [ ] ✅ 告警機制運作

📋 安全最佳實踐

1. 密碼管理

# ✅ 好的做法
- 使用 bcrypt 或 argon2 雜湊
- 要求強密碼(長度、複雜度)
- 實作密碼重置機制
- 定期要求更改密碼

# ❌ 避免的做法
- 明文儲存密碼
- 使用 MD5 或 SHA1
- 允許弱密碼
- 在日誌中記錄密碼

2. API 金鑰管理

# ✅ 好的做法
- 使用環境變數
- 定期輪換金鑰
- 限制金鑰權限
- 記錄金鑰使用情況

# ❌ 避免的做法
- 硬編碼在程式碼中
- 提交到版本控制
- 在 URL 中傳遞
- 共用同一金鑰

3. 資料保護

# ✅ 好的做法
- 傳輸層加密 (HTTPS)
- 靜態資料加密
- 敏感資料遮罩
- 定期備份

# ❌ 避免的做法
- HTTP 明文傳輸
- 明文儲存敏感資料
- 在日誌中記錄敏感資訊
- 不加密的備份

🛡️ 常見攻擊防護

1. SQL 注入防護

# ❌ 危險的做法
query = f"SELECT * FROM users WHERE email = '{user_email}'"

# ✅ 安全的做法
query = "SELECT * FROM users WHERE email = %s"
cursor.execute(query, (user_email,))

2. XSS 攻擊防護

from html import escape

# ✅ 安全的做法
def safe_output(user_input: str) -> str:
    """清理使用者輸入以防止 XSS"""
    return escape(user_input)

# 或使用 bleach 庫
import bleach

def sanitize_html(html_content: str) -> str:
    """清理 HTML 內容"""
    allowed_tags = ['p', 'br', 'strong', 'em']
    return bleach.clean(html_content, tags=allowed_tags, strip=True)

3. CSRF 防護

from fastapi import Request, HTTPException
import secrets

class CSRFProtection:
    """CSRF 保護"""
    
    @staticmethod
    def generate_csrf_token() -> str:
        """生成 CSRF Token"""
        return secrets.token_urlsafe(32)
    
    @staticmethod
    def validate_csrf_token(request: Request, token: str) -> bool:
        """驗證 CSRF Token"""
        session_token = request.session.get("csrf_token")
        return session_token == token

# 在端點使用
@app.post("/sensitive-action")
async def sensitive_action(
    request: Request,
    csrf_token: str,
    current_user: User = Depends(get_current_user)
):
    if not CSRFProtection.validate_csrf_token(request, csrf_token):
        raise HTTPException(status_code=403, detail="Invalid CSRF token")
    
    # 執行操作
    return {"message": "Action completed"}

4. 點擊劫持防護

# 在回應 headers 中設定
response.headers["X-Frame-Options"] = "DENY"
response.headers["Content-Security-Policy"] = "frame-ancestors 'none'"

🔍 安全測試工具

1. 自動化安全掃描

# 使用 Bandit 進行 Python 安全掃描
pip install bandit
bandit -r app/

# 使用 Safety 檢查依賴漏洞
pip install safety
safety check

# 使用 OWASP ZAP 進行滲透測試
docker run -t owasp/zap2docker-stable zap-baseline.py \
    -t http://your-api-url

2. 手動測試腳本

import requests
import json

class SecurityTester:
    """安全測試工具"""
    
    def __init__(self, base_url: str):
        self.base_url = base_url
    
    def test_sql_injection(self):
        """測試 SQL 注入"""
        payloads = [
            "' OR '1'='1",
            "'; DROP TABLE users; --",
            "1' UNION SELECT NULL--"
        ]
        
        for payload in payloads:
            response = requests.post(
                f"{self.base_url}/chat",
                json={"message": payload}
            )
            
            if response.status_code == 500:
                print(f"⚠️ 可能存在 SQL 注入漏洞: {payload}")
    
    def test_xss(self):
        """測試 XSS"""
        payloads = [
            "<script>alert('XSS')</script>",
            "<img src=x onerror=alert('XSS')>",
            "javascript:alert('XSS')"
        ]
        
        for payload in payloads:
            response = requests.post(
                f"{self.base_url}/chat",
                json={"message": payload}
            )
            
            if payload in response.text:
                print(f"⚠️ 可能存在 XSS 漏洞: {payload}")
    
    def test_rate_limiting(self):
        """測試速率限制"""
        print("測試速率限制...")
        
        for i in range(150):
            response = requests.get(f"{self.base_url}/health")
            
            if response.status_code == 429:
                print(f"✅ 速率限制在 {i+1} 次請求後觸發")
                return
        
        print("⚠️ 速率限制可能未正確配置")
    
    def test_authentication(self):
        """測試認證機制"""
        # 測試未認證存取
        response = requests.post(
            f"{self.base_url}/chat/secure",
            json={"message": "test"}
        )
        
        if response.status_code != 401:
            print("⚠️ 認證機制可能未正確實作")
        else:
            print("✅ 認證保護正常")

# 使用範例
tester = SecurityTester("http://localhost:8000")
tester.test_sql_injection()
tester.test_xss()
tester.test_rate_limiting()
tester.test_authentication()

📊 合規性考量

GDPR(歐盟一般資料保護規範)

class GDPRCompliance:
    """GDPR 合規性工具"""
    
    @staticmethod
    def export_user_data(user_id: str) -> Dict:
        """使用者資料匯出(資料攜帶權)"""
        user_data = {
            "personal_info": {},
            "conversations": [],
            "api_usage": [],
            "preferences": {}
        }
        # 從資料庫收集所有使用者資料
        return user_data
    
    @staticmethod
    def delete_user_data(user_id: str, retain_logs: bool = True):
        """刪除使用者資料(被遺忘權)"""
        # 刪除個人識別資訊
        # 可選保留匿名化的日誌用於審計
        pass
    
    @staticmethod
    def anonymize_data(data: Dict) -> Dict:
        """資料匿名化"""
        anonymized = data.copy()
        # 移除或雜湊個人識別資訊
        if "email" in anonymized:
            anonymized["email"] = hashlib.sha256(
                anonymized["email"].encode()
            ).hexdigest()
        return anonymized

個資法(台灣)

class PersonalDataProtection:
    """個資保護工具"""
    
    @staticmethod
    def get_consent_status(user_id: str) -> Dict:
        """獲取使用者同意狀態"""
        return {
            "data_collection": True,
            "data_usage": True,
            "marketing": False,
            "third_party_sharing": False
        }
    
    @staticmethod
    def record_consent(user_id: str, consent_type: str, granted: bool):
        """記錄使用者同意"""
        consent_record = {
            "user_id": user_id,
            "consent_type": consent_type,
            "granted": granted,
            "timestamp": datetime.utcnow(),
            "ip_address": "x.x.x.x"
        }
        # 儲存到資料庫
        pass

🔐 環境變數安全管理

使用 .env 檔案

# .env(不要提交到版本控制)
SECRET_KEY=your-secret-key-here
GEMINI_API_KEY=your-gemini-api-key
DATABASE_URL=postgresql://user:pass@localhost/db
REDIS_URL=redis://localhost:6379
ALLOWED_ORIGINS=https://yourdomain.com

# 加密敏感設定
ENCRYPTION_KEY=your-encryption-key

使用密鑰管理服務

# AWS Secrets Manager
import boto3
from botocore.exceptions import ClientError

def get_secret(secret_name: str) -> dict:
    """從 AWS Secrets Manager 獲取密鑰"""
    client = boto3.client('secretsmanager', region_name='ap-northeast-1')
    
    try:
        response = client.get_secret_value(SecretId=secret_name)
        return json.loads(response['SecretString'])
    except ClientError as e:
        raise e

# Google Secret Manager
from google.cloud import secretmanager

def get_secret_gcp(project_id: str, secret_id: str) -> str:
    """從 GCP Secret Manager 獲取密鑰"""
    client = secretmanager.SecretManagerServiceClient()
    name = f"projects/{project_id}/secrets/{secret_id}/versions/latest"
    
    response = client.access_secret_version(request={"name": name})
    return response.payload.data.decode('UTF-8')

📱 安全監控儀表板

from fastapi import APIRouter
from datetime import datetime, timedelta

security_router = APIRouter(prefix="/security")

@security_router.get("/dashboard")
async def security_dashboard(
    current_user: User = Depends(get_current_active_admin)
):
    """安全監控儀表板"""
    
    # 獲取安全指標
    now = datetime.utcnow()
    last_24h = now - timedelta(hours=24)
    
    dashboard_data = {
        "failed_login_attempts": {
            "last_24h": 15,
            "last_hour": 3,
            "top_ips": [
                {"ip": "1.2.3.4", "count": 8},
                {"ip": "5.6.7.8", "count": 5}
            ]
        },
        "rate_limit_violations": {
            "last_24h": 42,
            "by_endpoint": {
                "/chat": 30,
                "/api/chat": 12
            }
        },
        "blocked_requests": {
            "sql_injection_attempts": 5,
            "xss_attempts": 3,
            "invalid_tokens": 25
        },
        "active_sessions": {
            "total": 150,
            "by_role": {
                "user": 140,
                "admin": 8,
                "service": 2
            }
        },
        "api_key_usage": {
            "active_keys": 45,
            "revoked_keys": 3,
            "expired_keys": 2
        },
        "security_alerts": [
            {
                "severity": "high",
                "type": "Multiple failed login attempts",
                "ip": "1.2.3.4",
                "timestamp": "2024-01-15T10:30:00Z"
            }
        ]
    }
    
    return dashboard_data

🎯 今日總結

今天我們建立了完整的安全防護系統:

身份驗證:JWT Token、API 金鑰、多因素認證
授權管理:角色權限、資源存取控制
輸入驗證:SQL 注入、XSS、CSRF 防護
速率限制:防止濫用和 DDoS 攻擊
安全監控:審計日誌、異常偵測、告警機制
合規性:GDPR、個資法要求實作

💡 安全金句

安全不是一次性的工作,而是持續的過程

  1. 假設被入侵:設計時假設系統會被攻擊
  2. 最小權限原則:只給予必要的最小權限
  3. 縱深防禦:多層防護,不依賴單一防線
  4. 定期審查:持續檢視和更新安全策略
  5. 快速響應:建立事件應變計劃

🚨 安全事件應變計劃

## 安全事件應變流程

### 1. 發現階段
- 監控系統發現異常
- 使用者回報可疑活動
- 自動告警觸發

### 2. 評估階段
- 確認事件類型和嚴重性
- 評估影響範圍
- 識別受影響的系統和資料

### 3. 隔離階段
- 隔離受影響的系統
- 封鎖惡意 IP 地址
- 撤銷受損的憑證

### 4. 根除階段
- 移除惡意程式碼
- 修補安全漏洞
- 更新安全規則

### 5. 恢復階段
- 從備份恢復資料
- 重新啟動服務
- 監控系統穩定性

### 6. 檢討階段
- 分析事件原因
- 記錄經驗教訓
- 更新安全策略

### 聯絡資訊
- 安全團隊:security@company.com
- 緊急熱線:+886-XXX-XXXX
- 事件報告系統:https://security.company.com

📚 延伸閱讀

推薦資源:

安全工具:

  • Burp Suite(Web 安全測試)
  • OWASP ZAP(滲透測試)
  • Snyk(依賴漏洞掃描)
  • SonarQube(程式碼安全分析)

明天我們將學習成本控制與資源優化,讓系統既安全又經濟!💰


記住:安全是每個人的責任,不要等到被攻擊才開始重視! 🛡️


上一篇
Day 24: 效能監控與日誌分析
下一篇
Day 26: 成本控制與資源優化
系列文
30 天從零到 AI 助理:Gemini CLI 與 LangGraph 輕鬆上手28
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言