DAY 26
0

## [Day26] NLP會用到的模型(九)-實作transformer-上

``````import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
import csv
import numpy as np
import re
import random
import math
import time
import spacy
from torchtext.data import Field, BucketIterator, TabularDataset
``````
• 資料長相:
``````data_dir = 'your_path/'
# Read the whole corpus through a context manager so the file handle is
# closed deterministically instead of being left to the garbage collector.
with open(data_dir + 'cmn.txt', encoding='utf-8') as corpus_file:
    lines = corpus_file.read().strip().split('\n')

# Every line is split on tabs; per the sample output below, field 0 is the
# English sentence and field 1 is the Chinese sentence.
trnslt_pairs = [line.split('\t') for line in lines]
print ("Sample: " , trnslt_pairs[1000][0:2] )
print ("Total records:" , len(trnslt_pairs))

# Sample:  ['He was drowned.', '他被淹死了。']
# Total records: 24360
``````
• 分train, test與validation data，只把'我','你','他','她'開頭的句子存起來（不然data太多），存成csv檔，再由TabularDataset處理csv
``````# create train and validation set
# Keep only the pairs whose Chinese sentence starts with one of these
# characters.  `str.startswith` with a tuple is used instead of `pair[1][0]`
# so that an empty Chinese sentence (or a row without a second column) is
# filtered out rather than raising IndexError.
_leading_chars = ('我', '你', '他', '她')
trnslt_pairs = [
    pair for pair in trnslt_pairs
    if len(pair) > 1 and pair[1].startswith(_leading_chars)
]
print (f"Total records: {len(trnslt_pairs)}")

# First hold out 9% for testing, then 8% of the remainder for validation.
train, test = train_test_split(trnslt_pairs, test_size=0.09)
train, val = train_test_split(train, test_size=0.08)
print (f"training data:{len(train)} , develop data: {len(val)} , testing data: {len(test)}")

def write_csv(trn_data, file_path, header=('trg', 'src')):
    """Write the first two columns of every record to a UTF-8 CSV file.

    The loader in this file reads these CSVs with ``skip_header = True``,
    which discards the first row of each file.  A header row is therefore
    written by default so that no real example is lost.

    Args:
        trn_data: iterable of records; ``record[0]`` and ``record[1]`` are
            written as the two columns of a row.
        file_path: destination path; an existing file is overwritten.
        header: column names written as the first row.  Pass ``None`` to
            write the data rows only.
    """
    # newline='' lets the csv module control line endings itself.
    with open(file_path, 'w', newline='', encoding='utf-8') as fout:
        writer = csv.writer(fout)
        if header is not None:
            writer.writerow(list(header))
        for itm in trn_data:
            writer.writerow([itm[0], itm[1]])

# Persist the three splits next to the raw corpus, one CSV file per split.
for csv_name, split_rows in (('train.csv', train), ('val.csv', val), ('test.csv', test)):
    file_path = data_dir + csv_name
    write_csv(split_rows, file_path)
``````
• 載入 spacy 的英文模型幫我們做tokenize（模型需先下載：`python -m spacy download en_core_web_sm`）
``````spacy_eng = spacy.load('en_core_web_sm')
# The spaCy pipeline named 'en_core_web_sm' is loaded once at module level;
# tokensize_for_en below reuses it through `spacy_eng.tokenizer`.

def tokensize_for_en(text):
    """Tokenize an English sentence with the spaCy tokenizer.

    A space is inserted in front of every '.', '!' and '?' before the text
    is handed to ``spacy_eng.tokenizer``.  The returned list holds the text
    of each resulting token.
    """
    padded = re.sub(r"([.!?])", r" \1", text)
    doc = spacy_eng.tokenizer(padded)
    return [token.text for token in doc]

def tokensize_for_ch(text):
    """Split a Chinese sentence into single-character tokens.

    Every character outside U+4E00-U+9FA5, ASCII letters and digits is
    first replaced by a space.  The remaining non-blank characters are
    returned one per token, in their original order.
    """
    # Blank out everything except CJK ideographs, ASCII letters and digits.
    cleaned = re.sub(r'[^\u4e00-\u9fa5A-Za-z0-9]', ' ', text)

    # Dropping the whitespace and exploding the rest yields one token per character.
    return list(''.join(cleaned.split()))

# Both fields share the same sequence markers, lower-casing and batch-first
# layout; only the tokenizer differs.
_common_field_kwargs = {
    'init_token': '<sos>',
    'eos_token': '<eos>',
    'lower': True,
    'batch_first': True,
}

# English side (bound to the 'trg' column when the CSVs are loaded).
target_en = Field(tokenize=tokensize_for_en, **_common_field_kwargs)

# Chinese side (bound to the 'src' column when the CSVs are loaded).
source_ch = Field(tokenize=tokensize_for_ch, **_common_field_kwargs)
``````
• 用TabularDataset來處理csv資料，將出現一次以上的詞保留下來
``````train_dataset, dev_dataset, test_dataset = TabularDataset.splits(
path = data_dir , format = 'csv', skip_header = True,
train='train.csv', validation='val.csv', test='test.csv',
fields=[
('trg', target_en),
('src', source_ch)
]
)
source_ch.build_vocab(train_dataset, min_freq = 1) #min_freq可以自己調整
target_en.build_vocab(train_dataset, min_freq = 1)

print ("中文語料的字元表長度: " , len(source_ch.vocab) , ", 英文的字元表長度: " ,len(target_en.vocab))
print ("Sample SRC:", test_dataset[0].src , "TRG:", test_dataset[0].trg)
``````
• 用BucketIterator整理成batch訓練的形式
``````BATCH_SIZE = 128

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
(train_dataset, dev_dataset, test_dataset),
batch_size = BATCH_SIZE,
sort_within_batch = True,
sort_key = lambda x : len(x.src),
device = device)
``````

• 定義Transformer encoder的部分，照paper這個encoder會疊6次
``````class TransformerEncoder(nn.Module):
def __init__(self, hidden_dim, feedforward_dim, n_enc_layers,
device):
"""
hidden_dim: embedding size
feedforward_dim: feedforward 維度
n_enc_layers: 幾層encoder layers
dropout: dropout
src_voca_length: 輸入的字典大小(此處為中文字典)
max_pos_length: 設定的最大長度(做position embedding用)
"""
super().__init__()
self.device = device

# 字元 embedding
self.src_tok_embedding = nn.Embedding(src_voca_length , hidden_dim)

# position embedding
self.src_pos_embedding = nn.Embedding(max_pos_length, hidden_dim)

# 建立 n_enc_layers 層的 Transformer Encoder 層
self.transformer_encoder_layers = nn.ModuleList([TransformerEncoderLayer(
hidden_dim,
feedforward_dim,
n_enc_layers,
dropout,
device) for _ in range(n_enc_layers)])

self.dropout = nn.Dropout(dropout)

"""
src_sentence: [batch_size, src_len]
"""

batch_size = src_sentence.shape[0]
src_len = src_sentence.shape[1]

# 產生 position embedding數列
# [batch_size, src_len]
pos = torch.arange(0, src_len).unsqueeze(0).repeat(batch_size, 1).to(self.device)

# 將 token embedding 和 position embedding 相加
# src_sentence [batch_size, src_len, hid_dim]
src_sentence = self.dropout(self.src_tok_embedding(src_sentence) + self.src_pos_embedding(pos))

# 將 src_sentence 輸入 n_enc_layers 層的 transformer encoder layers
for layer in self.transformer_encoder_layers:

# 輸出最後一層的 hidden layer and encoder self attention
# encoder_hidden [batch_size, src_len, hid_dim]
# encoder_self_attention [batch_size , attention_heads, src_len, src_len]

return encoder_hidden , encoder_self_attention
``````
• 定義單一層的encoder layer（上面的TransformerEncoder會把它疊n_enc_layers次），包含多頭self-attention與全連結層:
``````class TransformerEncoderLayer(nn.Module):
def __init__(self, hidden_dim , feedforward_dim, n_enc_layers, n_attn_heads, dropout , device):
"""
hidden_dim: embedding size
feedforward_dim: feedforward 維度
"""
super().__init__()

# 建立 Multi Head self Attention

# layer norm
self.self_attn_layernorm = nn.LayerNorm(hidden_dim)

# 建立 Position Wise Feedforward
self.feedforward_sublayer = PosFeedForwardSubLayer(hidden_dim,feedforward_dim,dropout)

# layer norm
self.feedforward_layernorm = nn.LayerNorm(hidden_dim)

self.dropout = nn.Dropout(dropout)

"""
src_embedding: [batch_size, src_len, hid_dim]
"""

# 將 K Q V 計算 attention
_src,  encoder_self_attention = self.self_attention_sublayer(src_embedding, src_embedding, src_embedding, src_mask)

# dropout, residual 殘差 connection and layer norm
# src_embedding [batch_size, src_len, hid_dim]
src_embedding = self.self_attn_layernorm(src_embedding + self.dropout(_src))

# positionwise feedforward
_src = self.feedforward_sublayer(src_embedding)

#dropout, residual 殘差 and layer norm
src_embedding = self.feedforward_layernorm(src_embedding + self.dropout(_src))

# 輸出 src_sentence hidden layer 和 encoder_self_attention
# src_embedding [batch_size, src_len, hid_dim]
# encoder_self_attention [batch_size, attension_heads, src_len, src_len]

return src_embedding , encoder_self_attention
``````
• 多頭self-attention：
``````class MultiHeadAttentionSubLayer(nn.Module):
def __init__(self, hidden_dim , n_attn_heads, dropout, device):
"""
hidden_dim: embedding size
dropout: dropout
"""
super().__init__()

# 確定 設定的 hidden layer 維度可以被 attention head 整除

# hidden layer 維度
self.hidden_dim = hidden_dim

# 定義 Wq Wk Wv
self.full_conn_q = nn.Linear(hidden_dim, hidden_dim)
self.full_conn_k = nn.Linear(hidden_dim, hidden_dim)
self.full_conn_v = nn.Linear(hidden_dim, hidden_dim)

# 最後一層 線性轉換
self.full_conn_o = nn.Linear(hidden_dim, hidden_dim)

self.dropout = nn.Dropout(dropout)

# 根據維度大小調整 attention 值 以免維度太大 Q dot K 結果過大影響學習效率

def forward(self, query_input, key_input, value_input, mask = None):
"""
query_input: q [batch_size, q_len, hid_dim]
key_input: q [batch_size, k_len, hid_dim]
value_input: q [batch_size, v_len, hid_dim]
"""
batch_size = query_input.shape[0]

# 定義 WQ*q -> Q WK*k -> K WV*v -> V
# Q [batch size, query len, hid dim]
# K [batch size, key len, hid dim]
# V [batch size, value len, hid dim]
Q = self.full_conn_q(query_input)
K = self.full_conn_k(key_input)
V = self.full_conn_v(value_input)

# 將 attention 分成多個 attention
return Q, K, V

# 將 attention 的 2 和 3 維度轉置 以達到將 attention head 提到前面 而分開每個 attention head
return Q , K , V

Q, K, V = seperate_heads (Q, K, V)

# 將Ｋ的最後兩個維度轉置做 Q * K 除以 scale
# scaled_dot_product_similarity [batch_size, n_heads, query_len, key_len]
scaled_dot_product_similarity = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.scale

# 隊最後一維做 softmax attention [batch_size, n_heads, query_len, key_len]
attention = torch.softmax(scaled_dot_product_similarity, dim = -1)

# 最後與 V 相乘
x = torch.matmul(self.dropout(attention), V)

x = x.permute(0, 2, 1, 3).contiguous()
#x [batch_size, query_len, hid_dim]
x = x.view(batch_size, -1, self.hidden_dim)

# 執行最後一層 x [batch_size, query_len, hid_dim]
x = self.full_conn_o(x)

return x, attention
``````
• 全連結層:
``````class PosFeedForwardSubLayer(nn.Module):
def __init__(self, hidden_dim, ff_dim, dropout):
super().__init__()
self.full_conn_1 = nn.Linear(hidden_dim, ff_dim)

self.full_conn_2 = nn.Linear(ff_dim,  hidden_dim)

self.dropout = nn.Dropout(dropout)

def forward(self, x):
# x [batch_size, seq_len, ff_dim]
x = self.dropout(torch.relu(self.full_conn_1(x)))

# x [batch_size, seq_len, hid_dim]
x = self.full_conn_2(x)

return x
``````