iT邦幫忙

2025 iThome 鐵人賽

DAY 25
0
生成式 AI

《AI 新手到職場應用:深度學習 30 天實戰》系列 第 25

實戰:用深度學習做垃圾郵件分類(1/2)

  • 分享至 

  • xImage
  •  

我們每天用電子信箱收信時,總會遇到一些廣告信、詐騙信件,這些就是我們熟悉的「垃圾郵件」。

在過去,郵件服務商多依靠人工規則來判斷,比如是否含有「免費」、「中獎」等關鍵字,
但這樣的方法既僵化又容易被繞過,說不定還會戶略掉一些重要文件。

而隨著自然語言處理(NLP) 與 深度學習 的進步,垃圾郵件分類已經可以透過訓練模型自動完成。
今天,我們就要嘗試打造一個簡單的垃圾郵件分類器,看看文字處理加上神經網路能做到什麼程度。


名詞介紹:

在進入實作之前,我們要先認識幾個關鍵名詞:

Tokenization(斷詞/分詞):

把文字切分成更小的單位,通常是詞或字。

Word Embedding(詞向量):

將文字轉換成連續數字向量,讓模型能捕捉語意關係。

Binary Classification(二元分類):

分類結果只有兩類,例如「垃圾郵件 / 非垃圾郵件」。

Dense Layer(全連接層):

輸出層,負責將前一層的特徵轉換成分類結果。

這些都是在之前的文章內有介紹過的,有興趣的讀者可以再閱讀前面的文章,
這邊就是稍加提及,希望喚起大家的記憶。


實作範例:

我們就話不多說,直接先看完整程式碼:

import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.utils import class_weight
import matplotlib.pyplot as plt
import seaborn as sns

# 設定 matplotlib 使用支援英文的字體
plt.rcParams['font.sans-serif'] = ['DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False

print("=" * 60)
print("SPAM EMAIL CLASSIFICATION WITH DEEP LEARNING")
print("=" * 60)

# 1. 下載並載入數據集
print("\n[Step 1] Downloading and loading dataset...")
!wget -q https://archive.ics.uci.edu/ml/machine-learning-databases/00228/smsspamcollection.zip
!unzip -o smsspamcollection.zip > /dev/null 2>&1

# 讀取數據,使用 tab 分隔
data = pd.read_csv('SMSSpamCollection', sep='\t', header=None, names=['label', 'message'])

print("\nDataset Information:")
print(f"  Total samples: {len(data)}")
print(f"  Spam messages: {sum(data['label'] == 'spam')} ({sum(data['label'] == 'spam')/len(data)*100:.1f}%)")
print(f"  Ham messages: {sum(data['label'] == 'ham')} ({sum(data['label'] == 'ham')/len(data)*100:.1f}%)")
print("\nFirst 5 samples:")
print(data.head())

# 2. 數據預處理
print("\n[Step 2] Preprocessing data...")

# 將標籤轉換為數字:spam=1(垃圾郵件), ham=0(正常郵件)
data['label'] = data['label'].map({'spam': 1, 'ham': 0})

# 分割訓練集和測試集(80% 訓練,20% 測試)
X_train, X_test, y_train, y_test = train_test_split(
    data['message'], 
    data['label'], 
    test_size=0.2, 
    random_state=42,
    stratify=data['label']
)

print(f"  Training set size: {len(X_train)}")
print(f"  Test set size: {len(X_test)}")

# 3. 文本向量化
print("\n[Step 3] Text vectorization...")

# 設定向量化參數
max_features = 5000   # 減少詞彙表大小
sequence_length = 50   # 減少序列長度

# 創建文本向量化層
vectorize_layer = layers.TextVectorization(
    max_tokens=max_features,
    output_mode='int',
    output_sequence_length=sequence_length
)

# 適配詞彙表
vectorize_layer.adapt(X_train.values)

# 將文本轉換為整數序列
X_train_vec = vectorize_layer(X_train.values)
X_test_vec = vectorize_layer(X_test.values)

print(f"  Vectorized shape: {X_train_vec.shape}")
print(f"  Vocabulary size: {len(vectorize_layer.get_vocabulary())}")

# 4. 建立更簡單有效的模型
print("\n[Step 4] Building deep learning model...")

# 使用 Bidirectional LSTM + 更積極的正則化
model = keras.Sequential([
    # Embedding 層
    layers.Embedding(input_dim=max_features, output_dim=64, input_length=sequence_length),
    
    # 雙向 LSTM
    layers.Bidirectional(layers.LSTM(32, return_sequences=False)),
    layers.Dropout(0.5),
    
    # Dense 層
    layers.Dense(24, activation='relu'),
    layers.Dropout(0.5),
    
    # 輸出層
    layers.Dense(1, activation='sigmoid')
])

# 使用更高的學習率
optimizer = keras.optimizers.Adam(learning_rate=0.002)

model.compile(
    optimizer=optimizer,
    loss='binary_crossentropy',
    metrics=['accuracy', 
             tf.keras.metrics.Precision(name='precision'),
             tf.keras.metrics.Recall(name='recall')]
)

print("\nModel Architecture:")
model.summary()

# 5. 訓練模型
print("\n[Step 5] Training model...")

# 計算類別權重
class_weights = class_weight.compute_class_weight(
    'balanced',
    classes=np.unique(y_train),
    y=y_train
)
class_weight_dict = {0: class_weights[0], 1: class_weights[1]}
print(f"  Class weights: Ham={class_weight_dict[0]:.2f}, Spam={class_weight_dict[1]:.2f}")

# 添加早停機制
early_stopping = keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=3,
    restore_best_weights=True
)

print("=" * 60)

history = model.fit(
    X_train_vec,
    y_train,
    epochs=20,
    batch_size=64,  # 增加批次大小
    validation_split=0.2,
    class_weight=class_weight_dict,
    callbacks=[early_stopping],
    verbose=1
)

print("=" * 60)
print("Training completed!")

# 6. 評估模型
print("\n[Step 6] Evaluating model on test set...")

test_results = model.evaluate(X_test_vec, y_test, verbose=0)
test_loss = test_results[0]
test_acc = test_results[1]
test_precision = test_results[2]
test_recall = test_results[3]

print("\nTest Set Performance:")
print(f"  Accuracy:  {test_acc:.4f} ({test_acc*100:.2f}%)")
print(f"  Precision: {test_precision:.4f}")
print(f"  Recall:    {test_recall:.4f}")
print(f"  Loss:      {test_loss:.4f}")

# 計算 F1 分數
if test_precision + test_recall > 0:
    f1_score = 2 * (test_precision * test_recall) / (test_precision + test_recall)
    print(f"  F1-Score:  {f1_score:.4f}")
else:
    print(f"  F1-Score:  0.0000")

# 7. 生成預測和分類報告
print("\n[Step 7] Generating predictions and classification report...")

y_pred_prob = model.predict(X_test_vec, verbose=0)
y_pred = (y_pred_prob > 0.5).astype(int).flatten()

print("\nDetailed Classification Report:")
print("-" * 60)
print(classification_report(y_test, y_pred, target_names=['Ham (Normal)', 'Spam (Junk)']))

# 8. 視覺化結果
print("\n[Step 8] Creating visualizations...")

fig, axes = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('Spam Classification Model - Training Results', fontsize=16, fontweight='bold', y=1.00)

# 子圖 1:準確率
axes[0, 0].plot(history.history['accuracy'], label='Training Accuracy', linewidth=2, marker='o')
axes[0, 0].plot(history.history['val_accuracy'], label='Validation Accuracy', linewidth=2, marker='s')
axes[0, 0].set_title('Model Accuracy', fontsize=14, fontweight='bold', pad=10)
axes[0, 0].set_xlabel('Epoch', fontsize=12)
axes[0, 0].set_ylabel('Accuracy', fontsize=12)
axes[0, 0].legend(fontsize=10)
axes[0, 0].grid(True, alpha=0.3)

# 子圖 2:損失
axes[0, 1].plot(history.history['loss'], label='Training Loss', linewidth=2, marker='o')
axes[0, 1].plot(history.history['val_loss'], label='Validation Loss', linewidth=2, marker='s')
axes[0, 1].set_title('Model Loss', fontsize=14, fontweight='bold', pad=10)
axes[0, 1].set_xlabel('Epoch', fontsize=12)
axes[0, 1].set_ylabel('Loss', fontsize=12)
axes[0, 1].legend(fontsize=10)
axes[0, 1].grid(True, alpha=0.3)

# 子圖 3:混淆矩陣
cm = confusion_matrix(y_test, y_pred)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=axes[1, 0], 
            xticklabels=['Ham', 'Spam'], yticklabels=['Ham', 'Spam'],
            cbar_kws={'label': 'Count'}, annot_kws={'size': 14})
axes[1, 0].set_title('Confusion Matrix', fontsize=14, fontweight='bold', pad=10)
axes[1, 0].set_ylabel('True Label', fontsize=12)
axes[1, 0].set_xlabel('Predicted Label', fontsize=12)

# 添加統計資訊
tn, fp, fn, tp = cm.ravel()
accuracy_text = f'Accuracy: {(tp+tn)/(tp+tn+fp+fn)*100:.2f}%\nTrue Positives: {tp}\nTrue Negatives: {tn}\nFalse Positives: {fp}\nFalse Negatives: {fn}'
axes[1, 0].text(2.5, 0.5, accuracy_text, fontsize=10, ha='left', va='center',
                bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))

# 子圖 4:精確率和召回率
axes[1, 1].plot(history.history['precision'], label='Training Precision', linewidth=2, marker='o')
axes[1, 1].plot(history.history['val_precision'], label='Validation Precision', linewidth=2, marker='s')
axes[1, 1].plot(history.history['recall'], label='Training Recall', linewidth=2, marker='^')
axes[1, 1].plot(history.history['val_recall'], label='Validation Recall', linewidth=2, marker='d')
axes[1, 1].set_title('Precision and Recall', fontsize=14, fontweight='bold', pad=10)
axes[1, 1].set_xlabel('Epoch', fontsize=12)
axes[1, 1].set_ylabel('Score', fontsize=12)
axes[1, 1].legend(fontsize=9)
axes[1, 1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("Visualizations created successfully!")

# 9. 實際測試範例
print("\n[Step 9] Testing with practical examples...")
print("=" * 60)

def predict_spam(text):
    """預測單一文本是否為垃圾郵件"""
    text_vec = vectorize_layer([text])
    prediction = model.predict(text_vec, verbose=0)[0][0]
    label = "SPAM" if prediction > 0.5 else "HAM"
    confidence = prediction if prediction > 0.5 else (1 - prediction)
    return label, prediction, confidence

# 測試範例
test_messages = [
    "Congratulations! You've won a $1000 gift card. Click here to claim now!",
    "Hey, are we still meeting for lunch tomorrow?",
    "FREE entry to win £1000! Text WIN to 12345",
    "Can you pick up some milk on your way home?",
    "URGENT! Your account will be suspended. Click link to verify",
    "Thanks for the meeting today. Let's catch up next week."
]

print("\nPractical Testing Examples:")
print("-" * 60)

for i, msg in enumerate(test_messages, 1):
    label, prob, confidence = predict_spam(msg)
    
    print(f"\n[Example {i}]")
    print(f"Message: {msg}")
    print(f"Prediction: {label} | Probability: {prob:.4f} | Confidence: {confidence*100:.2f}%")
    
    if label == "SPAM":
        print(f"Status: ⚠️  This message is classified as SPAM")
    else:
        print(f"Status: ✓ This message is classified as NORMAL (HAM)")

print("\n" + "=" * 60)

# 10. 保存模型
print("\n[Step 10] Saving model...")

model.save('spam_classifier_model.keras')
print("Model saved successfully as 'spam_classifier_model.keras'")

vocab = vectorize_layer.get_vocabulary()
with open('vocabulary.txt', 'w', encoding='utf-8') as f:
    for word in vocab:
        f.write(f"{word}\n")
print("Vocabulary saved as 'vocabulary.txt'")

print("\n" + "=" * 60)
print("ALL STEPS COMPLETED SUCCESSFULLY!")
print("=" * 60)
print("\nModel Summary:")
print(f"  - Training samples: {len(X_train)}")
print(f"  - Test samples: {len(X_test)}")
print(f"  - Test accuracy: {test_acc*100:.2f}%")
print(f"  - Test precision: {test_precision:.4f}")
print(f"  - Test recall: {test_recall:.4f}")
print(f"  - Model file: spam_classifier_model.keras")
print(f"  - Vocabulary file: vocabulary.txt")
print("\nYou can now use this model to classify spam messages!")
print("=" * 60)

我也在程式碼中加入了段落註解,接著我們就根據段落一個一個來看,
如果在閱讀中有特別想了解的部分,
也可以根據註解直接拉到該段落的解釋。


以下我們就來進行各段落的程式碼解釋:

第一部分:環境準備與套件匯入

import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.utils import class_weight
import matplotlib.pyplot as plt
import seaborn as sns

# 設定 matplotlib 使用支援英文的字體
plt.rcParams['font.sans-serif'] = ['DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False

在這段程式碼中,我們就是進行必要的第一步 - 匯入專案所需的所有函式庫

想像我們如果要做一道菜,第一步當然是把所有材料和工具準備好對吧?
這裡也是一樣的道理。我們需要幾個 Python 的「工具箱」:

首先是 numpypandas,這兩個是處理數據的基本工具,就像廚房裡的菜刀和砧板。
tensorflowkeras 是我們的「深度學習引擎」,負責訓練神經網路。
sklearn 提供一些方便的功能,像是切分數據、計算準確率這些。
最後 matplotlibseaborn 是用來畫圖的,讓我們能看到訓練結果漂不漂亮。

有個小細節很重要:我們特別設定了字體,因為如果不設定,
圖表上的英文可能會變成亂碼或方框,看起來就很醜。


第二部分:數據載入與探索

print("=" * 60)
print("SPAM EMAIL CLASSIFICATION WITH DEEP LEARNING")
print("=" * 60)

print("\n[Step 1] Downloading and loading dataset...")
!wget -q https://archive.ics.uci.edu/ml/machine-learning-databases/00228/smsspamcollection.zip
!unzip -o smsspamcollection.zip > /dev/null 2>&1

# 讀取數據,使用 tab 分隔
data = pd.read_csv('SMSSpamCollection', sep='\t', header=None, names=['label', 'message'])

print("\nDataset Information:")
print(f"  Total samples: {len(data)}")
print(f"  Spam messages: {sum(data['label'] == 'spam')} ({sum(data['label'] == 'spam')/len(data)*100:.1f}%)")
print(f"  Ham messages: {sum(data['label'] == 'ham')} ({sum(data['label'] == 'ham')/len(data)*100:.1f}%)")
print("\nFirst 5 samples:")
print(data.head())

在這段程式中,我們從 UCI Machine Learning Repository 下載
SMS Spam Collection 數據集,裡面包含約 5,574 則簡訊。

而 5,574 則真實的簡訊中,有些是垃圾訊息,有些是正常訊息。
這個數據集來自 UCI 大學的機器學習資料庫,算是學界的標準測試集。

下載完後,我們會先「偷看」一下數據長什麼樣子,
因此我們使用 pandas 讀取 tab 分隔的文件,並指定欄位名稱
結果我們發現一個有趣的現象:
垃圾郵件只佔 13.4%,也就是說大部分都是正常郵件。

不過這個「比例不平衡」的問題會讓模型變懶。
就像考試時,如果你發現 10 題裡面有 9 題答案都是 A,你可能就會全部猜 A,反正也能拿 90 分。
但這樣根本沒學到東西對吧?所以我們等等要用一些技巧來解決這個問題。

參考資料:
https://www.kaggle.com/datasets/uciml/sms-spam-collection-dataset


第三部分:數據預處理

print("\n[Step 2] Preprocessing data...")

# 將標籤轉換為數字:spam=1(垃圾郵件), ham=0(正常郵件)
data['label'] = data['label'].map({'spam': 1, 'ham': 0})

# 分割訓練集和測試集(80% 訓練,20% 測試)
X_train, X_test, y_train, y_test = train_test_split(
    data['message'], 
    data['label'], 
    test_size=0.2, 
    random_state=42,
    stratify=data['label']
)

print(f"  Training set size: {len(X_train)}")
print(f"  Test set size: {len(X_test)}")

而在第三部分 - 資料預處理的程式碼中,我們將
文字標籤 「spam」(垃圾郵件) 和 「ham」(一般郵件) 轉換為數字 1 和 0,
因為神經網路只能處理數值,我們必須把我們閱讀的語言轉換成電腦看的語言。

因為電腦其實很笨,它只會算數學,看不懂「spam」和「ham」這種文字。
所以我們要把它們翻譯成數字:垃圾郵件變成 1,正常郵件變成 0
就像是給每個類別一個代號。

這裡有個技術細節叫 「stratify」,它確保訓練集和測試集裡面垃圾郵件的比例
都是 13.4%。這樣比較公平,不會出現訓練集都是正常郵件、測試集都是垃圾郵件的尷尬情況。

因此我們使用 train_test_split 分割數據為 80% 訓練集和 20% 測試集,
以達到正確訓練的目標,避免類似過擬合的現象。
stratify 參數是確保兩個集合中垃圾郵件的比例都維持在 13.4%,
random_state=42 則保證每次執行結果一致,確保實驗可重現性。


第四部分:文本向量化

print("\n[Step 3] Text vectorization...")

# 設定向量化參數
max_features = 5000   # 減少詞彙表大小
sequence_length = 50   # 減少序列長度

# 創建文本向量化層
vectorize_layer = layers.TextVectorization(
    max_tokens=max_features,
    output_mode='int',
    output_sequence_length=sequence_length
)

# 適配詞彙表
vectorize_layer.adapt(X_train.values)

# 將文本轉換為整數序列
X_train_vec = vectorize_layer(X_train.values)
X_test_vec = vectorize_layer(X_test.values)

print(f"  Vectorized shape: {X_train_vec.shape}")
print(f"  Vocabulary size: {len(vectorize_layer.get_vocabulary())}")

這一步是整個流程中最關鍵的「魔法」。電腦不會讀文字,但它很會處理數字,
所以我們要把每個詞都變成一個數字代號。

首先,程式會掃描所有訓練簡訊,找出最常出現的 5,000 個詞,建立一個「字典」。
比如說「free」可能是編號 245,「money」是編號 567 這樣。

然後把每則簡訊都翻譯成一串數字。比如「Free money now」變成 [245, 567, 892]。
但這裡也有個問題:每則簡訊長度其實不一樣,有的只有 10 個詞,有的有 100 個詞。
所以我們統一規定:
每則簡訊都要是 50 個數字。太短的後面補 0(就像用空白填滿),太長的就砍掉後面
這樣一來,每則簡訊都變成一個「50 個數字的清單」,電腦就知道怎麼處理了。

因此在程式編寫方面,我們使用 TextVectorization 層將文本轉換為整數序列。
max_features=5000 表示只保留最常見的 5,000 個詞,這對短文本(如簡訊)已經足夠。sequence_length=50 將所有文本統一為 50 個詞的長度(如上描述)。
adapt 方法從訓練集學習詞彙表,建立詞到整數的映射關係。


第五部分:建立雙向 LSTM 模型

print("\n[Step 4] Building deep learning model...")

# 使用 Bidirectional LSTM + 更積極的正則化
model = keras.Sequential([
    # Embedding 層
    layers.Embedding(input_dim=max_features, output_dim=64, input_length=sequence_length),
    
    # 雙向 LSTM
    layers.Bidirectional(layers.LSTM(32, return_sequences=False)),
    layers.Dropout(0.5),
    
    # Dense 層
    layers.Dense(24, activation='relu'),
    layers.Dropout(0.5),
    
    # 輸出層
    layers.Dense(1, activation='sigmoid')
])

print("\nModel Architecture:")
model.summary()

接著就是要來建立模型了。
我們用的架構叫做「雙向 LSTM」,聽起來很酷炫,但概念其實不難。

首先是 Embedding 層
它的工作是把每個詞的數字代號變成一個「向量」(想像成每個詞的身分證,上面記錄了這個詞的各種特徵)。
這個向量有 64 個數字,就代表它有 64 種特徵。
在訓練過程中,模型會自己學習,讓意思相近的詞有相似的向量。
比如「free」和「gratis」(免費的意思)的向量就會比較接近。


接著是核心的雙向 LSTM

LSTM 是什麼?

想像你在看一部電影,你需要記住前面的劇情才能理解現在發生什麼事對吧?
LSTM 就是幫電腦「記憶」前面看過的詞。
而「雙向」的意思是,它不只從頭看到尾,還會從尾看到頭。
就像你讀一個句子,有時候要看完整句才知道前面那個詞是什麼意思。


然後是 Dropout 層,這個很有趣。它會隨機關閉 50% 的神經元,這樣聽起來是在搞破壞對吧?
但其實這是在防止模型「太聰明」,也就是防止「過擬合」。
想像一個學生如果太依賴某幾個朋友給答案,考試時這些朋友不在就慘了。
Dropout 就是強迫模型不能只依賴某些神經元,要學會更全面的判斷。

最後就是輸出層,用 sigmoid 函數輸出一個 0 到 1 之間的數字,
代表「是垃圾郵件」的機率。0.9 表示 90% 確定是垃圾郵件,
0.1 表示 90% 確定不是。


今天的內容就先到這邊,明日接續。


上一篇
預訓練模型是什麼?BERT、ResNet介紹
系列文
《AI 新手到職場應用:深度學習 30 天實戰》25
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言