iT邦幫忙

0

Day 16:攻擊行為模式分析與 AI 驅動防禦建議

o 2025-10-20 22:50:02229 瀏覽
  • 分享至 

  • xImage
  •  

一、今日目標

  1. 將系統日誌、API 事件、Gophish 測試結果等整合為分析資料集。
  2. 提取行為特徵(time window、domain pattern、短網址、payload 特徵、點擊序列)。
  3. 使用非監督學習(聚類 / 異常偵測)找出攻擊群聚與樣式。
  4. 產出可轉換為防禦規則或模型的「可執行建議」(例如:自動封鎖短網址、提高特定時間段閾值、建立黑名單候選)。
  5. 設計回饋流程,將人工確認結果回流為訓練資料。

二、資料整合(輸入資料來源與欄位建議)

將以下來源整合成一張主表 events.csv(或儲於資料庫):

  • API 檢測事件:event_id, timestamp, user_id_hash, url, domain, score, verdict, source
  • Gophish campaign 結果:campaign_id, recipient_hash, sent_time, open_time, click_time, landing_page
  • Frontend widget 事件:widget_id, timestamp, url, user_action (blocked/continued/feedback)
  • 補充欄位:ua_string, client_ip_hash, referrer

採用 UTC 時間並對使用者識別資訊(email、IP)做雜湊或遮罩以符隱私原則。


三、特徵工程(Feature Engineering)要點

常用特徵類型:

  1. URL / Domain 特徵

    • 是否短網址(bit.ly 等)、子域數量、域名長度、頂級域(TLD)類別、是否含可疑字串(login, verify, secure)
    • WHOIS/域名註冊年齡(進階,可透過外部查詢)
  2. 時間行為特徵

    • 每日/每小時發送分佈(hour_of_day)、發送速率、突發性(burstiness)指標
    • 點擊延遲(click_time - open_time)統計(平均/中位/分佈)
  3. 行為序列特徵(session-level)

    • 事件序列(open → click → submit)長度、典型序列模式、時間間隔分佈
    • 使用 n-gram 或 Markov chain 表示序列模式
  4. 內容向量化

    • 文本(email body / subject / anchor text)用 TF-IDF 或 CountVectorizer;進階用 sentence embeddings(例如 SBERT)做相似性聚類
  5. 交互特徵

    • 同一 domain 在短時間內被多個 hashed user click 的次數(群聚指標)
    • 在多個 campaign 中重複出現的 landing page 或 URL 模板

四、非監督學習範例(聚類與異常偵測)

下面提供一個用 scikit-learn 的實作流程(可放在 scripts/attack_pattern.py):

# attack_pattern.py
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import DBSCAN
from sklearn.ensemble import IsolationForest
import tldextract
import numpy as np

# 讀取整合後事件
df = pd.read_csv("data/events.csv", parse_dates=["timestamp"])

# 範例特徵:domain, is_short, hour_of_day, text_tfidf
def get_domain(u):
    try:
        te = tldextract.extract(u)
        return ".".join(part for part in [te.domain, te.suffix] if part)
    except:
        return ""

df['domain'] = df['url'].fillna("").apply(get_domain)
df['is_short'] = df['domain'].str.contains('bit.ly|tinyurl|t.co|short', na=False).astype(int)
df['hour'] = df['timestamp'].dt.hour

# 文本向量化(以 url 或 subject 為例)
vec = TfidfVectorizer(max_features=500)
tfidf = vec.fit_transform(df['url'].fillna(""))

# 數值特徵合併
num_feats = df[['is_short','hour']].fillna(0).to_numpy()
from scipy.sparse import hstack
X = hstack([tfidf, num_feats])

# 使用 DBSCAN 做聚類,找群聚攻擊(適合噪聲多的情況)
cl = DBSCAN(eps=0.5, min_samples=5, metric='cosine').fit(X)
df['cluster'] = cl.labels_

# 異常偵測(找孤立攻擊樣本)
iso = IsolationForest(contamination=0.01, random_state=42)
df['anomaly_score'] = iso.fit_predict(X.toarray())  # -1 anomaly, 1 normal

# 匯出結果供分析
df.to_csv("data/events_with_patterns.csv", index=False)
print("clusters:", df['cluster'].value_counts().head(10))
print("anomalies:", (df['anomaly_score'] == -1).sum())
說明與注意事項:

DBSCAN 的 eps 與距離度量需依資料量與向量化方法微調。

TF-IDF 用於 URL/subject 文字相似性;若使用 embeddings,聚類結果通常更好。

IsolationForest 可找出少數異常行為(例如突發大量短網址點擊)。

五、序列模式分析(簡易 Markov / n-gram)
若要找出常見攻擊序列(例如:email_sent -> open -> click -> submit),可把事件按 session(或 hashed user + campaign)排序,計算 transition matrix:

python
複製程式碼
# sequence_analysis.py (簡要)
import pandas as pd
from collections import Counter, defaultdict

df = pd.read_csv("data/events.csv", parse_dates=["timestamp"])
# 假設有 session_id 或以 user_hash + day 作為 session
df['session'] = df['user_id_hash'].astype(str) + "_" + df['timestamp'].dt.date.astype(str)
events = df.groupby('session').apply(lambda g: list(g.sort_values('timestamp')['event_type']))

# 計算 transition
trans = defaultdict(Counter)
for seq in events:
    for a,b in zip(seq, seq[1:]):
        trans[a][b] += 1

# Normalize to probabilities
trans_probs = {k: {kk: vv/sum(vv2.values()) for kk,vv in v.items()} for k,v in trans.items() for vv2 in [v]}
print(trans_probs)
由轉移矩陣可識別高概率的攻擊流程,並將低概率但高風險的轉移視為異常。

六、防禦建議產出(從模式到規則)
根據聚類與序列分析,產生「可執行防禦建議」範例:

自動黑名單候選:

對於被 DBSCAN 判為同一群聚且 cluster size > N 且包含短網址者,自動加入「黑名單候選」由管理員審核;若經人工確認為惡意,自動加入黑名單並同步到前端 widget 與 API。

動態閾值調整:

若在某小時(例如 02:00–04:00)發現短時間大量 click → 暫時降低 medium->high 閾值或提高 rate-limit 觸發。

序列阻斷規則:

若觀測到特定 sequence(open -> click -> submit)在短時間內出現且 domain 為新註冊或短網址,則在 landing page 阻止 submit 或要求二次驗證。

自動通報與封鎖流程:

異常群聚(IsolationForest 判定)自動觸發告警,並將該 domain 與 URL 推入 queue,由審核人員確認 15 分鐘內決議(封鎖 / 標示 / 監控)。

模型回饋:

把人工審核結果(真惡性標註)回寫至 training_data.csv 作為再訓練來源,並週期性 retrain。

七、今日實作任務(步驟化)
匯出最近 14 天的 events.csv(包含 API、Gophish、widget 三類事件)。

依範例執行 attack_pattern.py,輸出 events_with_patterns.csv。檢視 top clusters 與 anomalies。

使用 sequence_analysis.py 計算轉移矩陣,找出高風險序列(列出 top 10)。

將 cluster/anomaly 結果轉成「黑名單候選」清單(candidate_blocklist.csv),格式:domain, reason, cluster_id, example_url, count。

在管理儀表板新增一個「黑名單候選」視圖,供人工審核(可手動匯出 CSV)。

八、評估指標(如何驗證成效)
發現率(Detection Rate):聚類/異常偵測標出後經人工確認為惡意的比例(Precision)。

回收時間(Time-to-Block):從系統自動發現到人工/自動封鎖域名的平均時間。

誤判率(False Positive):被標為候選但人工確認為 benign 的比例。

系統影響度:採取防禦後(例如封鎖 domain)在短期內降低的 click 數與成功欺騙率。

九、注意事項與風險
非監督學習常有誤判,所有自動封鎖務必先經人工審核或設置緩衝期(例如自動加入候選,24 小時後若多個來源仍偵測到則自動封鎖)。

特徵需去識別化以符隱私原則。

聚類參數需以小批量資料反覆微調;建議先在內部環境做 A/B 驗證再放大套用。

圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言