Distribute Strategy API 提供使用者不同的策略去 deploy 計算圖到不同的 CPUs,GPUs 或 TPUs 上。這個 API 整合了 Tensorflow 1.x 的函式,並在 Tensorflow 2.0 不止支援 graph mode,更支援 eager mode。更具體的說明 Distribute Strategy API 所涵蓋的使用者情境包括了:
這個 API 總共支援六個 strategies,他們是:
MirroredVariable
。要建立一個 MirroredStrategy
的物件則需呼叫 distribute API 的 mirrored_strategy = tf.distribute.MirroredStrategy()
物件。在此物件中,有幾個參數特別重要,這些是:
tf.distribute.NcclAllReduce()
。該選項可以複寫。複寫的選項包括:tf.distribute.HierarchicalCopyAllReduce()
和`tf.distribute.ReductionToOneDevice()`。`tf.distribute.ReductionToOneDevice()` 會先在一個 device 上做 reduce,再廣播給其他 device。tf.distribute.HierarchicalCopyAllReduce()
則是依照 device 的拓樸形狀來做 reduce。這些跨裝備的 reduce strategy 都是繼承 tf.distribute.CrossDeviceOps()
,該類別需要傳入一個 reduce 運算元,如加法或平均。tf.distribute.experimental.CentralStorageStrategy()
物件。tf.distribute.experimental.MultiWorkerMirroredStrategy()
。這個函式有一個引數,可以讓使用者決定要用哪一個 collective_ops 實踐方法。可以使用的實踐方法包括了:CollectiveCommunication.RING 和 CollectiveCommunication.NCCL。前者是實踐由 gRPC 作為通訊 protocol,而 cluster 的拓墣構造為ring,後者則是 NVIDIA NCCL 的實踐方法。 tf.distribute.experimental.ParameterServerStrategy()
建立物件。tf.distribute.OneDeviceStrategy(device="/gpu:0")
。呼叫可給定設備名稱,若有多設備於機器上。上面所列舉的 strategy ,只有 MirroredStrategy 和 OneDeviceStrategy 不是實驗性的,其他皆在實驗性階段。除此之外,在 distribute API 的官方使用手冊附有一個表格說明以上的 strategy 對 keras API,客製化的 training loops(custom loops) 和 Estimator API 支援的狀態。
tf.keras
有最廣的 tf.distribute.Strategy
支援,包括了 Sequential API,Functional API 和 tf.keras.Model
的子類別。使用者若要應用現有的 keras 模型到 distribute training,則需要:
tf.distribute.Strategy
物件tf.keras
模型到 tf.distribute.Strategy
物件的 scope下面就是程式碼範例,使用 MirroredStrategy 來做 distribution training,資料為 MNIST。文件參考為Distributed training with Keras
# ... 省略資料下載和處理的原始碼
strategy = tf.distribute.MirroredStrategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync)) # colab, GPU enable
#=> Number of devices: 1
BUFFER_SIZE = 10000 #用在 shuffle 上
BATCH_SIZE_PER_REPLICA = 64
# 可使用 trategy.num_replicas_in_sync 來得到目前 replica 的數目
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
print(BATCH_SIZE) # 只有一個 replica
#=> 64
# scale 是一個 function,將影像的數值調整為 0 到 1 之殲
# 做 batch 轉換時,需要傳入所有 replica 合計的批次大小而非每一個 replica 持有的批次大小
train_dataset = mnist_train.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
eval_dataset = mnist_test.map(scale).batch(BATCH_SIZE)
# 調整 learnig rate 根據 Batch size。若愈多的 replica 則所有的 BATCH_SIZE 會較大,需要提高 learning rate
LEARNING_RATES_BY_BATCH_SIZE = {BATCH_SIZE_PER_REPLICA: 0.1, 2*BATCH_SIZE_PER_REPLICA: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[BATCH_SIZE]
with strategy.scope():
model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
model.compile(loss='sparse_categorical_crossentropy',
optimizer=tf.keras.optimizers.Adam(learning_rate),
metrics=['accuracy'])
#=>INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
我們可以看到藉著呼叫 Strategy 物件的 scope
方法,我們可以將有需要做 distribute training 的原始碼納入 Distribution Strategy 物件的 scope
context,如程式碼。將建制模型的原始碼納入 Strategy 物件的 scope
,會允許 Strategy 物件 access 所有的變數,因而可對所有變數建立 replica 而非只有常用的變數。 納入模型的 compiling 原始碼,則明白告訴 Strategy 物件,請使用該 Strategy 物件訓練模型。
客製化的 training loop 通常發生在訓練時必須引入特別的訓練過程,如 GAN,需要 generator 生成的例子交付給 discriminator 去判斷真偽,這時我們就可以就 generator 和 discriminator 的狀態來做不同次數的訓練。
若要使用 tf.distribute.Strategy
在客製化的 training loop,首先如在 tf.keras.Model
使用 Strategy 物件,我們將需要做 distribute training 的物件,寫在 Strategy 物件的 scope 的 context 內,只不過這次我們也把 optimizer 也放入。
with mirrored_strategy.scope():
model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
optimizer = tf.keras.optimizers.SGD()
接著我們要建立 distribute dataset,透過呼叫experimental_distribute_dataset
,並傳入一般的 tf.data.Dataset
而成。
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(1000).batch(BATCH_SIZE)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
然後,我們用 tf.function
包覆客製化的 training loop。在客製化的 training loop 中,我們還需要定義 step_fn
,並將step_fn
傳入 tf.distrbute.Strategy.experimental_run_v2
。該方法另外一個引數則是我們剛剛建好的 distribute dataset。所以,客製化的 training loop 的原始碼如下:
@tf.function
def train_step(dist_inputs):
def step_fn(inputs):
# 傳回每一次疊代進行正向傳播計算後損失值
features, labels = inputs
with tf.GradientTape() as tape:
logits = model(features)
cross_entropy = tf.nn.softmax_cross_entropy_with_logits(
logits=logits, labels=labels)
# 用全批次大小來 scale 每一個 replica 傳回的 loss,而非 replica 的批次大小
loss = tf.reduce_sum(cross_entropy) * (1.0 / BATCH_SIZE)
grads = tape.gradient(loss, model.trainable_variables) #計算梯度
# 修改 apply_gradients 的行為
optimizer.apply_gradients(list(zip(grads, model.trainable_variables))) #更新參數
return cross_entropy
# experimental_run_v2 會傳回每一個 replica 的計算結果
per_example_losses = mirrored_strategy.experimental_run_v2(
step_fn, args=(dist_inputs,))
# experimental_run_v2 收集每個 replica 的計算結果於 per_example_losses,呼叫 reduce 並傳入 # per_example_losses 做計算
mean_loss = mirrored_strategy.reduce(
tf.distribute.ReduceOp.MEAN, per_example_losses, axis=0) #all-reduce 運算元
return mean_loss
在程式碼中,我們可以看到 step_fn
利用 tf.GradientTape()
的 context 定義了正向傳播的計算邏輯,計算了梯度,以及更新了參數。為了能夠應用在分散式計算,optimizer.apply_gradients
的行為必須要改變,也就是在蒐集同步平行的計算中,回傳的損失值,apply_gradients
這個函式會先對所有的 replica 的梯度計算結果做總和(sum-over-all-replicas)。
而在 train_step
中我們則看到了experimental_run_v2
負責執行而 Strategy 物件則對所有的 replicas 做計算。接著我們可以撰寫一個 loop 來執行:
with mirrored_strategy.scope():
for inputs in dist_dataset:
print(train_step(inputs))
最後建立一個 distribute dataset 不一定要用experimental_distribute_dataset
來建制,也可以用 tf.distribute.Strategy.make_experimental_numpy_dataset
如果你的 raw input 是 numpy ndarray 物件。
若想使用 tf.distribute.Streategy
的 API 於 tf.Estimator
,比起 tf.keras.Model
,可說是更為簡單。那就是我們可以建立一個 Strategy 物件,並在生成 tf.Estimator
的 RunConfig
物件時,藉由參數指定傳入 tf.Estimator
即可。程式碼如下:
# 先建立 Streagy 物件
mirrored_strategy = tf.distribute.MirroredStrategy()
# 建立 RunConfig 物件,我們可以將 train_distribute 和 eval_distribute 都指定為同一個 Strategy
# 物件,若兩者使用同一種 Strategy
config = tf.estimator.RunConfig(
train_distribute=mirrored_strategy, eval_distribute=mirrored_strategy)
regressor = tf.estimator.LinearRegressor(
feature_columns=[tf.feature_column.numeric_column('feats')],
optimizer='SGD',
config=config)
但因為在 tf.Estimator
,除了能指定不同的 strategies 給 training 和 evaluating 外,使用者有相當大的自由度去決定資料如何分割和複製,而這個自由度則是靠著撰寫 input_fn
達成。input_fn
需要回傳 PER_REPLICA_BATCH_SIZE 筆資料,若有 N 個 replica 則共可傳回 N*PER_REPLICA_BATCH_SIZE
的資料。
除了以上的 tf.distribute
方法,另外還有一個方法可以獲得精度較小的控制,那就是提供 cluster config 以 json 格式記錄為檔案。只要將該檔案指派給環境變數 TF_CONFIG,就會依檔案來分派計算工作。除此之外,Tensorflow 還佛心來著,提供 Kubernetes template,並將 TF_CONFIG 設好,供大家享用。