將以下來源整合成一張主表 events.csv(或儲於資料庫):
event_id, timestamp, user_id_hash, url, domain, score, verdict, source
campaign_id, recipient_hash, sent_time, open_time, click_time, landing_page
widget_id, timestamp, url, user_action (blocked/continued/feedback)
ua_string, client_ip_hash, referrer
採用 UTC 時間並對使用者識別資訊(email、IP)做雜湊或遮罩以符隱私原則。
常用特徵類型:
URL / Domain 特徵
時間行為特徵
行為序列特徵(session-level)
內容向量化
交互特徵
下面提供一個用 scikit-learn 的實作流程(可放在 scripts/attack_pattern.py):
# attack_pattern.py
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import DBSCAN
from sklearn.ensemble import IsolationForest
import tldextract
import numpy as np
# 讀取整合後事件
df = pd.read_csv("data/events.csv", parse_dates=["timestamp"])
# 範例特徵:domain, is_short, hour_of_day, text_tfidf
def get_domain(u):
    try:
        te = tldextract.extract(u)
        return ".".join(part for part in [te.domain, te.suffix] if part)
    except:
        return ""
df['domain'] = df['url'].fillna("").apply(get_domain)
df['is_short'] = df['domain'].str.contains('bit.ly|tinyurl|t.co|short', na=False).astype(int)
df['hour'] = df['timestamp'].dt.hour
# 文本向量化(以 url 或 subject 為例)
vec = TfidfVectorizer(max_features=500)
tfidf = vec.fit_transform(df['url'].fillna(""))
# 數值特徵合併
num_feats = df[['is_short','hour']].fillna(0).to_numpy()
from scipy.sparse import hstack
X = hstack([tfidf, num_feats])
# 使用 DBSCAN 做聚類,找群聚攻擊(適合噪聲多的情況)
cl = DBSCAN(eps=0.5, min_samples=5, metric='cosine').fit(X)
df['cluster'] = cl.labels_
# 異常偵測(找孤立攻擊樣本)
iso = IsolationForest(contamination=0.01, random_state=42)
df['anomaly_score'] = iso.fit_predict(X.toarray())  # -1 anomaly, 1 normal
# 匯出結果供分析
df.to_csv("data/events_with_patterns.csv", index=False)
print("clusters:", df['cluster'].value_counts().head(10))
print("anomalies:", (df['anomaly_score'] == -1).sum())
說明與注意事項:
DBSCAN 的 eps 與距離度量需依資料量與向量化方法微調。
TF-IDF 用於 URL/subject 文字相似性;若使用 embeddings,聚類結果通常更好。
IsolationForest 可找出少數異常行為(例如突發大量短網址點擊)。
五、序列模式分析(簡易 Markov / n-gram)
若要找出常見攻擊序列(例如:email_sent -> open -> click -> submit),可把事件按 session(或 hashed user + campaign)排序,計算 transition matrix:
python
複製程式碼
# sequence_analysis.py (簡要)
import pandas as pd
from collections import Counter, defaultdict
df = pd.read_csv("data/events.csv", parse_dates=["timestamp"])
# 假設有 session_id 或以 user_hash + day 作為 session
df['session'] = df['user_id_hash'].astype(str) + "_" + df['timestamp'].dt.date.astype(str)
events = df.groupby('session').apply(lambda g: list(g.sort_values('timestamp')['event_type']))
# 計算 transition
trans = defaultdict(Counter)
for seq in events:
    for a,b in zip(seq, seq[1:]):
        trans[a][b] += 1
# Normalize to probabilities
trans_probs = {k: {kk: vv/sum(vv2.values()) for kk,vv in v.items()} for k,v in trans.items() for vv2 in [v]}
print(trans_probs)
由轉移矩陣可識別高概率的攻擊流程,並將低概率但高風險的轉移視為異常。
六、防禦建議產出(從模式到規則)
根據聚類與序列分析,產生「可執行防禦建議」範例:
自動黑名單候選:
對於被 DBSCAN 判為同一群聚且 cluster size > N 且包含短網址者,自動加入「黑名單候選」由管理員審核;若經人工確認為惡意,自動加入黑名單並同步到前端 widget 與 API。
動態閾值調整:
若在某小時(例如 02:00–04:00)發現短時間大量 click → 暫時降低 medium->high 閾值或提高 rate-limit 觸發。
序列阻斷規則:
若觀測到特定 sequence(open -> click -> submit)在短時間內出現且 domain 為新註冊或短網址,則在 landing page 阻止 submit 或要求二次驗證。
自動通報與封鎖流程:
異常群聚(IsolationForest 判定)自動觸發告警,並將該 domain 與 URL 推入 queue,由審核人員確認 15 分鐘內決議(封鎖 / 標示 / 監控)。
模型回饋:
把人工審核結果(真惡性標註)回寫至 training_data.csv 作為再訓練來源,並週期性 retrain。
七、今日實作任務(步驟化)
匯出最近 14 天的 events.csv(包含 API、Gophish、widget 三類事件)。
依範例執行 attack_pattern.py,輸出 events_with_patterns.csv。檢視 top clusters 與 anomalies。
使用 sequence_analysis.py 計算轉移矩陣,找出高風險序列(列出 top 10)。
將 cluster/anomaly 結果轉成「黑名單候選」清單(candidate_blocklist.csv),格式:domain, reason, cluster_id, example_url, count。
在管理儀表板新增一個「黑名單候選」視圖,供人工審核(可手動匯出 CSV)。
八、評估指標(如何驗證成效)
發現率(Detection Rate):聚類/異常偵測標出後經人工確認為惡意的比例(Precision)。
回收時間(Time-to-Block):從系統自動發現到人工/自動封鎖域名的平均時間。
誤判率(False Positive):被標為候選但人工確認為 benign 的比例。
系統影響度:採取防禦後(例如封鎖 domain)在短期內降低的 click 數與成功欺騙率。
九、注意事項與風險
非監督學習常有誤判,所有自動封鎖務必先經人工審核或設置緩衝期(例如自動加入候選,24 小時後若多個來源仍偵測到則自動封鎖)。
特徵需去識別化以符隱私原則。
聚類參數需以小批量資料反覆微調;建議先在內部環境做 A/B 驗證再放大套用。