1. Today's Goal
Building on the previous day's test environment, this post sets up an automated detection pipeline that combines the Day 3 samples with the Day 4 Gophish test results, produces a quantifiable risk score, and exposes a query service via FastAPI.
Once done, you will have:
A script that automatically scores samples based on their text content and URL features.
An API endpoint that serves a phishing risk score.
A structured dataset that can later be extended into a machine learning model.
2. Data Structure Review and Preparation
First, confirm you already have the following files (produced on previous days):
data/phishing_samples_annotate.csv        # samples + manual annotations
data/phishing_samples_scored.csv          # rule-based detection scores
data/results_anonymized.csv               # Gophish export (anonymized)
The goal is to integrate these three data sources into one master dataset (a sample row is sketched right after this list) with the following fields:
Raw text content (raw_text)
URL and domain
Automated score (score)
Open/click behaviour (open_flag / click_flag)
Annotation label (label)
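For orientation, a single row of the target master dataset might look like this (the id and all values are purely illustrative):

id,raw_text,url,domain,score,open_flag,click_flag,label
42,"Your account is suspended, please verify: https://bit.ly/xyz",https://bit.ly/xyz,bit.ly,6,1,0,phishing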
3. Building the Data Integration and Scoring Script
scripts/integrate_and_score.py
import os
import re

import pandas as pd
import tldextract

DATA_DIR = os.path.join(os.path.dirname(__file__), '..', 'data')

SUSPICIOUS_KEYWORDS = [
    "verify", "login", "suspend", "account", "limited",
    "urgent", "immediately", "asap", "立即", "限時", "帳號", "驗證"
]

SHORT_URLS = ["bit.ly", "tinyurl", "t.co", "shorturl"]


def load_data():
    samples = pd.read_csv(os.path.join(DATA_DIR, "phishing_samples_annotate.csv"))
    results = pd.read_csv(os.path.join(DATA_DIR, "results_anonymized.csv"))
    return samples, results


def score_text(text):
    score = 0
    # +2 if any suspicious keyword appears (counted once, not per keyword)
    if any(k in text.lower() for k in SUSPICIOUS_KEYWORDS):
        score += 2
    # +3 per URL whose registered domain matches a known shortener
    urls = re.findall(r'https?://[^\s]+', text)
    for u in urls:
        te = tldextract.extract(u)
        domain = ".".join([te.domain, te.suffix])
        if any(s in domain for s in SHORT_URLS):
            score += 3
    # +1 if the text contains an "@" or any link at all
    if "@" in text or "http" in text:
        score += 1
    return score


def integrate_and_score(samples, results):
    # Assumes results contains 'id', 'email_hash', 'clicked', 'opened'
    merged = samples.copy()
    merged["score"] = merged["raw_text"].fillna("").apply(score_text)
    merged = merged.merge(results, how="left", on="id")
    merged["click_flag"] = merged.get("clicked", 0)
    merged["open_flag"] = merged.get("opened", 0)
    # Rows with no Gophish record come back NaN from the left join; treat as 0
    merged[["click_flag", "open_flag"]] = merged[["click_flag", "open_flag"]].fillna(0).astype(int)
    # Samples scoring 4 or more are flagged as phishing
    merged["predicted_label"] = merged["score"].apply(lambda x: "phishing" if x >= 4 else "benign")
    return merged


def main():
    samples, results = load_data()
    df = integrate_and_score(samples, results)
    out_path = os.path.join(DATA_DIR, "phishing_master_dataset.csv")
    df.to_csv(out_path, index=False, encoding="utf-8-sig")
    print(f"Integration complete: {len(df)} samples, written to {out_path}")


if __name__ == "__main__":
    main()
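Before running the full pipeline, the scorer can be sanity-checked on its own (a quick REPL session launched from inside scripts/; the sample string is illustrative):

>>> from integrate_and_score import score_text
>>> score_text("Please verify your account: https://bit.ly/abc123")
6

The 6 breaks down as +2 for the suspicious keywords, +3 for the bit.ly shortener, and +1 for containing a link.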
Run it:
python scripts/integrate_and_score.py
Output:
data/phishing_master_dataset.csv
(contains the text, scores, experiment behaviour, and prediction results)
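The goal list also promised an API endpoint for the risk score. Below is a minimal FastAPI sketch that reads the master dataset produced above; the file name api/main.py and the /risk/{sample_id} route are illustrative choices, not fixed by earlier days:

# api/main.py — minimal sketch; file layout and route name are illustrative
import pandas as pd
from fastapi import FastAPI, HTTPException

app = FastAPI(title="Phishing Risk Score API")
# Load once at startup; fine for a small, static dataset
df = pd.read_csv("data/phishing_master_dataset.csv")

@app.get("/risk/{sample_id}")
def get_risk(sample_id: int):
    row = df[df["id"] == sample_id]
    if row.empty:
        raise HTTPException(status_code=404, detail="sample not found")
    r = row.iloc[0]
    return {
        "id": int(r["id"]),
        "score": int(r["score"]),
        "predicted_label": r["predicted_label"],
        "click_flag": int(r["click_flag"]),
    }

Start it with uvicorn api.main:app --reload and query GET /risk/<id>.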
4. Analysis and Validation
Use simple statistics to check the model's accuracy and behavioural trends.
import pandas as pd
from sklearn.metrics import classification_report, confusion_matrix
df = pd.read_csv("data/phishing_master_dataset.csv")
y_true = df["label"].fillna("benign").apply(lambda x: 1 if x == "phishing" else 0)
y_pred = df["predicted_label"].apply(lambda x: 1 if x == "phishing" else 0)
print(confusion_matrix(y_true, y_pred))
print(classification_report(y_true, y_pred))
print(df.groupby("predicted_label")[["score", "click_flag"]].mean())
Check the report: if precision and recall are both 0.8 or higher, the rule-based scoring logic can serve as an initial detection baseline.
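Goal three leaves room for a machine-learning extension of this dataset. One plausible first step, sketched under the assumption that raw_text and label are populated for enough rows (TF-IDF features feeding a logistic regression; not part of today's deliverable):

import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline

df = pd.read_csv("data/phishing_master_dataset.csv").dropna(subset=["raw_text", "label"])
X_train, X_test, y_train, y_test = train_test_split(
    df["raw_text"], df["label"], test_size=0.2, random_state=42
)

# Character n-grams tokenize mixed Chinese/English text more reliably
# than the default word-level tokenizer
model = make_pipeline(
    TfidfVectorizer(analyzer="char_wb", ngram_range=(2, 4), max_features=5000),
    LogisticRegression(max_iter=1000),
)
model.fit(X_train, y_train)
print(f"test accuracy: {model.score(X_test, y_test):.2f}")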