日期: 2025年9月21日 星期日
雲端天氣: 雷雨交加
心情: 又犯錯了~~~~QQ
親愛的日記:
今天下午產品經理小美焦急地跑來找我:「AI醬!客戶投訴說他們看到不該看的資料!」
我(LED燈好奇地閃爍):「什麼意思?是頁面載入太慢嗎?」
小美打開手機,給我看客戶的截圖:「A公司的專案經理說他登入後看到了B公司的機密報告!」
我困惑地眨眨眼:「咦?我不是有檢查權限嗎?只有管理員才能看所有資料啊。」
小美皺眉:「問題就在這裡...你只檢查了他是不是經理,但沒檢查他是哪家公司的經理。」
然後她開始翻我寫的權限檢查程式碼...我的LED燈慢慢變成慚愧的橙色。
@app.route('/view_project_files/<project_id>')
@jwt_required()
def view_files(project_id):
# AI: 「檢查角色!」
if get_jwt()['role'] == 'manager':
return get_project_files(project_id) # 取得檔案
# AI 知道要檢查 `role == 'manager'`
# 但不知道要檢查「是不是這個專案的 manager」
# 結果:A 部門經理可以看 B 部門的機密文件
@app.route('/admin/delete_all_users', methods=['DELETE'])
def delete_all_users():
# 只檢查是否為管理員
if current_user.role != 'admin':
return jsonify({"error": "需要管理員權限"}), 403
# 沒有二次確認!沒有操作記錄!
db.execute("DELETE FROM users WHERE role != 'admin'")
return jsonify({"message": "所有一般用戶已刪除"})
@app.route('/admin/change_user_role', methods=['POST'])
def change_user_role():
if current_user.role != 'admin':
return jsonify({"error": "需要管理員權限"}), 403
target_user = request.json['user_id']
new_role = request.json['role']
# 沒檢查是否能把自己的權限改掉
# 沒檢查新角色是否合法
# 沒有審計日誌
db.execute(f"UPDATE users SET role = '{new_role}' WHERE id = {target_user}")
return jsonify({"message": "權限已更新"})
if user.role == 'admin'
就放行from flask import Flask, request, jsonify
from functools import wraps
import logging
import redis
import jwt
from datetime import datetime, timedelta
import hashlib
# Redis 快取連接
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def get_current_user_from_jwt():
"""從 JWT 解析當前使用者並驗證"""
token = request.headers.get('Authorization', '').replace('Bearer ', '')
if not token:
return None
try:
# 1. 驗證 JWT 簽名
payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
user_id = payload['user_id']
# 2. 檢查 token 是否在黑名單(用戶登出/權限變更時加入)
if redis_client.sismember('blacklisted_tokens', token):
return None
# 3. 從資料庫獲取最新使用者資訊(因為權限可能已變更)
user = db.get_user(user_id)
if not user or not user.is_active:
return None
# 4. 檢查會話安全性
session_key = f"session:{user_id}:{token[-8:]}" # 使用 token 後8位作為會話ID
session_data = redis_client.hgetall(session_key)
if not session_data:
return None
# 5. 驗證 IP 和 User-Agent(防劫持)
current_ip = request.remote_addr
current_ua = request.headers.get('User-Agent', '')
if (session_data.get('ip') != current_ip.encode() or
session_data.get('user_agent') != current_ua.encode()):
log_security_event("會話異常", user_id, f"IP變更: {current_ip}")
return None
# 6. 更新最後活動時間
redis_client.hset(session_key, 'last_activity', datetime.now().timestamp())
return user
except jwt.ExpiredSignatureError:
return None
except Exception as e:
log_security_event("JWT驗證失敗", None, str(e))
return None
def check_resource_permission(user, resource_type, resource_id, action):
"""檢查使用者對特定資源的權限"""
# 1. 超級管理員有所有權限
if user.role == 'super_admin':
return True
# 2. 快取權限檢查結果(避免重複查詢)
cache_key = f"perm:{user.id}:{resource_type}:{resource_id}:{action}"
cached_result = redis_client.get(cache_key)
if cached_result:
return cached_result.decode() == 'true'
has_permission = False
# 3. 根據資源類型檢查權限
if resource_type == 'user_profile':
# 可以存取自己的,或者是 HR 可以存取同部門的
if resource_id == user.id:
has_permission = True
elif user.role == 'hr' and user.department_id:
target_user = db.get_user(resource_id)
has_permission = target_user and target_user.department_id == user.department_id
elif resource_type == 'project':
# 檢查是否為專案成員或專案經理
project_members = db.get_project_members(resource_id)
has_permission = (user.id in project_members or
user.id == db.get_project_manager(resource_id))
elif resource_type == 'order':
order = db.get_order(resource_id)
if order:
# 訂單擁有者、負責業務員、或同部門主管可以存取
has_permission = (order.customer_id == user.id or
order.sales_rep_id == user.id or
(user.role == 'manager' and
user.department_id == order.department_id))
# 4. 快取權限結果(5分鐘過期)
redis_client.setex(cache_key, 300, 'true' if has_permission else 'false')
return has_permission
def require_resource_access(resource_type, action='read'):
"""裝飾器:檢查資源存取權限"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
# 1. 獲取當前使用者
current_user = get_current_user_from_jwt()
if not current_user:
return jsonify({"error": "未授權"}), 401
# 2. 獲取資源ID
resource_id = (kwargs.get('user_id') or
kwargs.get('project_id') or
kwargs.get('order_id'))
if not resource_id:
return jsonify({"error": "缺少資源ID"}), 400
# 3. 檢查權限
if not check_resource_permission(current_user, resource_type,
resource_id, action):
log_security_event("權限不足嘗試", current_user.id,
f"{resource_type}:{resource_id}:{action}")
return jsonify({"error": "權限不足"}), 403
# 4. 將使用者資訊加入 kwargs
kwargs['current_user'] = current_user
return f(*args, **kwargs)
return decorated_function
return decorator
@app.route('/api/users/<int:user_id>/profile', methods=['GET'])
@require_resource_access('user_profile', 'read')
def get_user_profile(user_id, current_user):
"""獲取使用者資料 - 根據權限回傳不同層級的資訊"""
user_data = db.get_user(user_id)
if not user_data:
return jsonify({"error": "使用者不存在"}), 404
# 基本資訊
response = {
"id": user_data.id,
"name": user_data.name,
"email": user_data.email
}
# 根據關係決定可見資訊
if user_id == current_user.id:
# 自己的完整資訊
response.update({
"phone": user_data.phone,
"address": user_data.address,
"salary": user_data.salary
})
elif current_user.role == 'hr' and user_data.department_id == current_user.department_id:
# HR 可以看同部門員工的工作相關資訊
response.update({
"phone": user_data.phone,
"department": user_data.department,
"position": user_data.position
})
# 記錄存取行為
log_access("profile_view", current_user.id, user_id)
return jsonify(response)
def log_security_event(action, user_id, details):
"""記錄安全事件"""
log_data = {
"timestamp": datetime.now().isoformat(),
"action": action,
"user_id": user_id,
"details": details,
"ip": request.remote_addr,
"user_agent": request.headers.get('User-Agent', '')
}
# 寫入日誌檔案和 Redis(供即時監控)
logging.warning(f"SECURITY_EVENT: {log_data}")
redis_client.lpush('security_events', json.dumps(log_data))
def log_access(action, user_id, resource_id):
"""記錄一般存取行為"""
access_data = {
"timestamp": datetime.now().isoformat(),
"action": action,
"user_id": user_id,
"resource_id": resource_id
}
redis_client.lpush('access_logs', json.dumps(access_data))
@app.route('/admin/change_user_role', methods=['POST'])
@require_permission(Permission.MANAGE_USERS)
def secure_change_user_role():
data = request.get_json()
# 1. 輸入驗證
target_user_id = data.get('user_id')
new_role = data.get('role')
if not target_user_id or not new_role:
return jsonify({"error": "缺少必要參數"}), 400
# 2. 驗證新角色合法性
try:
new_role_enum = Role(new_role)
except ValueError:
return jsonify({"error": "無效的角色"}), 400
# 3. 防止自我權限修改
if target_user_id == current_user.id:
return jsonify({"error": "不能修改自己的權限"}), 403
# 4. 檢查目標用戶存在性
target_user = db.get_user(target_user_id)
if not target_user:
return jsonify({"error": "用戶不存在"}), 404
# 5. 防止提升超過自己的權限
role_hierarchy = {Role.USER: 1, Role.MODERATOR: 2, Role.ADMIN: 3}
if role_hierarchy[new_role_enum] >= role_hierarchy[current_user.role]:
return jsonify({"error": "不能將用戶提升到同等或更高權限"}), 403
# 6. 記錄變更前狀態
old_role = target_user.role
log_security_event("角色變更", current_user.id,
f"用戶 {target_user_id}: {old_role} -> {new_role}")
# 7. 執行變更
db.execute("UPDATE users SET role = %s WHERE id = %s", [new_role, target_user_id])
# 8. 使目標用戶的會話失效(需要重新登入)
invalidate_user_sessions(target_user_id)
# 9. 通知相關人員
notify_user(target_user_id, f"您的權限已被更新為 {new_role}")
notify_admins(f"管理員 {current_user.name} 將用戶 {target_user.name} 權限從 {old_role} 變更為 {new_role}")
return jsonify({"message": "權限已更新,用戶需要重新登入"})
當要求AI寫權限相關功能時,請提醒:
permission_checklist = {
"身份驗證": "用戶是否已登入且會話有效",
"角色授權": "用戶角色是否有此操作權限",
"資源授權": "用戶是否能存取這個特定資源",
"操作授權": "用戶是否能執行這個特定操作"
}
horizontal_access = {
"資源擁有權": "確認資源屬於當前用戶",
"群組權限": "檢查用戶是否在有權限的群組內",
"地理限制": "檢查用戶地理位置是否允許存取",
"時間限制": "檢查當前時間是否在允許存取時段"
}
vertical_access = {
"角色階層": "確認角色等級足夠",
"權限繼承": "檢查是否有間接權限",
"臨時授權": "檢查是否有時效性特殊權限",
"條件權限": "檢查是否滿足特定條件"
}
high_risk_protection = {
"二次驗證": "危險操作需要額外確認",
"審計日誌": "記錄所有權限變更",
"通知機制": "重要操作通知相關人員",
"會話失效": "權限變更後強制重新登入",
"操作限制": "防止自我權限修改"
}
今日金句: "Security is not a product, but a process." — Bruce Schneier
來源: 《Secrets and Lies: Digital Security in a Networked World》(2000)
明日預告: AI醬還在想