Let's start with a diagram to make the construction easier to follow, although a few of the internal operations are not drawn in it.
The Feed-Forward block in the figure is labeled Dense; in practice it looks like this:
import tensorflow as tf
from tensorflow.keras.layers import Dense, Dropout, LayerNormalization

def Create_feed_forward_network(d_model, dff):
    # This FFN applies two linear transformations to the input, with a ReLU in between
    return tf.keras.Sequential([
        Dense(dff, activation='relu'),  # (batch_size, seq_len, dff)
        Dense(d_model)                  # (batch_size, seq_len, d_model)
    ])
dff is usually set larger than d_model, giving the FFN room to extract useful features from the d_model-dimensional input. In the paper, d_model is 512 and dff is 2048; both are tunable hyperparameters.
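As a quick sanity check, here is a minimal sketch using the paper's numbers (the batch size of 64 and sequence length of 50 are arbitrary placeholders): the FFN widens to dff internally, but its output keeps the last dimension at d_model.

sample_ffn = Create_feed_forward_network(d_model=512, dff=2048)
x = tf.random.uniform((64, 50, 512))   # (batch_size, seq_len, d_model)
print(sample_ffn(x).shape)             # -> (64, 50, 512)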
The most important part of MultiHead attention is splitting the input into separate heads, computing attention for each head individually, and concatenating the results back together afterwards.
def split_heads(x, batch_size, num_heads, depth):
    # (batch_size, seq_len, d_model) -> (batch_size, num_heads, seq_len, depth)
    x = tf.reshape(x, (batch_size, -1, num_heads, depth))
    return tf.transpose(x, perm=[0, 2, 1, 3])

def do_MultiHeadAttention(q, k, v, mask):
    batch_size = tf.shape(q)[0]
    # What the figure doesn't show: q, k and v each go through a linear
    # transformation into d_model-dimensional space first
    q = Dense(d_model)(q)
    # (batch_size, num_heads, seq_len, depth)
    q = split_heads(q, batch_size, num_heads, depth)
    # same idea for k and v
    k = Dense(d_model)(k)
    k = split_heads(k, batch_size, num_heads, depth)
    v = Dense(d_model)(v)
    v = split_heads(v, batch_size, num_heads, depth)
    # do_attention was written yesterday
    scaled_attention, attention_weights = do_attention(q, k, v, mask)
    # put the heads back together
    scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
    concat_attention = tf.reshape(scaled_attention, (batch_size, -1, d_model))
    # finally, one more linear transformation, also not shown in the figure
    output = Dense(d_model)(concat_attention)  # (batch_size, seq_len_q, d_model)
    return output, attention_weights
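To make the head-splitting concrete, here is a small shape check; the numbers (d_model = 512, num_heads = 8, so depth = 64) are just the paper's defaults, not anything the code above enforces.

x = tf.random.uniform((2, 60, 512))                      # (batch_size, seq_len, d_model)
h = split_heads(x, batch_size=2, num_heads=8, depth=64)
print(h.shape)                                           # -> (2, 8, 60, 64): one 64-dim slice per head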
Our version is actually no longer the original method; it follows the approach of this paper. The released source code is TF 1.0, and below is my port to TF 2.0, which I have verified produces the correct results. The ATTN used inside our music Transformer (which, strictly speaking, isn't one) is the version below. The paper claims it is a more efficient ATTN, but I haven't studied it closely, so take a look yourself if you're interested.
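One thing to note before the class: it relies on shape_list, the dynamic-shape helper from the GPT-2 codebase this was ported from. If you don't already have it around, a minimal version looks like this:

def shape_list(x):
    """Return the shape of x as a list, preferring static sizes and falling back to dynamic ones."""
    static = x.shape.as_list()
    dynamic = tf.shape(x)
    return [dynamic[i] if s is None else s for i, s in enumerate(static)]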
class ATTN(tf.keras.layers.Layer):
    def __init__(self, n_state, n_head, seq):
        super(ATTN, self).__init__()
        self.n_state = n_state * 3
        self.n_head = n_head
        E_initializer = tf.constant_initializer(0)
        self.E = tf.Variable(
            E_initializer(shape=[16, seq, 32], dtype=tf.float32), name="E"
        )

    def split_heads(self, x):
        # From [batch, sequence, features] to [batch, heads, sequence, features]
        return tf.transpose(self.split_states(x, self.n_head), [0, 2, 1, 3])

    def split_states(self, x, n):
        """Reshape the last dimension of x into [n, x.shape[-1]/n]."""
        *start, m = shape_list(x)
        return tf.reshape(x, start + [n, m // n])

    def merge_heads(self, x):
        # Reverse of split_heads
        return self.merge_states(tf.transpose(x, [0, 2, 1, 3]))

    def mask_attn_weights(self, w):
        # w has shape [batch, heads, dst_sequence, src_sequence], where information flows from src to dst.
        _, _, nd, ns = shape_list(w)
        b = self.attention_mask(nd, ns, dtype=w.dtype)
        b = tf.reshape(b, [1, 1, nd, ns])
        w = w * b - tf.cast(1e10, w.dtype) * (1 - b)
        return w

    def merge_states(self, x):
        """Smash the last two dimensions of x into a single dimension."""
        *start, a, b = shape_list(x)
        return tf.reshape(x, start + [a * b])

    def attention_mask(self, nd, ns, *, dtype):
        """1's in the lower triangle, counting from the lower right corner.
        Same as tf.matrix_band_part(tf.ones([nd, ns]), -1, ns-nd), but doesn't produce garbage on TPUs.
        """
        i = tf.range(nd)[:, None]
        j = tf.range(ns)
        m = i >= j - ns + nd
        return tf.cast(m, dtype)

    def relative_attn(self, q):
        # q has shape [batch, heads, sequence, features]
        batch, heads, sequence, features = shape_list(q)
        # [heads, batch, sequence, features]
        q_ = tf.transpose(q, [1, 0, 2, 3])
        # [heads, batch * sequence, features]
        q_ = tf.reshape(q_, [heads, batch * sequence, features])
        # [heads, batch * sequence, sequence]
        rel = tf.matmul(q_, self.E, transpose_b=True)
        # [heads, batch, sequence, sequence]
        rel = tf.reshape(rel, [heads, batch, sequence, sequence])
        # [heads, batch, sequence, 1+sequence]
        rel = tf.pad(rel, ((0, 0), (0, 0), (0, 0), (1, 0)))
        # [heads, batch, sequence+1, sequence]
        rel = tf.reshape(rel, (heads, batch, sequence + 1, sequence))
        # [heads, batch, sequence, sequence]
        rel = rel[:, :, 1:]
        # [batch, heads, sequence, sequence]
        rel = tf.transpose(rel, [1, 0, 2, 3])
        return rel

    def multihead_attn(self, q, k, v):
        # q, k, v have shape [batch, heads, sequence, features]
        w = tf.matmul(q, k, transpose_b=True)
        w = w + self.relative_attn(q)
        w = w * tf.math.rsqrt(tf.cast(v.shape[-1], w.dtype))
        w = self.mask_attn_weights(w)
        w = tf.nn.softmax(w, axis=-1)
        a = tf.matmul(w, v)
        return a

    def call(self, inputs):
        q, k, v = map(self.split_heads, tf.split(inputs, 3, axis=2))
        present = tf.stack([k, v], axis=1)
        a = self.multihead_attn(q, k, v)
        a = self.merge_heads(a)
        return a, present
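A quick shape check for the layer. Note that the input is expected to be q, k and v already concatenated along the last axis (hence n_state * 3), and that the hard-coded [16, seq, 32] shape of E assumes 16 heads with 32 features each, so the numbers below are chosen to match; they are illustrative, not required by the original.

attn = ATTN(n_state=512, n_head=16, seq=128)
x = tf.random.uniform((1, 128, 512 * 3))   # q, k, v stacked on the last axis
a, present = attn(x)
print(a.shape)         # -> (1, 128, 512)
print(present.shape)   # -> (1, 2, 16, 128, 32): the cached k and v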
Now that we have the layers we need, it's time to stack them up. Build it according to the diagram and success is guaranteed.
# An Encoder contains N EncoderLayers, and each EncoderLayer contains two sub-layers: MultiHeadAttention & feed_forward_network
# Let's assemble one EncoderLayer first
class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(EncoderLayer, self).__init__()
        self.ffn = Create_feed_forward_network(d_model, dff)
        # layer norm is very common in RNN-based models; one layer norm per sub-layer
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        # one dropout layer per sub-layer
        # the Transformer paper uses a default dropout rate of 0.1
        # if you are not worried about overfitting you can leave it out
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)

    # the "isTraining" flag is passed in because dropout behaves differently during training and inference
    def call(self, x, mask, isTraining=True):
        # except for `attn`, every tensor here has shape (batch_size, input_seq_len, d_model)
        # attn.shape == (batch_size, num_heads, input_seq_len, input_seq_len)
        # the Encoder uses *self*-attention, so q, k and v are all the input itself
        # we also need a padding mask to hide the zero-padded positions in the input sequence
        # you can also try the other version XD, or wrap it into a layer the way the instructor does
        attn_output, attn = do_MultiHeadAttention(x, x, x, mask)
        attn_output = self.dropout1(attn_output, training=isTraining)
        out1 = self.layernorm1(x + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=isTraining)  # remember to pass training
        out2 = self.layernorm2(out1 + ffn_output)
        return out2
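A shape check for the layer, as a sketch. It sets the module-level d_model / num_heads / depth that do_MultiHeadAttention above reads (512 / 8 / 64 are the paper's defaults), and it includes a minimal stand-in matching yesterday's do_attention interface so the snippet runs on its own; skip that definition if you already have yesterday's version loaded.

# minimal stand-in for yesterday's do_attention: scaled dot-product attention
def do_attention(q, k, v, mask):
    logits = tf.matmul(q, k, transpose_b=True) / tf.math.sqrt(tf.cast(tf.shape(k)[-1], tf.float32))
    if mask is not None:
        logits += (mask * -1e9)
    weights = tf.nn.softmax(logits, axis=-1)
    return tf.matmul(weights, v), weights

d_model, num_heads, depth = 512, 8, 64      # globals read by do_MultiHeadAttention above
enc_layer = EncoderLayer(d_model=512, num_heads=8, dff=2048)
x = tf.random.uniform((64, 43, 512))        # (batch_size, input_seq_len, d_model)
out = enc_layer(x, None, isTraining=False)  # no padding mask for this random input
print(out.shape)                            # -> (64, 43, 512)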
Finally, add the positional encoding and the Embedding, and the Encoder is complete.
class Encoder(tf.keras.layers.Layer):
    # Besides the parameters passed straight through to the EncoderLayers, the Encoder also takes:
    # - num_layers: how many EncoderLayers to stack, the `N` from the earlier video
    # - input_vocab_size: used to turn token indices into word embeddings
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size,
                 rate=0.1):
        super(Encoder, self).__init__()
        # this is a dimension, not a model
        self.d_model = d_model
        # the input goes through here first
        self.embedding = tf.keras.layers.Embedding(input_vocab_size, d_model)
        # see yesterday's post
        self.pos_encoding = positional_encoding(input_vocab_size, self.d_model)
        # build N EncoderLayers
        self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate)
                           for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, training, mask):
        # the input x.shape == (batch_size, input_seq_len)
        # every layer below outputs (batch_size, input_seq_len, d_model)
        input_seq_len = tf.shape(x)[1]
        # turn the 2-D index sequence into a 3-D embedding tensor and, following the paper, scale it by sqrt(d_model)
        # then add the positional encoding for the corresponding length
        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :input_seq_len, :]
        # regularize the sum of the embedding and the positional encoding; the Decoder does this too
        # this part is a bit curious: it is only there to fight overfitting, and either regularization
        # or Dropout would do the job; the figure doesn't show this step either
        x = self.dropout(x, training=training)
        # run through the N EncoderLayers to encode
        for i, enc_layer in enumerate(self.enc_layers):
            x = enc_layer(x, mask, isTraining=training)
        return x
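And the same kind of check for the full Encoder, under the same assumptions as the EncoderLayer check above (plus positional_encoding from yesterday's post); the vocabulary size of 8500 and the other numbers are placeholders.

sample_encoder = Encoder(num_layers=2, d_model=512, num_heads=8, dff=2048,
                         input_vocab_size=8500)
ids = tf.random.uniform((64, 37), minval=0, maxval=8500, dtype=tf.int64)  # (batch_size, input_seq_len)
out = sample_encoder(ids, training=False, mask=None)
print(out.shape)   # -> (64, 37, 512): one d_model-dim vector per input token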
Lastly, here is the architecture I use to train on music.
def TransformerGenerator(hparams, input_shape):
    n_vocab = hparams["EventDim"]
    n_embd = hparams["EmbeddingDim"]
    n_layer = hparams["Layers"]
    n_head = hparams["Heads"]
    n_sequence = hparams["Time"]
    batch_size = 1
    inputs = Input(shape=input_shape, dtype=tf.float32)
    # the feed-forward part uses a CNN
    h = dilated_causal_Conv1D(1, None, -1, -1)(inputs)
    nx = 512
    # no positional encoding added
    # N-layer Encoder
    for layer in range(n_layer):
        ## ATTN ##
        nor = NormalizeDiagonal(n_embd)(h)
        a = MyConvld(nx, nx * 3, [batch_size, n_sequence])(nor)
        a, present = ATTN(nx, n_head, n_sequence)(a)
        a = MyConvld(nx, nx, [batch_size, n_sequence])(a)
        ##########
        h = Add()([h, a])
        ##########
        ## MLP ##
        nor = NormalizeDiagonal(n_embd)(h)
        a = MyConvld(nx, nx * 4, [batch_size, n_sequence])(nor)
        a = Activation("gelu")(a)
        m = MyConvld(nx * 4, nx, [batch_size, n_sequence])(a)
        ##########
        h = Add()([h, m])
    ##########
    ### output ###
    h = NormalizeDiagonal(n_embd)(h)
    ### back to 0~1
    h = Dense(n_sequence)(h)
    ## just an experiment to see whether Transformer + GRU works even better
    h = GRU(256)(h)
    h = Activation("sigmoid")(h)
    h = Reshape((256, 1))(h)
    return Model(inputs, h)
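For reference, the hparams dict only needs the keys read above. The values below are placeholders I am guessing at for illustration, and the custom layers (dilated_causal_Conv1D, MyConvld, NormalizeDiagonal) come from the full repo, so treat this as a usage sketch rather than the actual training config.

hparams = {
    "EventDim": 388,      # n_vocab, placeholder value
    "EmbeddingDim": 512,  # n_embd, matches the hard-coded nx = 512
    "Layers": 6,          # n_layer, placeholder value
    "Heads": 16,          # n_head, matches the hard-coded shape of E in ATTN
    "Time": 256,          # n_sequence, placeholder value
}
model = TransformerGenerator(hparams, input_shape=(256, 1))  # input_shape is a guess; see the full repo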
You can find the complete code here.
Today we finished implementing the Encoder architecture; tomorrow we'll take care of the Decoder!