iT邦幫忙

2021 iThome 鐵人賽

DAY 24
2
AI & Data

AI Voice Conversion系列 第 24

【Day24】 Transformer 實作包(一)

開始施工

  • 本來是想分享自己參考網路作法再改寫出來的 Transformer,但後來發現自己的架構並不是原本的 Transformer encoder (也沒加 Positional Encoding),只是純粹弄了一個 MultiHeadAttention Layer 按自己意思亂接而已,這邊還是以 Hung-yi Lee 老師的版本為主來做說明。

先上個圖方便施工,雖然內部有些操作圖上上沒有畫出來

Create Feed-Forward Networks

圖片上的 Feed-Forward 寫成 Dense,實際上他是長這樣

def Create_feed_forward_network(d_model, dff):
  # 此 FFN 對輸入做兩個線性轉換,中間還加了一個 ReLU 
  return tf.keras.Sequential([
      Dense(dff, activation='relu'),  # (batch_size, seq_len, dff)
      Dense(d_model)                  # (batch_size, seq_len, d_model)
  ])

一般會讓 dff 這個參數大於 d_model,讓 FFN 從輸入的 d_model 維度裡頭學些有用的資訊。在論文中 d_model 為 512,dff 為 2048。兩個都是可以調整的參數。

Create MultiHeadAttention

MultiHead 裡最重要的就是要把 heads 給分離出來個別算,算完之後 concatenate 起來。

def split_heads(x, batch_size,num_head,depth):
    x = tf.reshape(x, (batch_size,-1,num_heads,depth))
    return tf.transpose(x, perm=[0, 2, 1, 3])

def do_MultiHeadAttention(q,k,v,mask):
  batch_size = tf.shape(q)[0]
  #  圖片上面沒講的是,q,k,v 都會做一次線性變換到 seq_len 維空間
  q = Dense(seq_len)(q)
  # (batch_size, num_heads, seq_len, depth)
  q = split_heads(q, batch_size,num_head,depth) 
  ...
  ... 同道裡 For k 跟 v
  ...
  # 昨天寫過了
  scaled_attention, attention_weights = do_attention(q, k, v, mask)
 #把分頭的結果接回來
  scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
  concat_attention = tf.reshape(scaled_attention, (batch_size, -1, seq_len)) 
  
  # 最後還會通過一個線性變換,圖片上也沒畫這個部分
  output = Dense(concat_attention)  # (batch_size, seq_len_q, d_model)
  return output
  

我們的版本其實已經不是原始的方法了,是這篇論文的做法source code 是 tf1.0 的,下面這個是我移植到 tf2.0 的,有驗證過結果正確,我們音樂的 Transformer (但其實不是) 裏頭的 ATTN 用的是下面這種版本,他宣稱是比較有效率的 ATTN,但我沒有研究這篇,有興趣的話你們可以參考看看。

class ATTN(tf.keras.layers.Layer):
    def __init__(self, n_state, n_head, seq):
        super(ATTN, self).__init__()
        self.n_state = n_state * 3
        self.n_head = n_head
        E_initializer = tf.constant_initializer(0)
        self.E = tf.Variable(
            E_initializer(shape=[16, seq, 32], dtype=tf.float32), name="E"
        )

    def split_heads(self, x):
        # From [batch, sequence, features] to [batch, heads, sequence, features]
        return tf.transpose(self.split_states(x, self.n_head), [0, 2, 1, 3])

    def split_states(self, x, n):
        """Reshape the last dimension of x into [n, x.shape[-1]/n]."""
        *start, m = shape_list(x)
        return tf.reshape(x, start + [n, m // n])

    def merge_heads(self, x):
        # Reverse of split_heads
        return self.merge_states(tf.transpose(x, [0, 2, 1, 3]))

    def mask_attn_weights(self, w):
        # w has shape [batch, heads, dst_sequence, src_sequence], where information flows from src to dst.
        _, _, nd, ns = shape_list(w)
        b = self.attention_mask(nd, ns, dtype=w.dtype)
        b = tf.reshape(b, [1, 1, nd, ns])
        w = w * b - tf.cast(1e10, w.dtype) * (1 - b)
        return w

    def merge_states(self, x):
        """Smash the last two dimensions of x into a single dimension."""
        *start, a, b = shape_list(x)
        return tf.reshape(x, start + [a * b])

    def attention_mask(self, nd, ns, *, dtype):
        """1's in the lower triangle, counting from the lower right corner.
        Same as tf.matrix_band_part(tf.ones([nd, ns]), -1, ns-nd), but doesn't produce garbage on TPUs.
        """
        i = tf.range(nd)[:, None]
        j = tf.range(ns)
        m = i >= j - ns + nd
        return tf.cast(m, dtype)

    def relative_attn(self, q):
        # q have shape [batch, heads, sequence, features]
        batch, heads, sequence, features = shape_list(q)
        # [heads, batch, sequence, features]
        q_ = tf.transpose(q, [1, 0, 2, 3])
        # [heads, batch * sequence, features]
        q_ = tf.reshape(q_, [heads, batch * sequence, features])
        # [heads, batch * sequence, sequence]
        rel = tf.matmul(q_, self.E, transpose_b=True)
        # [heads, batch, sequence, sequence]
        rel = tf.reshape(rel, [heads, batch, sequence, sequence])
        # [heads, batch, sequence, 1+sequence]
        rel = tf.pad(rel, ((0, 0), (0, 0), (0, 0), (1, 0)))
        # [heads, batch, sequence+1, sequence]
        rel = tf.reshape(rel, (heads, batch, sequence + 1, sequence))
        # [heads, batch, sequence, sequence]
        rel = rel[:, :, 1:]
        # [batch, heads, sequence, sequence]
        rel = tf.transpose(rel, [1, 0, 2, 3])
        return rel

    def multihead_attn(self, q, k, v):
        # q, k, v have shape [batch, heads, sequence, features]
        w = tf.matmul(q, k, transpose_b=True)
        w = w + self.relative_attn(q)
        w = w * tf.math.rsqrt(tf.cast(v.shape[-1], w.dtype))
        w = self.mask_attn_weights(w)
        w = tf.nn.softmax(w, axis=-1)
        a = tf.matmul(w, v)
        return a

    def call(self, inputs):
        q, k, v = map(self.split_heads, tf.split(inputs, 3, axis=2))
        present = tf.stack([k, v], axis=1)
        a = self.multihead_attn(q, k, v)
        a = self.merge_heads(a)
        return a, present
        

Create EncoderLayer

有了需要的 layer 之後,接下來就是疊疊樂時間了,按圖施工保證成功R

# Encoder 裡頭會有 N 個 EncoderLayers,每個 EncoderLayer 裡又有兩個 sub-layers: MultiHeadAttention & feed_forward_network
# 先來組一份 EncoderLayer
class EncoderLayer(tf.keras.layers.Layer):
  def __init__(self, d_model, num_heads, dff, rate=0.1):
    super(EncoderLayer, self).__init__()

    self.ffn = Create_feed_forward_network(d_model, dff)   
    # layer norm 很常在 RNN-based 的模型被使用。一個 sub-layer 一個 layer norm
    self.layernorm1 = LayerNormalization(epsilon=1e-6)
    self.layernorm2 = LayerNormalization(epsilon=1e-6)
    
    # 一個 sub-layer 一個 dropout layer
    # Transformer 論文內預設 dropout rate 為 0.1
    # 不怕 overfiting 的話你也可以不要用
    self.dropout1 = Dropout(rate)
    self.dropout2 = Dropout(rate)

  # 丟入 "isTraining" 參數因為 dropout 在訓練以及測試的行為不同
  def call(self, x, mask,isTraining=True):
    # 除了 `attn`,其他張量的 shape 皆為 (batch_size,input_seq_len, d_model)
    # attn.shape == (batch_size, num_heads, input_seq_len, input_seq_len)

    # Encoder 利用"自"注意機制,因此 q, k, v 全部都是自己
    
    # 還需要 padding mask 來遮住輸入序列中的 0 的地方  
    # 你也可以試試另一版本的 XD,或是參考老師的做法把他包成一個 layer
    attn_output, attn = do_MultiHeadAttention(x,x,x,mask)
    attn_output = self.dropout1(attn_output, training=isTrainig) 
    out1 = self.layernorm1(x + attn_output)  
    ffn_output = self.ffn(out1) 
    ffn_output = self.dropout2(ffn_output, training=isTraining)  # 記得 training
    out2 = self.layernorm2(out1 + ffn_output)
    return out2

Create Encoder

最後加上 position encoding 跟 Embedding,Encoder 就煉成了

class Encoder(tf.keras.layers.Layer):
  # Encoder 的初始參數除了本來就要給 EncoderLayer 的參數還多了:
  # - num_layers: 決定要有幾個 EncoderLayers, 前面影片中的 `N`
  # - input_vocab_size: 用來把索引轉成詞嵌入向量
  def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, 
               rate=0.1):
    super(Encoder, self).__init__()
    # 這是長度不是 model
    self.d_model = d_model
    # Input 進來要先通過這裡
    self.embedding = tf.keras.layers.Embedding(input_vocab_size, d_model)
    # 請見昨天
    self.pos_encoding = positional_encoding(input_vocab_size, self.d_model)

    # 建 N 個 EncoderLayers
    self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate) 
                       for _ in range(num_layers)]

    self.dropout = tf.keras.layers.Dropout(rate)

  def call(self, x, training, mask):
    # 輸入的 x.shape == (batch_size, input_seq_len)
    # 以下各 layer 的輸出皆為 (batch_size, input_seq_len, d_model)
    input_seq_len = tf.shape(x)[1]

    # 將 2 維的索引序列轉成 3 維的詞嵌入張量,並依照論文乘上 sqrt(d_model)
    # 再加上對應長度的 position encoding
    x = self.embedding(x)
    x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
    x += self.pos_encoding[:, :input_seq_len, :]

    # 對 embedding 與 position encoding 的總合做 regularization,在 Decoder 也會做
    # 這地方有點神奇,它就只是想要對抗 overfitting 而已,做 regularization 跟 Dropout 都可以達成,圖上也沒有提到這個
    x = self.dropout(x, training=training)

    # 通過 N 個 EncoderLayer 做編碼
    for i, enc_layer in enumerate(self.enc_layers):
      x = enc_layer(x, training, mask)

    return x 

最後分享一下我拿來訓練音樂的架構

def TransformerGenerator(hparams, input_shape):

    n_vocab = hparams["EventDim"]
    n_embd = hparams["EmbeddingDim"]
    n_layer = hparams["Layers"]
    n_head = hparams["Heads"]
    n_sequence = hparams["Time"]

    batch_size = 1
    inputs = Input(shape=input_shape, dtype=tf.float32)

    # Feed-forword 用 CNN 
    h = dilated_causal_Conv1D(1,None,-1,-1)(inputs)
    nx = 512
    # 沒加 position encoding
    # N - layer Endcoer
    for layer in range(n_layer):
        ## ATTN ###
        nor = NormalizeDiagonal(n_embd)(h)
        a = MyConvld(nx, nx * 3, [batch_size, n_sequence])(nor)
        a, present = ATTN(nx, n_head,n_sequence)(a)
        a = MyConvld(nx, nx, [batch_size, n_sequence])(a)
        ##########
        h = Add()([h, a])
        ###########
        ##  MLP  ##
        nor = NormalizeDiagonal(n_embd)(h)
        a = MyConvld(nx, nx * 4, [batch_size, n_sequence])(nor)
        a = Activation("gelu")(a)
        m = MyConvld(nx * 4, nx, [batch_size, n_sequence])(a)
        ###########
        h = Add()([h, m])
        ###########

    ### output ###
    h = NormalizeDiagonal(n_embd)(h)
    ### back to 0~1
    h = Dense(n_sequence)(h)
    ## 只是想實驗看看 Transformer + Gru 會不會更厲害
    h = GRU(256)(h)    
    h = Activation("sigmoid")(h)
    h = Reshape((256,1))(h)
    return Model(inputs, h)

完整的程式碼你可以在這邊找到

小結

今天我們實作完了 Encoder 的架構,明天再來解決 Decoder 吧!

/images/emoticon/emoticon09.gif/images/emoticon/emoticon13.gif/images/emoticon/emoticon14.gif/images/emoticon/emoticon22.gif/images/emoticon/emoticon28.gif


上一篇
【Day23】 Transformer 新手包 (三)
下一篇
【Day25】 Transformer 實作包(二)
系列文
AI Voice Conversion30

尚未有邦友留言

立即登入留言