[異常偵測模型]
↓
[黑名單候選產生] → [人工審核介面] → (核准惡意)
↓ ↓
[Blacklist Service API] ←─────────────┘
↓
[更新防火牆/Widget 機制] → [實際阻斷惡意 URL/Domain]
python
複製程式碼
目標:集中管理惡意 domain / URL 的黑名單狀態。
blacklist_service/app.py
from fastapi import FastAPI, HTTPException, Body
from pydantic import BaseModel
from datetime import datetime
import json, os
app = FastAPI(title="Blacklist Service")
DATA_FILE = "blacklist.json"
class BlacklistEntry(BaseModel):
domain: str
reason: str
source: str
added_at: datetime = datetime.utcnow()
active: bool = True
def load_data():
if not os.path.exists(DATA_FILE):
return []
with open(DATA_FILE, "r") as f:
return json.load(f)
def save_data(data):
with open(DATA_FILE, "w") as f:
json.dump(data, f, default=str, indent=2)
@app.post("/add")
async def add_blacklist(entry: BlacklistEntry):
data = load_data()
if any(e["domain"] == entry.domain for e in data):
raise HTTPException(status_code=400, detail="Domain already exists")
data.append(entry.dict())
save_data(data)
return {"message": f"Domain {entry.domain} added to blacklist."}
@app.get("/list")
async def list_blacklist():
return load_data()
@app.post("/remove")
async def remove_blacklist(domain: str = Body(..., embed=True)):
data = load_data()
new_data = [e for e in data if e["domain"] != domain]
save_data(new_data)
return {"message": f"Domain {domain} removed from blacklist."}
啟動服務:
bash
複製程式碼
uvicorn blacklist_service.app:app --host 0.0.0.0 --port 8500
3.2 測試新增黑名單
bash
複製程式碼
curl -X POST http://127.0.0.1:8500/add \
-H "Content-Type: application/json" \
-d '{"domain":"bit.ly","reason":"Detected in phishing cluster","source":"model"}'
3.3 Widget 同步更新機制
前端 Widget 在載入時呼叫 /list API 並自動比對黑名單:
javascript
複製程式碼
async function isBlacklisted(domain){
const resp = await fetch("http://127.0.0.1:8500/list");
const data = await resp.json();
return data.some(e => e.domain === domain && e.active);
}
// Example usage in widget
if(await isBlacklisted(domain)){
showModal("此連結屬於封鎖名單,已自動阻擋。", "high-risk");
return;
}
四、自動化封鎖與人工審核整合
4.1 審核面板資料(簡化版本)
將 Day 16 產出的 candidate_blocklist.csv 匯入管理後台,顯示如下欄位:
domain reason cluster_id count 狀態
bit.ly 高度重複短網址出現 3 142 審核中
login-secure.net 攻擊群聚樣本 2 56 待確認
管理者可選擇 ✅ 核准 / ❌ 拒絕。
點擊核准後會自動呼叫 Blacklist API /add。
4.2 審核觸發腳本(範例)
review_submit.py
python
複製程式碼
import requests, csv
API_URL = "http://127.0.0.1:8500/add"
with open("candidate_blocklist.csv") as f:
reader = csv.DictReader(f)
for row in reader:
if row["approved"] == "yes":
payload = {
"domain": row["domain"],
"reason": row["reason"],
"source": "human_review"
}
r = requests.post(API_URL, json=payload)
print(r.json())
4.3 自動防火牆更新(Nginx / ModSecurity)
每 10 分鐘從 Blacklist Service 重新載入名單並寫入規則:
update_firewall.sh
bash
複製程式碼
#!/bin/bash
curl -s http://127.0.0.1:8500/list | jq -r '.[].domain' > /etc/nginx/blacklist_domains.txt
# 產生對應規則
cat /etc/nginx/blacklist_domains.txt | awk '{print "if ($host ~* \""$1"\") { return 403; }"}' > /etc/nginx/conf.d/blacklist.conf
nginx -s reload
echo "$(date): Firewall updated" >> /var/log/blacklist_update.log
設定排程:
bash
複製程式碼
(crontab -l ; echo "*/10 * * * * /usr/bin/bash /app/update_firewall.sh") | crontab -
五、即時防禦回饋機制
5.1 防禦事件上報
每次自動封鎖或攔截事件寫入事件表 blocked_events.log:
csharp
複製程式碼
[2025-10-20 16:35:12] BLOCKED bit.ly by Firewall
[2025-10-20 16:36:55] BLOCKED login-secure.net by Widget
5.2 回饋再訓練流程
每日匯總封鎖事件,將真實封鎖紀錄與人工審核結果合併成新資料,作為模型訓練樣本:
python
複製程式碼
import pandas as pd
df1 = pd.read_csv("blocked_events.csv")
df2 = pd.read_csv("candidate_blocklist.csv")
merged = pd.merge(df1, df2, on="domain", how="left")
merged.to_csv("training_data_update.csv", index=False)
再由 retrain.py 每週自動執行(Day 14 已建立 Cron)。
六、驗證流程與測試
手動提交惡意 domain → Blacklist Service。
Widget 即時更新 → 點擊該網址時應顯示「此連結已封鎖」。
查看 Nginx access log,確認被攔截的請求返回 403 Forbidden。
於 Grafana Dashboard 查看黑名單更新頻率與防火牆命中數。
七、整體安全優化建議
權限與審核分層:審核者、維護者、系統帳號權限分離,防止誤封。
快取同步:Widget 前端可使用本地快取降低頻繁 API 請求。
回滾機制:若封鎖導致誤判,可一鍵從 API /remove 解除。
簽章驗證:未來可使用 JWT 簽章機制驗證防火牆更新請求。