iT邦幫忙

0

Day 20:威脅情報整合與自動更新模組(Threat Intelligence Integration)

o 2025-10-22 00:48:29226 瀏覽
  • 分享至 

  • xImage
  •  

一、今日目標

  1. 了解威脅情報(Threat Intelligence, TI)在防禦中的角色。
  2. 整合外部威脅資料來源(Threat Feeds)。
  3. 實作自動化定期同步模組(Scheduler + Parser)。
  4. 將外部情報與內部模型結果結合,形成「智慧威脅分數(Threat Score)」。
  5. 實作異常警報與黑名單即時更新流程。

二、威脅情報簡介(Threat Intelligence Overview)

類型 說明 常見來源
Open Source TI 免費開放的威脅資料集 PhishTank、AbuseIPDB、AlienVault OTX
Commercial TI 付費或專屬供應商數據 Cisco Talos、CrowdStrike、Recorded Future
Community-based TI 由社群共建,如開源貢獻者提交 MalwareBazaar、OpenPhish

威脅情報的價值:

  • 提供「已知惡意實體(domain/IP/hash)」的清單。
  • 協助模型辨識未知樣本時的風險基準。
  • 讓防禦系統具備「全球視野」,而非僅依據本地資料學習。

三、資料來源整合設計

3.1 Threat Feed 格式(統一結構)

將各來源統一轉換為內部統一格式:

source indicator type confidence last_seen
PhishTank phishing-site.com domain 0.9 2025-10-20
VirusTotal 123.45.67.89 ip 0.85 2025-10-20
AbuseIPDB 98.76.54.32 ip 0.7 2025-10-19

3.2 Feed 來源與 API 範例

(1) PhishTank API

import requests, pandas as pd

url = "https://data.phishtank.com/data/online-valid.csv"
df = pd.read_csv(url)
df_feed = df[['url', 'phish_detail_url', 'submission_time']]
df_feed.columns = ['indicator', 'source_url', 'last_seen']
df_feed["source"] = "PhishTank"
df_feed["type"] = "domain"
df_feed["confidence"] = 0.9
df_feed.to_csv("feeds/phishtank.csv", index=False)
(2) VirusTotal Domain Reputation
python
複製程式碼
import requests, os, pandas as pd

VT_API_KEY = os.getenv("VT_API_KEY")
def vt_lookup(domain):
    url = f"https://www.virustotal.com/api/v3/domains/{domain}"
    headers = {"x-apikey": VT_API_KEY}
    r = requests.get(url, headers=headers)
    data = r.json()
    rep = data["data"]["attributes"]["reputation"]
    last_analysis = data["data"]["attributes"]["last_analysis_stats"]
    confidence = min(rep / 100, 1.0)
    return {"indicator": domain, "type": "domain", "confidence": confidence}

domains = ["bit.ly", "secure-login.net"]
feed = [vt_lookup(d) for d in domains]
pd.DataFrame(feed).to_csv("feeds/virustotal.csv", index=False)
(3) AbuseIPDB IP Reputation
python
複製程式碼
import requests, pandas as pd, os
ABUSE_API = os.getenv("ABUSE_API")

def abuse_lookup(ip):
    r = requests.get("https://api.abuseipdb.com/api/v2/check", 
                     headers={"Key": ABUSE_API, "Accept": "application/json"},
                     params={"ipAddress": ip, "maxAgeInDays": 30})
    data = r.json()["data"]
    confidence = data["abuseConfidenceScore"] / 100
    return {"indicator": ip, "type": "ip", "confidence": confidence}

ips = ["123.45.67.89", "111.222.33.44"]
feed = [abuse_lookup(ip) for ip in ips]
pd.DataFrame(feed).to_csv("feeds/abuseipdb.csv", index=False)
四、威脅資料整合模組(Feed Aggregator)
4.1 整合多來源資料
python
複製程式碼
# threat_feed_aggregator.py
import pandas as pd
from glob import glob
import datetime

files = glob("feeds/*.csv")
dfs = [pd.read_csv(f) for f in files]
merged = pd.concat(dfs).drop_duplicates(subset=["indicator"])

merged["last_update"] = datetime.datetime.utcnow()
merged.to_csv("feeds/threat_master.csv", index=False)
print(f"✅ Aggregated {len(merged)} indicators from {len(files)} feeds.")
4.2 自動化排程
設定每日凌晨更新:

bash
複製程式碼
0 3 * * * /usr/bin/python3 /app/threat_feed_aggregator.py
五、整合威脅分數(Threat Score)計算
將外部威脅分數與內部模型結果融合,形成最終的「智慧威脅分數」。

python
複製程式碼
# threat_score.py
import pandas as pd

model_scores = pd.read_csv("data/model_predictions.csv")   # columns: domain, model_score
threat_feed = pd.read_csv("feeds/threat_master.csv")       # columns: indicator, confidence

merged = model_scores.merge(threat_feed, left_on="domain", right_on="indicator", how="left")
merged["final_score"] = merged.apply(
    lambda x: max(x["model_score"], x["confidence"] or 0) if not pd.isna(x["confidence"]) else x["model_score"], axis=1
)
merged.to_csv("data/threat_combined.csv", index=False)
邏輯:
若外部情報信心高(confidence > 0.7),則即使模型未判定為惡意,也應提高警示權重。

六、異常警報與自動封鎖
6.1 自動封鎖條件
final_score >= 0.85 → 直接封鎖(更新 Blacklist Service)。

0.7 <= final_score < 0.85 → 加入灰名單待人工審核。

< 0.7 → 僅監控,不動作。

python
複製程式碼
# threat_autoblock.py
import pandas as pd, requests

df = pd.read_csv("data/threat_combined.csv")
for _, row in df.iterrows():
    if row["final_score"] >= 0.85:
        payload = {"domain": row["domain"], "reason": "High risk from TI", "source": "TI feed"}
        requests.post("http://127.0.0.1:8500/add", json=payload)
        print(f"Added to blacklist: {row['domain']}")
七、系統安全監控與效能評估
指標	說明	目標值
Feed 更新成功率	每日自動同步成功比例	≥ 95%
Feed 合併時間	多來源聚合耗時	≤ 30 秒
新增黑名單數量	每日偵測到的新惡意實體	≥ 10
誤封率	因外部誤報導致的錯誤封鎖比例	< 2%

八、安全與隱私考量
API 金鑰安全:所有外部服務金鑰應存於 .env 或安全憑證庫(Vault)。

資料去識別化:若外部情報含個資(如回報者 Email),需過濾。

可信來源控制:僅信任官方或驗證過的 Feed,避免惡意注入假情報。

資料時效性:過期(>30 天)的 Feed 須自動剔除,以防過時資訊導致誤判。


圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言