iT邦幫忙

2021 iThome 鐵人賽

DAY 20
0
自我挑戰組

30天初步了解自然語言處理-自學筆記系列 第 20

[Day20] NLP會用到的模型(四)-LSTM實現POS

一. 資料準備

  • 此資料與[Day9]的資料一樣為conllu檔,都是作為POS任務
  • 架構如下,每個詞經過word embedding轉成詞向量後再經過LSTM輸出對應詞的詞性:
    https://ithelp.ithome.com.tw/upload/images/20210920/20140426lkFvCOLUFZ.png

data: https://github.com/UniversalDependencies/UD_Chinese-GSD

  • 匯入與整理資料,整理成想要的格式,一個專門紀錄句字的list,一個專門記錄POS的tag
from io import open
from conllu import parse_incr
import torch
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
import re
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence, pad_sequence

data_file = open("/content/drive/My Drive/zh_gsd-ud-train.conllu", "r", encoding="utf-8")

# save dataset
sentences_list = []
pos_list = []

for tokenlist in parse_incr(data_file):
    temp_str = []
    temp_pos = []
    for s in tokenlist:
        temp_str.append(s['form'])
        temp_pos.append(s['upos'])
    sentences_list.append(temp_str)
    pos_list.append(temp_pos)
  • 整理成train data與test data:
train_sentence = sentences_list[:3500]
train_pos = pos_list[:3500]

test_sentence = sentences_list[3500:]
test_pos = pos_list[3500:]

二. train一個word2vec作為embedding層

  • 迅速訓練一個word2vec模型,維度為250:
from gensim.models import Word2Vec
import numpy as np

# Train Word2Vec model
emb_model = Word2Vec(sentences_list, size=250, window=5, min_count=1, workers=4, sg=1, hs=1, iter=50)
num_words = len(emb_model.wv.vocab) + 1
pad = '<pad>'
unk = '<unk>'
pad_id = 0
unk_id = 1

idx2word = {
  0: '<pad>',
}

word2idx = {
  '<pad>': 0,
}

count_idx2w = 1
count_w2idx = 1
  • 將詞向量包成embedding要的格式,並多一個向量處理oov的維度:
emb_matrix = np.zeros((num_words, 250), dtype=float)
for i in range(num_words - 1):
    v = emb_model.wv[emb_model.wv.index2word[i]]
    emb_matrix[i+1] = v   # Plus 1 to reserve index 0 for OOV words
    idx2word[count_idx2w] = emb_model.wv.index2word[i]
    word2idx[emb_model.wv.index2word[i]] = count_w2idx
    count_idx2w += 1
    count_w2idx += 1

emb_matrix.shape

# (17613, 250)
  • 整理pos轉id的dict與id轉pos的dict:
def add_dict(list_data: list, first_insert_list: list):
    dict_data = {}
    idx2dict = {}
    count = 0

    if len(first_insert_list) != 0:
      for ele in first_insert_list:
        if ele not in dict_data:
          dict_data[ele] = count
          idx2dict[count] = ele
          count += 1
    
    for first_ele in list_data:
        for second_ele in first_ele:
            if second_ele not in dict_data:
                dict_data[second_ele] = count
                idx2dict[count] = second_ele
                count += 1
    
    return dict_data, idx2dict

# 建立辭典
dict_pos, idx2pos = add_dict(pos_list, ['igner'])
dict_pos

# {'ADJ': 2,
# 'ADP': 10,
# 'ADV': 4,
# 'AUX': 1,
# 'CCONJ': 12,
# 'DET': 11,
# 'NOUN': 7,
# 'NUM': 6,
# 'PART': 9,
# 'PRON': 8,
# 'PROPN': 13,
# 'PUNCT': 3,
# 'SYM': 15,
# 'VERB': 5,
# 'X': 14,
# 'igner': 0}
  • 建立dataset,記住用pytorch在餵資料時需要先將資料包成dataset,經過dataloader,才能餵進去model:
# 建立Dataset,餵給LSTM使用
class POS_Dataset(Dataset):
    def __init__(self, sentence_list, pos_list, word2idx, dict_pos):
        self.sentence_list = sentence_list
        self.pos_list = pos_list
        self.dict_words = word2idx
        self.dict_pos = dict_pos
    
    def __getitem__(self, idx):
        source = self.sentence_list[idx]
        target = self.pos_list[idx]

        data_X = [self.dict_words[w] for w in source]
        target_Y = [self.dict_pos[tag] for tag in target]
        
        data_X = torch.tensor(data_X, dtype=torch.long)
        target_Y = torch.tensor(target_Y, dtype=torch.long)
        
        return data_X, target_Y
        
    def __len__(self):
        return len(self.sentence_list)
  • 定義batch取出的型態:
# 定義幫成dataloader後batch取出來有哪些資料
def pad_collate(batch):
  (xx, yy) = zip(*batch)
  x_lens = [len(x) for x in xx]
  y_lens = [len(y) for y in yy]

  xx_pad = pad_sequence(xx, batch_first=True, padding_value=pad_id)
  yy_pad = pad_sequence(yy, batch_first=True, padding_value=dict_pos['igner'])

  return xx_pad, yy_pad, x_lens, y_lens

batch_size = 100
# 將train data包成dataset的形式,再轉成dataloader,到時批次計算時會轉成pad_collate的形式
dataset = POS_Dataset(train_sentence, train_pos, word2idx, dict_pos)
train_loader = DataLoader(dataset=dataset, batch_size=batch_size, shuffle=True, collate_fn=pad_collate)

test_dataset = POS_Dataset(test_sentence, test_pos, word2idx, dict_pos)
test_loader = DataLoader(dataset=test_dataset, batch_size=len(sentences_list)-3500, shuffle=False, collate_fn=pad_collate)

三. 模型建立 many-to-many model

  • 建立LSTM model:
class LSTM_Tagger(nn.Module):
    def __init__(self, n_hidden, num_layers, dropout_ratio, pos_size, weight):
        super(LSTM_Tagger, self).__init__()
        
        self.dropout = nn.Dropout(dropout_ratio)
        self.word_embeddings = nn.Embedding.from_pretrained(weight, padding_idx = 0)
        self.word_embeddings.weight.requires_grad=False

        self.pos_size = pos_size
        self.lstm = nn.LSTM(input_size=250, hidden_size=n_hidden, dropout=dropout_ratio, num_layers=num_layers, bidirectional=True)
        self.fc1 = nn.Linear(in_features=n_hidden*2, out_features=128)
        self.fc2 = nn.Linear(in_features=128, out_features=pos_size)

    def forward(self, inputs, x_len):
        batch_size = inputs.size(0)
        embedded = self.word_embeddings(inputs)
        # 這邊有用到pack_padded_sequence來動態處理batch的每個長度
        x_packed = pack_padded_sequence(embedded, x_len, batch_first=True, enforce_sorted=False)
        
        output_packed, (hidden, cell) = self.lstm(x_packed)
        outputs, output_lengths = pad_packed_sequence(output_packed, batch_first=True)

        logits = self.fc1(outputs)
        logits = self.dropout(logits)
        logits = self.fc2(logits).view(-1, self.pos_size)

        return logits
  • 計算accuracy:
def categorical_accuracy(preds, y, tag_pad_idx):
    max_preds = preds.argmax(dim = 1, keepdim = True) # get the index of the max probability
    non_pad_elements = (y != tag_pad_idx).nonzero()
    correct = max_preds[non_pad_elements].squeeze(1).eq(y[non_pad_elements])
    return correct.sum() / y[non_pad_elements].shape[0]
  • 撰寫train的流程,返回loss與accuracy:
def train_batch(model, x_padded, y_padded, x_lens, criterion, optimizer, device):
    model.train()
    inputs, targets = x_padded.to(device), y_padded.to(device)
    outputs = model(inputs, x_lens)
    targets = targets.view(-1)
    loss = criterion(outputs, targets)

    acc = categorical_accuracy(outputs, targets, 0)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    return loss.item(), acc.item()
  • 開始train與evaluation:
# 訓練模型
epochs = 100
lr = 0.0001

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = LSTM_Tagger(128, 1, 0.8, len(dict_pos), torch.Tensor(emb_matrix))
model.to(device)

criterion = nn.CrossEntropyLoss(ignore_index = 0)
criterion.to(device)

optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=5e-4)

train_loss_list = []
val_loss_list = []

for epoch in range(1, 1 + epochs):
    tot_train_loss = 0
    tot_train_acc = 0
    tot_val_loss = 0
    tot_val_acc = 0

    for (x_padded, y_padded, x_lens, y_lens) in train_loader:
      loss, acc = train_batch(model, x_padded, y_padded, x_lens, criterion, optimizer, device)
      tot_train_loss += loss
      tot_train_acc += acc

    train_loss_list.append(tot_train_loss/35)
    
    
    model.eval()
    for (x_padded, y_padded, x_lens, y_lens) in test_loader:
      inputs, targets = x_padded.to(device), y_padded.to(device)
      outputs = model(inputs, x_lens)
      targets = targets.view(-1)
      loss = criterion(outputs, targets).item()
      acc = categorical_accuracy(outputs, targets, 0).item()
      tot_val_loss += loss
      tot_val_acc += acc

    val_loss_list.append(tot_val_loss)

    print('epoch ', epoch, 'train_loss: ', tot_train_loss/35, '| train_acc: ', tot_train_acc/35)
    print('       ', 'val_loss: ', tot_val_loss, '| val_acc: ', tot_val_acc)

# output 最後的情形: epoch  100 train_loss:  0.5527403695242745 | train_acc: 0.8298637798854283 val_loss:  0.559070348739624 | val_acc:  0.8141258358955383

四. 結果預測:

test_dataset = POS_Dataset(test_sentence, test_pos, word2idx, dict_pos)
test_loader = DataLoader(dataset=test_dataset, batch_size=1, shuffle=False, collate_fn=pad_collate)

model.eval()

def idx2str(idx_list, idx_dict):
  temp_str = []
  for idx in idx_list:
    temp_str.append(idx_dict[idx.item()])
  
  return temp_str

for (x_padded, y_padded, x_lens, y_lens) in test_loader:
  inputs, targets = x_padded.to(device), y_padded.to(device)
  print(idx2str(inputs[0], idx2word))
  print(idx2str(targets[0], idx2pos))
  outputs = model(inputs, x_lens).view(-1, len(dict_pos)).argmax(dim = -1)
  temp_str = []
  for idx in outputs:
    temp_str.append(idx2pos[idx.item()])
  print(temp_str)
  print("-----------------------")
  
  break

# output:
# ['此線', '設', '12', '歲', '以下', '小童', '及', '65', '歲', '或', '以上', '長者', '半', '價', '優惠', ',', '此線', '所有', '巴士', '均', '接受', '八', '達', '通', '卡', '付款', ',', '上車', '付款', '、', '不', '設', '找續', '。']
# ['NOUN', 'VERB', 'NUM', 'NOUN', 'ADP', 'NOUN', 'CCONJ', 'NUM', 'NOUN', 'CCONJ', 'DET', 'NOUN', 'NUM', 'NOUN', 'NOUN', 'PUNCT', 'NOUN', 'DET', 'NOUN', 'ADV', 'VERB', 'NUM', 'VERB', 'PART', 'PART', 'VERB', 'PUNCT', 'VERB', 'VERB', 'PUNCT', 'ADV', 'VERB', 'VERB', 'PUNCT']
# ['NOUN', 'VERB', 'NUM', 'NOUN', 'NUM', 'NOUN', 'CCONJ', 'PART', 'NOUN', 'CCONJ', 'ADP', 'NOUN', 'NUM', 'NOUN', 'VERB', 'PUNCT', 'NOUN', 'DET', 'NOUN', 'ADV', 'VERB', 'NUM', 'VERB', 'PART', 'PART', 'NOUN', 'PUNCT', 'NOUN', 'NOUN', 'PUNCT', 'ADV', 'VERB', 'NOUN', 'PUNCT']

其實效果還算不錯,驗證資料也有達到80%~
這個程式我當初是在colab上跑的還滿快的,如果沒有GPU可以試試google的colab唷


明天會開始介紹seq2seq的模型


上一篇
[Day19] NLP會用到的模型(三)-RNN應用
下一篇
[Day21] NLP會用到的模型(四)-Seq2Seq
系列文
30天初步了解自然語言處理-自學筆記30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言