
2023 iThome 鐵人賽, DAY 25, AI & Data
ML From Scratch series, part 25

[Day 25] Recurrent Neural Network — Implementation

Dataset

Dinosaur Island

A text file that records the names of many kinds of dinosaurs, one per line.

Implementation

Import Library

import os
import numpy as np
import scipy as sp

Data Preprocessing

class DataGenerator:
    def __init__(self, path):
        self.path = path
        
        # Read in data from file and convert to lowercase
        with open(path) as f:
            data = f.read().lower()
        
        # Create list of unique characters in the data
        self.chars = list(set(data))
        
        # Create dictionaries mapping characters to and from their index in the list of unique characters
        self.char_to_idx = {ch: i for (i, ch) in enumerate(self.chars)}
        self.idx_to_char = {i: ch for (i, ch) in enumerate(self.chars)}
        
        # Set the size of the vocabulary (i.e. number of unique characters)
        self.vocab_size = len(self.chars)
        
        # Read in examples from file and convert to lowercase, removing leading/trailing white space
        with open(path) as f:
            examples = f.readlines()
        self.examples = [x.lower().strip() for x in examples]
 
    def generate_example(self, idx):
        example_chars = self.examples[idx]
        
        # Convert the characters in the example to their corresponding indices in the list of unique characters
        example_char_idx = [self.char_to_idx[char] for char in example_chars]
        
        # Add newline character as the first character in the input array, and as the last character in the output array
        X = [self.char_to_idx['\n']] + example_char_idx
        Y = example_char_idx + [self.char_to_idx['\n']]
        
        return np.array(X), np.array(Y)

This class is responsible for generating training data for a character-level language model: it reads the data from a text file, converts it to lowercase, maps each character to a unique index, and produces the input/output examples used to train the language model.

__init__(path)

Initializes the DataGenerator object.

It then performs a series of text-processing steps:

  • Read the data from the file and convert it to lowercase
  • Build the list of unique characters that appear in the data
  • Build the dictionaries that map characters to and from their index in that list of unique characters
  • Set the vocabulary size (i.e. the number of unique characters)
  • Read the examples (one per line) from the file, convert them to lowercase, and strip leading/trailing whitespace

generate_example(idx)

Generates an input/output example for the language model from the example at the given index.

It converts the characters of the example into their corresponding indices in the list of unique characters, then adds the newline character as the first character of the input array and as the last character of the output array, so that Y is X shifted by one position.
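
As a quick sanity check, here is a minimal usage sketch of the class above; the path matches the training snippet further below, while the printed values are illustrative assumptions rather than actual output from the dataset.

# Minimal usage sketch (assumes a text file with one lowercase name per line)
gen = DataGenerator('/kaggle/input/dinosaur-island/dinos.txt')
print(gen.vocab_size)            # number of unique characters, e.g. 27 for a-z plus '\n'
X, Y = gen.generate_example(0)   # X starts with '\n', Y ends with '\n'
# X and Y are shifted by one position, so Y[t] is the "next character" target for X[t]
print(len(X) == len(Y))          # True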

RNN

class RNN:
    def __init__(self, hidden_size, data_generator, sequence_length, learning_rate):
        # hyper parameters
        self.hidden_size = hidden_size
        self.data_generator = data_generator
        self.vocab_size = self.data_generator.vocab_size
        self.sequence_length = sequence_length
        self.learning_rate = learning_rate
        self.X = None

        # model parameters
        self.Wax = np.random.uniform(-np.sqrt(1. / self.vocab_size), np.sqrt(1. / self.vocab_size), (hidden_size, self.vocab_size))
        self.Waa = np.random.uniform(-np.sqrt(1. / hidden_size), np.sqrt(1. / hidden_size), (hidden_size, hidden_size))
        self.Wya = np.random.uniform(-np.sqrt(1. / hidden_size), np.sqrt(1. / hidden_size), (self.vocab_size, hidden_size))
        self.ba = np.zeros((hidden_size, 1))  
        self.by = np.zeros((self.vocab_size, 1))
        
        # Initialize gradients
        self.dWax, self.dWaa, self.dWya = np.zeros_like(self.Wax), np.zeros_like(self.Waa), np.zeros_like(self.Wya)
        self.dba, self.dby = np.zeros_like(self.ba), np.zeros_like(self.by)
        
        # parameter update with AdamW
        self.mWax = np.zeros_like(self.Wax)
        self.vWax = np.zeros_like(self.Wax)
        self.mWaa = np.zeros_like(self.Waa)
        self.vWaa = np.zeros_like(self.Waa)
        self.mWya = np.zeros_like(self.Wya)
        self.vWya = np.zeros_like(self.Wya)
        self.mba = np.zeros_like(self.ba)
        self.vba = np.zeros_like(self.ba)
        self.mby = np.zeros_like(self.by)
        self.vby = np.zeros_like(self.by)

    def softmax(self, x):
        # shift the input to prevent overflow when computing the exponentials
        x = x - np.max(x)
        # compute the exponentials of the shifted input
        p = np.exp(x)
        # normalize the exponentials by dividing by their sum
        return p / np.sum(p)

    def forward(self, X, a_prev):
        # Initialize dictionaries to store activations and output probabilities.
        x, a, y_pred = {}, {}, {}

        # Store the input data in the class variable for later use in the backward pass.
        self.X = X

        # Set the initial activation to the previous activation.
        a[-1] = np.copy(a_prev)
        # iterate over each time step in the input sequence
        for t in range(len(self.X)): 
            # one-hot encode the input character at the current time step
            x[t] = np.zeros((self.vocab_size, 1))
            if self.X[t] is not None:
                x[t][self.X[t]] = 1
            # compute the hidden activation at the current time step
            a[t] = np.tanh(np.dot(self.Wax, x[t]) + np.dot(self.Waa, a[t - 1]) + self.ba)
            # compute the output probabilities at the current time step
            y_pred[t] = self.softmax(np.dot(self.Wya, a[t]) + self.by)

        # return the inputs, hidden activations, and output probabilities at each time step
        return x, a, y_pred
    
    def backward(self, x, a, y_preds, targets):
        # Reset the gradient accumulators so gradients from previous iterations do not carry over
        self.dWax, self.dWaa, self.dWya = np.zeros_like(self.Wax), np.zeros_like(self.Waa), np.zeros_like(self.Wya)
        self.dba, self.dby = np.zeros_like(self.ba), np.zeros_like(self.by)

        # Initialize derivative of hidden state for the last time-step
        da_next = np.zeros_like(a[0])

        # Loop through the input sequence backwards
        for t in reversed(range(len(self.X))):
            # Calculate derivative of output probability vector
            dy_preds = np.copy(y_preds[t])
            dy_preds[targets[t]] -= 1

            # Calculate derivative of hidden state
            da = np.dot(self.Waa.T, da_next) + np.dot(self.Wya.T, dy_preds)
            dtanh = (1 - np.power(a[t], 2))
            da_unactivated = dtanh * da

            # Calculate gradients
            self.dba += da_unactivated
            self.dWax += np.dot(da_unactivated, x[t].T)
            self.dWaa += np.dot(da_unactivated, a[t - 1].T)

            # Update derivative of hidden state for the next iteration
            da_next = da_unactivated

            # Calculate gradients for the output weight matrix and output bias
            self.dWya += np.dot(dy_preds, a[t].T)
            self.dby += dy_preds

        # clip gradients after accumulating over all time steps, to avoid exploding gradients
        for grad in [self.dWax, self.dWaa, self.dWya, self.dba, self.dby]:
            np.clip(grad, -1, 1, out=grad)
 
    def loss(self, y_preds, targets):
        # calculate cross-entropy loss
        return sum(-np.log(y_preds[t][targets[t], 0]) for t in range(len(self.X)))
    
    def adamw(self, beta1=0.9, beta2=0.999, epsilon=1e-8, L2_reg=1e-4):
        # AdamW update for Wax
        self.mWax = beta1 * self.mWax + (1 - beta1) * self.dWax
        self.vWax = beta2 * self.vWax + (1 - beta2) * np.square(self.dWax)
        m_hat = self.mWax / (1 - beta1)
        v_hat = self.vWax / (1 - beta2)
        self.Wax -= self.learning_rate * (m_hat / (np.sqrt(v_hat) + epsilon) + L2_reg * self.Wax)

        # AdamW update for Waa
        self.mWaa = beta1 * self.mWaa + (1 - beta1) * self.dWaa
        self.vWaa = beta2 * self.vWaa + (1 - beta2) * np.square(self.dWaa)
        m_hat = self.mWaa / (1 - beta1)
        v_hat = self.vWaa / (1 - beta2)
        self.Waa -= self.learning_rate * (m_hat / (np.sqrt(v_hat) + epsilon) + L2_reg * self.Waa)

        # AdamW update for Wya
        self.mWya = beta1 * self.mWya + (1 - beta1) * self.dWya
        self.vWya = beta2 * self.vWya + (1 - beta2) * np.square(self.dWya)
        m_hat = self.mWya / (1 - beta1)
        v_hat = self.vWya / (1 - beta2)
        self.Wya -= self.learning_rate * (m_hat / (np.sqrt(v_hat) + epsilon) + L2_reg * self.Wya)

        # AdamW update for ba
        self.mba = beta1 * self.mba + (1 - beta1) * self.dba
        self.vba = beta2 * self.vba + (1 - beta2) * np.square(self.dba)
        m_hat = self.mba / (1 - beta1)
        v_hat = self.vba / (1 - beta2)
        self.ba -= self.learning_rate * (m_hat / (np.sqrt(v_hat) + epsilon) + L2_reg * self.ba)

        # AdamW update for by
        self.mby = beta1 * self.mby + (1 - beta1) * self.dby
        self.vby = beta2 * self.vby + (1 - beta2) * np.square(self.dby)
        m_hat = self.mby / (1 - beta1)
        v_hat = self.vby / (1 - beta2)
        self.by -= self.learning_rate * (m_hat / (np.sqrt(v_hat) + epsilon) + L2_reg * self.by)
    
    def sample(self):
        # initialize input and hidden state
        x = np.zeros((self.vocab_size, 1))
        a_prev = np.zeros((self.hidden_size, 1))

        # create an empty list to store the generated character indices
        indices = []

        # idx is a flag to detect a newline character, initialize it to -1
        idx = -1

        # generate sequence of characters
        counter = 0
        max_chars = 50 # maximum number of characters to generate
        newline_character = self.data_generator.char_to_idx['\n'] # the newline character

        while (idx != newline_character and counter != max_chars):
            # compute the hidden state
            a = np.tanh(np.dot(self.Wax, x) + np.dot(self.Waa, a_prev) + self.ba)

            # compute the output probabilities
            y = self.softmax(np.dot(self.Wya, a) + self.by)

            # sample the next character from the output probabilities
            idx = np.random.choice(list(range(self.vocab_size)), p=y.ravel())

            # set the input for the next time step
            x = np.zeros((self.vocab_size, 1))
            x[idx] = 1

            # store the sampled character index in the list
            indices.append(idx)

            # update the previous hidden state
            a_prev = a

            # increment the counter
            counter += 1

        # return the list of sampled character indices
        return indices

        
    def train(self, generated_names=5):
        iter_num = 0
        threshold = 5 # stopping criterion for training
        smooth_loss = -np.log(1.0 / self.data_generator.vocab_size) * self.sequence_length  # initialize loss

        while (smooth_loss > threshold):
            a_prev = np.zeros((self.hidden_size, 1))
            # cycle through the training examples
            idx = iter_num % len(self.data_generator.examples)
            # get one input/target example from the dataset
            inputs, targets = self.data_generator.generate_example(idx)

            # forward pass
            x, a, y_pred  = self.forward(inputs, a_prev)

            # backward pass
            self.backward(x, a, y_pred, targets)

            # calculate and update loss
            loss = self.loss(y_pred, targets)
            self.adamw()
            smooth_loss = smooth_loss * 0.999 + loss * 0.001

            # update previous hidden state for the next batch
            a_prev = a[len(self.X) - 1]
            # print progress every 500 iterations
            if iter_num % 500 == 0:
                print("\n\niter :%d, loss:%f\n" % (iter_num, smooth_loss))
                for i in range(generated_names):
                    sample_idx = self.sample()
                    txt = ''.join(self.data_generator.idx_to_char[idx] for idx in sample_idx)
                    txt = txt.title()  # capitalize first character 
                    print ('%s' % (txt, ), end='')
            iter_num += 1
    
    def predict(self, start):
        # Initialize input vector and previous hidden state
        x = np.zeros((self.vocab_size, 1))
        a_prev = np.zeros((self.hidden_size, 1))

        # Convert the start sequence to indices and feed all but its last character
        # through the recurrence, so generation continues from the given prefix
        idxes = [self.data_generator.char_to_idx[ch] for ch in start.lower()]
        for idx in idxes[:-1]:
            x = np.zeros((self.vocab_size, 1))
            x[idx] = 1
            a_prev = np.tanh(np.dot(self.Wax, x) + np.dot(self.Waa, a_prev) + self.ba)

        # the last character of the prefix becomes the current input
        idx = idxes[-1]
        x = np.zeros((self.vocab_size, 1))
        x[idx] = 1
        # Generate sequence
        max_chars = 50  # maximum number of characters to generate
        newline_character = self.data_generator.char_to_idx['\n']  # the newline character
        counter = 0
        while (idx != newline_character and counter != max_chars):
            # Compute next hidden state and predicted character
            a = np.tanh(np.dot(self.Wax, x) + np.dot(self.Waa, a_prev) + self.ba)
            y_pred = self.softmax(np.dot(self.Wya, a) + self.by)
            idx = np.random.choice(range(self.vocab_size), p=y_pred.ravel())

            # Update input vector, previous hidden state, and indices
            x = np.zeros((self.vocab_size, 1))
            x[idx] = 1
            a_prev = a
            idxes.append(idx)
            counter += 1

        # Convert indices to characters and concatenate into a string
        txt = ''.join(self.data_generator.idx_to_char[i] for i in idxes)

        # Remove newline character if it exists at the end of the generated sequence
        if txt[-1] == '\n':
            txt = txt[:-1]

        return txt

Attributes

  • hidden_size (int): number of hidden units in the RNN.
  • data_generator (DataGenerator): the DataGenerator object used to produce training data.
  • vocab_size (int): size of the vocabulary used by the RNN.
  • sequence_length (int): length of the input sequences.
  • learning_rate (float): learning rate used during training.
  • X (ndarray): the input data.
  • Wax, Waa, Wya, ba, by (ndarray): the model parameters, i.e. the input-to-hidden weights, the hidden-to-hidden weights, the hidden-to-output weights, and the hidden and output biases (see the shape check after this list).
  • dWax, dWaa, dWya, dba, dby (ndarray): the gradients of the corresponding parameters.
  • mWax, vWax, mWaa, vWaa, mWya, vWya, mba, vba, mby, vby (ndarray): first- and second-moment estimates used by the AdamW optimizer.
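
To make the parameter shapes listed above concrete, here is a small sanity-check sketch; it assumes the same hidden_size and data_generator as the training snippet further below.

# Hypothetical shape check for the parameters listed above
rnn = RNN(hidden_size=200, data_generator=data_generator, sequence_length=25, learning_rate=1e-3)
assert rnn.Wax.shape == (200, rnn.vocab_size)   # input -> hidden
assert rnn.Waa.shape == (200, 200)              # hidden -> hidden
assert rnn.Wya.shape == (rnn.vocab_size, 200)   # hidden -> output
assert rnn.ba.shape == (200, 1)                 # hidden bias
assert rnn.by.shape == (rnn.vocab_size, 1)      # output bias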

__init__(self, hidden_size, data_generator, sequence_length, learning_rate)

Initializes an instance of the RNN class.

softmax(self, x)

Computes the softmax activation function for the given input array.
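
A quick numerical check of why the max-shift matters; the input values are made up to trigger overflow in the naive form.

import numpy as np

z = np.array([[1000.0], [1001.0], [1002.0]])
# naive softmax overflows: np.exp(1000.0) is inf, and inf / inf gives nan
p_naive = np.exp(z) / np.sum(np.exp(z))
# the shifted version is numerically stable and yields the same probabilities
p_shift = np.exp(z - np.max(z)) / np.sum(np.exp(z - np.max(z)))
print(p_naive.ravel())   # [nan nan nan]
print(p_shift.ravel())   # approximately [0.090 0.245 0.665]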

forward(self, X, a_prev)

Computes the forward pass of the RNN.
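
Written out, the recurrence that forward implements at each time step t, with x_t the one-hot encoding of the t-th input character, is:

$$a_t = \tanh(W_{ax} x_t + W_{aa} a_{t-1} + b_a), \qquad \hat{y}_t = \mathrm{softmax}(W_{ya} a_t + b_y)$$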

backward(self, x, a, y_preds, targets)

Implements the backward pass of the RNN.
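
The gradients accumulated by backward can be summarized as follows, where y_t is the target index at step t and \delta z_{t+1} = 0 at the last time step:

$$\begin{aligned}
\delta y_t &= \hat{y}_t - \mathrm{onehot}(y_t) \\
\delta a_t &= W_{ya}^\top \delta y_t + W_{aa}^\top \delta z_{t+1} \\
\delta z_t &= (1 - a_t^2) \odot \delta a_t
\end{aligned}$$

$$dW_{ya} = \sum_t \delta y_t\, a_t^\top,\quad db_y = \sum_t \delta y_t,\quad dW_{ax} = \sum_t \delta z_t\, x_t^\top,\quad dW_{aa} = \sum_t \delta z_t\, a_{t-1}^\top,\quad db_a = \sum_t \delta z_t$$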

loss(self, y_preds, targets)

Computes the cross-entropy loss for the given predicted probabilities and the true target sequence.
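
In formula form, summing over the T characters of one example:

$$L = -\sum_{t=0}^{T-1} \log \hat{y}_t[\,y_t\,]$$

where y_t is the index of the target character at step t.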

adamw(self, beta1, beta2, epsilon, L2_reg)

Updates the RNN's parameters using the AdamW optimization algorithm.
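
For each parameter \theta with gradient g, the update implemented above is the decoupled-weight-decay (AdamW) step below, with \eta the learning rate and \lambda the L2_reg coefficient. Note that the code applies a fixed bias correction (equivalent to step t = 1) rather than the usual 1-\beta_1^t and 1-\beta_2^t terms, which is a simplification.

$$\begin{aligned}
m &\leftarrow \beta_1 m + (1-\beta_1)\, g, \qquad v \leftarrow \beta_2 v + (1-\beta_2)\, g^2 \\
\hat{m} &= \frac{m}{1-\beta_1}, \qquad \hat{v} = \frac{v}{1-\beta_2} \\
\theta &\leftarrow \theta - \eta \left( \frac{\hat{m}}{\sqrt{\hat{v}} + \epsilon} + \lambda\, \theta \right)
\end{aligned}$$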

sample(self)

Generates a sequence of characters by sampling from the model.

train(self, generated_names)

Trains the RNN model with backpropagation through time (BPTT).
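
One detail worth noting in train(): the progress metric is an exponentially smoothed loss, initialized at the expected loss of a model that predicts all V characters uniformly over a sequence of length T (V = vocab_size, T = sequence_length), and training stops once it drops below the threshold of 5:

$$\text{smooth\_loss}_0 = -\log\!\left(\tfrac{1}{V}\right) \cdot T, \qquad \text{smooth\_loss} \leftarrow 0.999 \cdot \text{smooth\_loss} + 0.001 \cdot \text{loss}$$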

predict(self, start)

Generates a sequence of characters starting from the given start sequence.

Train RNN

data_generator = DataGenerator('/kaggle/input/dinosaur-island/dinos.txt')
rnn = RNN(hidden_size=200, data_generator=data_generator, sequence_length=25, learning_rate=1e-3)
rnn.train()

Predict the words

rnn.predict("meo")

Output

'meoeousaurus'

Tomorrow, we will use a Recurrent Neural Network in a Kaggle competition to tackle the problems encountered in Store Sales - Time Series Forecasting!

