iT邦幫忙

2021 iThome 鐵人賽

DAY 26
0
自我挑戰組

30天初步了解自然語言處理-自學筆記系列 第 26

[Day26] NLP會用到的模型(九)-實作transformer-上

一. 資料準備

這次任務是實作機器翻譯,資料: http://www.manythings.org/anki/ 可以找中翻英的data,可以找cmn-eng/cmn.txt這個簡體字的資料~這個實作是之前上過'NLP 100天馬拉松'的code做一些修改~

二. 整理資料部分

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
import csv
import numpy as np
import re
import random
import math
import time
import spacy
from torchtext.data import Field, BucketIterator, TabularDataset
  • 資料長相:
data_dir = 'your_path/'
lines = open(data_dir + 'cmn.txt' , encoding='utf-8').read().strip().split('\n')
trnslt_pairs = [[s for s in l.split('\t')] for l in lines ]
print ("Sample: " , trnslt_pairs[1000][0:2] )
print ("Total records:" , len(trnslt_pairs))

# Sample:  ['He was drowned.', '他被淹死了。']
# Total records: 24360
  • 分train, test與validation data,把'我','你','他','她'開頭的句字存起來,不然data太多,存成csv檔,再由TabularDataset處理csv
# create train and validation set
trnslt_pairs = [pair for pair in trnslt_pairs if pair[1][0] in ['我','你','他','她']]
print (f"Total records: {len(trnslt_pairs)}")
train, test = train_test_split(trnslt_pairs, test_size=0.09)
train, val = train_test_split(train, test_size=0.08)
print (f"training data:{len(train)} , develop data: {len(val)} , testing data: {len(test)}")


def write_csv(trn_data, file_path):
    with open(file_path ,'w', newline='', encoding='utf-8') as fout:
        writer = csv.writer (fout)
        for itm in trn_data: 
            writer.writerow ([itm[0],itm[1]])
            
file_path = data_dir + 'train.csv'
write_csv(train, file_path )

file_path = data_dir + 'val.csv'
write_csv(val, file_path )
    
file_path = data_dir + 'test.csv'
write_csv(test, file_path )
  • 下載 spacy 的英文模型 幫我們做tokenize
spacy_eng = spacy.load('en_core_web_sm')

def tokensize_for_en(text):
    text = re.sub(r"([.!?])", r" \1", text)
    return [tok.text for tok in spacy_eng.tokenizer(text)]

def tokensize_for_ch(text):
    #去掉非中文字元
    regex = re.compile(r'[^\u4e00-\u9fa5A-Za-z0-9]')
    text = regex.sub(' ', text)

    return [word for word in text if word.strip()]
    
target_en = Field(tokenize = tokensize_for_en, 
                  init_token = '<sos>', eos_token = '<eos>', 
                  lower = True, batch_first = True)

source_ch = Field(tokenize = tokensize_for_ch, 
                  init_token = '<sos>', eos_token = '<eos>', 
                  lower = True, batch_first = True)
  • 用TabularDataset來處理csv資料,將出現一次以上的詞保留下來
train_dataset, dev_dataset, test_dataset = TabularDataset.splits(
    path = data_dir , format = 'csv', skip_header = True,
    train='train.csv', validation='val.csv', test='test.csv',
    fields=[
        ('trg', target_en),
        ('src', source_ch)
    ]
)
source_ch.build_vocab(train_dataset, min_freq = 1) #min_freq可以自己調整
target_en.build_vocab(train_dataset, min_freq = 1)

print ("中文語料的字元表長度: " , len(source_ch.vocab) , ", 英文的字元表長度: " ,len(target_en.vocab))
print ("Sample SRC:", test_dataset[0].src , "TRG:", test_dataset[0].trg)
  • 用BucketIterator整理成batch訓練的形式
BATCH_SIZE = 128

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
                                (train_dataset, dev_dataset, test_dataset), 
                                 batch_size = BATCH_SIZE,
                                 sort_within_batch = True,
                                 sort_key = lambda x : len(x.src),
                                 device = device)

三. encoder

  • 定義Transformer encoder的部分,照paper這個encoder會疊6次
class TransformerEncoder(nn.Module):
    def __init__(self, hidden_dim, feedforward_dim, n_enc_layers, 
                 n_attn_heads, dropout, src_voca_length, max_pos_length , 
                 device):
        """
        hidden_dim: embedding size
        feedforward_dim: feedforward 維度
        n_enc_layers: 幾層encoder layers
        n_attn_heads: 分成幾個attention
        dropout: dropout
        src_voca_length: 輸入的字典大小(此處為中文字典)
        max_pos_length: 設定的最大長度(做position embedding用)
        """
        super().__init__()
        self.device = device
        
        # 字元 embedding 
        self.src_tok_embedding = nn.Embedding(src_voca_length , hidden_dim) 
        
        # position embedding 
        self.src_pos_embedding = nn.Embedding(max_pos_length, hidden_dim)  
        
        # 建立 n_enc_layers 層的 Transformer Encoder 層
        self.transformer_encoder_layers = nn.ModuleList([TransformerEncoderLayer(
                                            hidden_dim, 
                                            feedforward_dim,
                                            n_enc_layers,
                                            n_attn_heads, 
                                            dropout, 
                                            device) for _ in range(n_enc_layers)])

        self.dropout = nn.Dropout(dropout)
        
    def forward(self, src_sentence, src_mask):
        """
        src_sentence: [batch_size, src_len]
        src_mask: [batch_size, src_len]
        """
        
        batch_size = src_sentence.shape[0]
        src_len = src_sentence.shape[1]
        
        # 產生 position embedding數列 
        # [batch_size, src_len]
        pos = torch.arange(0, src_len).unsqueeze(0).repeat(batch_size, 1).to(self.device)
        
        # 將 token embedding 和 position embedding 相加
        # src_sentence [batch_size, src_len, hid_dim]
        src_sentence = self.dropout(self.src_tok_embedding(src_sentence) + self.src_pos_embedding(pos))

        # 將 src_sentence 輸入 n_enc_layers 層的 transformer encoder layers 
        for layer in self.transformer_encoder_layers:
            encoder_hidden, encoder_self_attention = layer(src_sentence, src_mask)
            
        # 輸出最後一層的 hidden layer and encoder self attention    
        # encoder_hidden [batch_size, src_len, hid_dim]
        # encoder_self_attention [batch_size , attention_heads, src_len, src_len]

        return encoder_hidden , encoder_self_attention
  • 將整個encoder疊起來,包含多頭self-attention與全連結層:
class TransformerEncoderLayer(nn.Module):
    def __init__(self, hidden_dim , feedforward_dim, n_enc_layers, n_attn_heads, dropout , device):
        """
        hidden_dim: embedding size
        feedforward_dim: feedforward 維度
        """
        super().__init__()
        
        # 建立 Multi Head self Attention
        self.self_attention_sublayer = MultiHeadAttentionSubLayer(hidden_dim, n_attn_heads, dropout, device)
        
        # layer norm
        self.self_attn_layernorm = nn.LayerNorm(hidden_dim)

        # 建立 Position Wise Feedforward
        self.feedforward_sublayer = PosFeedForwardSubLayer(hidden_dim,feedforward_dim,dropout)
        
        # layer norm
        self.feedforward_layernorm = nn.LayerNorm(hidden_dim)
        
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, src_embedding, src_mask):
        """
        src_embedding: [batch_size, src_len, hid_dim]
        src_mask: [batch_size, src_len]
        """

        # 將 K Q V 計算 attention
        _src,  encoder_self_attention = self.self_attention_sublayer(src_embedding, src_embedding, src_embedding, src_mask)

        # dropout, residual 殘差 connection and layer norm
        # src_embedding [batch_size, src_len, hid_dim]
        src_embedding = self.self_attn_layernorm(src_embedding + self.dropout(_src))

        # positionwise feedforward
        _src = self.feedforward_sublayer(src_embedding)

        #dropout, residual 殘差 and layer norm
        src_embedding = self.feedforward_layernorm(src_embedding + self.dropout(_src))

        # 輸出 src_sentence hidden layer 和 encoder_self_attention
        # src_embedding [batch_size, src_len, hid_dim]
        # encoder_self_attention [batch_size, attension_heads, src_len, src_len]
        
        return src_embedding , encoder_self_attention
  • 多頭self-attention:
class MultiHeadAttentionSubLayer(nn.Module):
    def __init__(self, hidden_dim , n_attn_heads, dropout, device):
        """
        hidden_dim: embedding size
        n_attn_heads: 分成幾個attention
        dropout: dropout
        """
        super().__init__()

        # 確定 設定的 hidden layer 維度可以被 attention head 整除
        assert hidden_dim % n_attn_heads ==0

        # hidden layer 維度
        self.hidden_dim = hidden_dim
        
        # multi-heads 的個數
        self.n_attn_heads = n_attn_heads
        
        # 平均分到每個 multi-head 的 維度
        self.head_dim = hidden_dim // n_attn_heads
        
        # 定義 Wq Wk Wv
        self.full_conn_q = nn.Linear(hidden_dim, hidden_dim)
        self.full_conn_k = nn.Linear(hidden_dim, hidden_dim)
        self.full_conn_v = nn.Linear(hidden_dim, hidden_dim)


        # 最後一層 線性轉換
        self.full_conn_o = nn.Linear(hidden_dim, hidden_dim)

        self.dropout = nn.Dropout(dropout)
        
        # 根據維度大小調整 attention 值 以免維度太大 Q dot K 結果過大影響學習效率    
        self.scale = torch.sqrt(torch.FloatTensor([self.head_dim])).to(device)

    def forward(self, query_input, key_input, value_input, mask = None):
        """
        query_input: q [batch_size, q_len, hid_dim]
        key_input: q [batch_size, k_len, hid_dim]
        value_input: q [batch_size, v_len, hid_dim]
        """
        batch_size = query_input.shape[0]
        
        # 定義 WQ*q -> Q WK*k -> K WV*v -> V
        # Q [batch size, query len, hid dim]
        # K [batch size, key len, hid dim]
        # V [batch size, value len, hid dim]
        Q = self.full_conn_q(query_input)
        K = self.full_conn_k(key_input)
        V = self.full_conn_v(value_input)

        # 將 attention 分成多個 attention
        def split_attention(Q, K, V, num_atn_head, head_dim, batch_size):
            Q = Q.view(batch_size, -1, num_atn_head, head_dim)
            K = K.view(batch_size, -1, num_atn_head, head_dim)
            V = V.view(batch_size, -1, num_atn_head, head_dim)
            return Q, K, V

        # 將 attention 的 2 和 3 維度轉置 以達到將 attention head 提到前面 而分開每個 attention head
        def seperate_heads(Q, K, V):
            Q = Q.permute(0, 2, 1, 3) # (batch_size, num_atn_head, query_len, head_dim)
            K = K.permute(0, 2, 1, 3) # (batch_size, num_atn_head, key_len, head_dim)
            V = V.permute(0, 2, 1, 3) # (batch_size, num_atn_head, value_len, head_dim)
            return Q , K , V


        Q, K, V = split_attention(Q, K, V, self.n_attn_heads, self.head_dim, batch_size)
        Q, K, V = seperate_heads (Q, K, V)

        # 將K的最後兩個維度轉置做 Q * K 除以 scale
        # scaled_dot_product_similarity [batch_size, n_heads, query_len, key_len]
        scaled_dot_product_similarity = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.scale

        # 做 mask
        if mask is not None:
            scaled_dot_product_similarity = scaled_dot_product_similarity.masked_fill(mask == 0, -1e10)
        
        # 隊最後一維做 softmax attention [batch_size, n_heads, query_len, key_len]
        attention = torch.softmax(scaled_dot_product_similarity, dim = -1)
        
        # 最後與 V 相乘
        # x [batch_size, n_heads, query_len, head_dim]
        x = torch.matmul(self.dropout(attention), V)
        
        # 轉換維度 準備將attention合併 x [batch_size, query_len, n_heads, head_dim]
        x = x.permute(0, 2, 1, 3).contiguous()
        #x [batch_size, query_len, hid_dim]
        x = x.view(batch_size, -1, self.hidden_dim)
        
        # 執行最後一層 x [batch_size, query_len, hid_dim]
        x = self.full_conn_o(x)
        
        return x, attention
  • 全連結層:
class PosFeedForwardSubLayer(nn.Module):
    def __init__(self, hidden_dim, ff_dim, dropout):
        super().__init__()
        self.full_conn_1 = nn.Linear(hidden_dim, ff_dim)
        
        self.full_conn_2 = nn.Linear(ff_dim,  hidden_dim)

        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        # x [batch_size, seq_len, ff_dim]
        x = self.dropout(torch.relu(self.full_conn_1(x)))
        
        # x [batch_size, seq_len, hid_dim]
        x = self.full_conn_2(x)
        
        return x

今天先說到建立encoder這邊~因為code真的滿長的~我分2天來寫


上一篇
[Day25] NLP會用到的模型(八)-transformer decoder
下一篇
[Day27] NLP會用到的模型(十)-實作transformer-下
系列文
30天初步了解自然語言處理-自學筆記30

尚未有邦友留言

立即登入留言