iT邦幫忙

0

Day 17:自動化防禦流程實作(End-to-End Pipeline)

o 2025-10-20 22:56:01297 瀏覽
  • 分享至 

  • xImage
  •  

一、今日目標

  1. 建立自動化封鎖流程(Auto Blocking Pipeline)。
  2. 整合人工審核介面與自動下發規則。
  3. 設計與部署「黑名單服務(Blacklist Service)」API。
  4. 實作自動同步與防火牆更新機制。
  5. 驗證整體防禦流程的穩定性與即時反應能力。

二、Pipeline 架構概覽

2.1 系統流程圖

[異常偵測模型]

[黑名單候選產生] → [人工審核介面] → (核准惡意)
↓ ↓
[Blacklist Service API] ←─────────────┘

[更新防火牆/Widget 機制] → [實際阻斷惡意 URL/Domain]

python
複製程式碼

2.2 流程說明

  • 偵測層(Detection Layer):由 Day 16 的模型產出候選 domain / URL。
  • 審核層(Review Layer):管理者審核候選黑名單,確認後觸發 API。
  • 執行層(Action Layer):Blacklist API 接受請求並更新防火牆、Widget、資料庫封鎖表。
  • 回饋層(Feedback Layer):記錄執行結果並回饋至模型再訓練資料集。

三、黑名單服務(Blacklist Service)

3.1 後端 API 設計

目標:集中管理惡意 domain / URL 的黑名單狀態。

blacklist_service/app.py

from fastapi import FastAPI, HTTPException, Body
from pydantic import BaseModel
from datetime import datetime
import json, os

app = FastAPI(title="Blacklist Service")

DATA_FILE = "blacklist.json"

class BlacklistEntry(BaseModel):
    domain: str
    reason: str
    source: str
    added_at: datetime = datetime.utcnow()
    active: bool = True

def load_data():
    if not os.path.exists(DATA_FILE):
        return []
    with open(DATA_FILE, "r") as f:
        return json.load(f)

def save_data(data):
    with open(DATA_FILE, "w") as f:
        json.dump(data, f, default=str, indent=2)

@app.post("/add")
async def add_blacklist(entry: BlacklistEntry):
    data = load_data()
    if any(e["domain"] == entry.domain for e in data):
        raise HTTPException(status_code=400, detail="Domain already exists")
    data.append(entry.dict())
    save_data(data)
    return {"message": f"Domain {entry.domain} added to blacklist."}

@app.get("/list")
async def list_blacklist():
    return load_data()

@app.post("/remove")
async def remove_blacklist(domain: str = Body(..., embed=True)):
    data = load_data()
    new_data = [e for e in data if e["domain"] != domain]
    save_data(new_data)
    return {"message": f"Domain {domain} removed from blacklist."}
啟動服務:

bash
複製程式碼
uvicorn blacklist_service.app:app --host 0.0.0.0 --port 8500
3.2 測試新增黑名單
bash
複製程式碼
curl -X POST http://127.0.0.1:8500/add \
-H "Content-Type: application/json" \
-d '{"domain":"bit.ly","reason":"Detected in phishing cluster","source":"model"}'
3.3 Widget 同步更新機制
前端 Widget 在載入時呼叫 /list API 並自動比對黑名單:

javascript
複製程式碼
async function isBlacklisted(domain){
  const resp = await fetch("http://127.0.0.1:8500/list");
  const data = await resp.json();
  return data.some(e => e.domain === domain && e.active);
}

// Example usage in widget
if(await isBlacklisted(domain)){
  showModal("此連結屬於封鎖名單,已自動阻擋。", "high-risk");
  return;
}
四、自動化封鎖與人工審核整合
4.1 審核面板資料(簡化版本)
將 Day 16 產出的 candidate_blocklist.csv 匯入管理後台,顯示如下欄位:

domain	reason	cluster_id	count	狀態
bit.ly	高度重複短網址出現	3	142	審核中
login-secure.net	攻擊群聚樣本	2	56	待確認

管理者可選擇 ✅ 核准 / ❌ 拒絕。
點擊核准後會自動呼叫 Blacklist API /add。

4.2 審核觸發腳本(範例)
review_submit.py

python
複製程式碼
import requests, csv

API_URL = "http://127.0.0.1:8500/add"

with open("candidate_blocklist.csv") as f:
    reader = csv.DictReader(f)
    for row in reader:
        if row["approved"] == "yes":
            payload = {
                "domain": row["domain"],
                "reason": row["reason"],
                "source": "human_review"
            }
            r = requests.post(API_URL, json=payload)
            print(r.json())
4.3 自動防火牆更新(Nginx / ModSecurity)
每 10 分鐘從 Blacklist Service 重新載入名單並寫入規則:

update_firewall.sh

bash
複製程式碼
#!/bin/bash
curl -s http://127.0.0.1:8500/list | jq -r '.[].domain' > /etc/nginx/blacklist_domains.txt

# 產生對應規則
cat /etc/nginx/blacklist_domains.txt | awk '{print "if ($host ~* \""$1"\") { return 403; }"}' > /etc/nginx/conf.d/blacklist.conf

nginx -s reload
echo "$(date): Firewall updated" >> /var/log/blacklist_update.log
設定排程:

bash
複製程式碼
(crontab -l ; echo "*/10 * * * * /usr/bin/bash /app/update_firewall.sh") | crontab -
五、即時防禦回饋機制
5.1 防禦事件上報
每次自動封鎖或攔截事件寫入事件表 blocked_events.log:

csharp
複製程式碼
[2025-10-20 16:35:12] BLOCKED bit.ly by Firewall
[2025-10-20 16:36:55] BLOCKED login-secure.net by Widget
5.2 回饋再訓練流程
每日匯總封鎖事件,將真實封鎖紀錄與人工審核結果合併成新資料,作為模型訓練樣本:

python
複製程式碼
import pandas as pd
df1 = pd.read_csv("blocked_events.csv")
df2 = pd.read_csv("candidate_blocklist.csv")
merged = pd.merge(df1, df2, on="domain", how="left")
merged.to_csv("training_data_update.csv", index=False)
再由 retrain.py 每週自動執行(Day 14 已建立 Cron)。

六、驗證流程與測試
手動提交惡意 domain → Blacklist Service。

Widget 即時更新 → 點擊該網址時應顯示「此連結已封鎖」。

查看 Nginx access log,確認被攔截的請求返回 403 Forbidden。

於 Grafana Dashboard 查看黑名單更新頻率與防火牆命中數。

七、整體安全優化建議
權限與審核分層:審核者、維護者、系統帳號權限分離,防止誤封。

快取同步:Widget 前端可使用本地快取降低頻繁 API 請求。

回滾機制:若封鎖導致誤判,可一鍵從 API /remove 解除。

簽章驗證:未來可使用 JWT 簽章機制驗證防火牆更新請求。


圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言