iT邦幫忙

第 11 屆 iThome 鐵人賽

DAY 23
0
AI & Data

深度學習裡的冰與火之歌 : Tensorflow vs PyTorch系列 第 23

Day 23: Tensorflow 2.0: 再造訪 Distribute Strategy API

  • 分享至 

  • xImage
  •  

Distribute Strategy API 提供使用者不同的策略去 deploy 計算圖到不同的 CPUs,GPUs 或 TPUs 上。這個 API 整合了 Tensorflow 1.x 的函式,並在 Tensorflow 2.0 不止支援 graph mode,更支援 eager mode。更具體的說明 Distribute Strategy API 所涵蓋的使用者情境包括了:

  1. 資料平行話: API 可以應用在同步化訓練,或非同步訓練。同步化訓練,將會採用多個執行程序,載入部分非重疊的訓練資料做訓練,最後再用 all-reduce 方法算出統計值,如 sum 或 mean。
  2. 異質化運算平台的資源分配

這個 API 總共支援六個 strategies,他們是:

  1. MirroredStrategy:這個策略適用於一個機器上有多個不同的 device,如多個 CPU 和 GPU等,並且在每個 GPU 提供 一個 replica。策略中會將變數鏡像拷貝到 replica 上,以確保每一個變數都可以在不同的 device 讀到,並以一個統一的變數來讀取,那就是 MirroredVariable。要建立一個 MirroredStrategy 的物件則需呼叫 distribute API 的 mirrored_strategy = tf.distribute.MirroredStrategy() 物件。在此物件中,有幾個參數特別重要,這些是:
    1. devices:這個引數要傳入一個 python list,裡面的元素為字串,每一個字串寫明了需要用到的 device 名稱,這個引數允許使用者使用在運行機器上設備的子集,而非使用所有設備。
    2. cross_device_ops:在 all-reduce 的運算元中預設使用 NVIDIA NCCL 來做跨 device 的溝通,或是給定 tf.distribute.NcclAllReduce() 。該選項可以複寫。複寫的選項包括:tf.distribute.HierarchicalCopyAllReduce() 和`tf.distribute.ReductionToOneDevice()`。`tf.distribute.ReductionToOneDevice()` 會先在一個 device 上做 reduce,再廣播給其他 device。tf.distribute.HierarchicalCopyAllReduce()則是依照 device 的拓樸形狀來做 reduce。這些跨裝備的 reduce strategy 都是繼承 tf.distribute.CrossDeviceOps(),該類別需要傳入一個 reduce 運算元,如加法或平均。
  2. CentralStorageStrategy:這個 strategy 並沒有鏡像拷貝,而是將所有的變數都集中在一個 CPU 上,而所有的運算資訊在分散在不同的 GPU 上。要使用這個 strategy,要建立 tf.distribute.experimental.CentralStorageStrategy() 物件。
  3. MultiWorkerMirroredStrategy:這個 strategy 和 MirroredStrategy 非常相似,都會在不同 device 上建立 replica,但不同的是 MultiWorkerMirroredStrategy 使用 collective_ops 根據不同的網路架構,硬體結構和 Tensor 的大小,從不同裝置中蒐集工作結果並進行 all-reduce。 collective_ops 包括了 broadcast 和 all-gather,並包括運算元表現的最加化。若要使用此 strategy 可以建立 CentralStorageStrategy 物件,藉著呼叫 tf.distribute.experimental.MultiWorkerMirroredStrategy()。這個函式有一個引數,可以讓使用者決定要用哪一個 collective_ops 實踐方法。可以使用的實踐方法包括了:CollectiveCommunication.RING 和 CollectiveCommunication.NCCL。前者是實踐由 gRPC 作為通訊 protocol,而 cluster 的拓墣構造為ring,後者則是 NVIDIA NCCL 的實踐方法。
  4. TPUStrategy:這個 strategy 和 MirroredStrategy 非常相似,但是執行在 TPU 上且 all-reduce 的collective_op 運算元則使用 TPU 的實踐方式。目前 TPU 只能在 google colab 和 google cloud 中使用。若要使用 TPU 必須在一開始就指定 TPU 並且初始化 TPU 的運算系統,在 TPU 初始化後,將會清空記憶體,而造成資料遺失的可能。
  5. ParameterServerStrategy:這個 strategy 會建立一到多個 Parameter Servers,每一個 Parameter Servers 會擁有自己的 workers,每一個 variable 會被分配到一個 Parameter Server,再由 server 分配工作給 workers。 若要使用此 strategy,則需要呼叫 tf.distribute.experimental.ParameterServerStrategy() 建立物件。
    6. OneDeviceStrategy:誠如其名,這個 strategy 則是建立在單一設備上。在這個 strategy 中,會進行 prefetch 資料到唯一的設備上,很適合做原始碼測試。若要使用該 strategy 則需要呼叫 tf.distribute.OneDeviceStrategy(device="/gpu:0") 。呼叫可給定設備名稱,若有多設備於機器上。

上面所列舉的 strategy ,只有 MirroredStrategy 和 OneDeviceStrategy 不是實驗性的,其他皆在實驗性階段。除此之外,在 distribute API 的官方使用手冊附有一個表格說明以上的 strategy 對 keras API,客製化的 training loops(custom loops) 和 Estimator API 支援的狀態。

使用在 Keras 的模型

tf.keras 有最廣的 tf.distribute.Strategy 支援,包括了 Sequential API,Functional API 和 tf.keras.Model 的子類別。使用者若要應用現有的 keras 模型到 distribute training,則需要:

  1. 建立一個 tf.distribute.Strategy 物件
  2. 移動 tf.keras 模型到 tf.distribute.Strategy 物件的 scope

下面就是程式碼範例,使用 MirroredStrategy 來做 distribution training,資料為 MNIST。文件參考為Distributed training with Keras

# ... 省略資料下載和處理的原始碼

strategy = tf.distribute.MirroredStrategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync)) # colab, GPU enable
#=> Number of devices: 1

BUFFER_SIZE = 10000 #用在 shuffle 上

BATCH_SIZE_PER_REPLICA = 64
# 可使用 trategy.num_replicas_in_sync 來得到目前 replica 的數目
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
print(BATCH_SIZE) # 只有一個 replica
#=> 64 
# scale 是一個 function,將影像的數值調整為 0 到 1 之殲
# 做 batch 轉換時,需要傳入所有 replica 合計的批次大小而非每一個 replica 持有的批次大小
train_dataset = mnist_train.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
eval_dataset = mnist_test.map(scale).batch(BATCH_SIZE)

# 調整 learnig rate 根據 Batch size。若愈多的 replica 則所有的 BATCH_SIZE 會較大,需要提高 learning rate
LEARNING_RATES_BY_BATCH_SIZE = {BATCH_SIZE_PER_REPLICA: 0.1, 2*BATCH_SIZE_PER_REPLICA: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[BATCH_SIZE]

with strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  model.compile(loss='sparse_categorical_crossentropy',
                optimizer=tf.keras.optimizers.Adam(learning_rate),
                metrics=['accuracy'])
#=>INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).

我們可以看到藉著呼叫 Strategy 物件的 scope 方法,我們可以將有需要做 distribute training 的原始碼納入 Distribution Strategy 物件的 scope context,如程式碼。將建制模型的原始碼納入 Strategy 物件的 scope,會允許 Strategy 物件 access 所有的變數,因而可對所有變數建立 replica 而非只有常用的變數。 納入模型的 compiling 原始碼,則明白告訴 Strategy 物件,請使用該 Strategy 物件訓練模型。

使用在客製化的 training loops

客製化的 training loop 通常發生在訓練時必須引入特別的訓練過程,如 GAN,需要 generator 生成的例子交付給 discriminator 去判斷真偽,這時我們就可以就 generator 和 discriminator 的狀態來做不同次數的訓練。
若要使用 tf.distribute.Strategy 在客製化的 training loop,首先如在 tf.keras.Model 使用 Strategy 物件,我們將需要做 distribute training 的物件,寫在 Strategy 物件的 scope 的 context 內,只不過這次我們也把 optimizer 也放入。

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  optimizer = tf.keras.optimizers.SGD()

接著我們要建立 distribute dataset,透過呼叫experimental_distribute_dataset,並傳入一般的 tf.data.Dataset 而成。

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(1000).batch(BATCH_SIZE)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

然後,我們用 tf.function 包覆客製化的 training loop。在客製化的 training loop 中,我們還需要定義 step_fn,並將step_fn傳入 tf.distrbute.Strategy.experimental_run_v2。該方法另外一個引數則是我們剛剛建好的 distribute dataset。所以,客製化的 training loop 的原始碼如下:

@tf.function
def train_step(dist_inputs):
  def step_fn(inputs):
    # 傳回每一次疊代進行正向傳播計算後損失值
    features, labels = inputs

    with tf.GradientTape() as tape:
      logits = model(features)
      cross_entropy = tf.nn.softmax_cross_entropy_with_logits(
          logits=logits, labels=labels)
      # 用全批次大小來 scale 每一個 replica 傳回的 loss,而非 replica 的批次大小
      loss = tf.reduce_sum(cross_entropy) * (1.0 / BATCH_SIZE)

    grads = tape.gradient(loss, model.trainable_variables) #計算梯度
    # 修改 apply_gradients 的行為
    optimizer.apply_gradients(list(zip(grads, model.trainable_variables))) #更新參數
    return cross_entropy
  
  # experimental_run_v2 會傳回每一個 replica 的計算結果
  per_example_losses = mirrored_strategy.experimental_run_v2(
      step_fn, args=(dist_inputs,))
  # experimental_run_v2 收集每個 replica 的計算結果於 per_example_losses,呼叫 reduce 並傳入   # per_example_losses 做計算
  mean_loss = mirrored_strategy.reduce(
      tf.distribute.ReduceOp.MEAN, per_example_losses, axis=0) #all-reduce 運算元
  return mean_loss

在程式碼中,我們可以看到 step_fn利用 tf.GradientTape() 的 context 定義了正向傳播的計算邏輯,計算了梯度,以及更新了參數。為了能夠應用在分散式計算,optimizer.apply_gradients 的行為必須要改變,也就是在蒐集同步平行的計算中,回傳的損失值,apply_gradients這個函式會先對所有的 replica 的梯度計算結果做總和(sum-over-all-replicas)。
而在 train_step中我們則看到了experimental_run_v2負責執行而 Strategy 物件則對所有的 replicas 做計算。接著我們可以撰寫一個 loop 來執行:

with mirrored_strategy.scope():
  for inputs in dist_dataset:
    print(train_step(inputs))

最後建立一個 distribute dataset 不一定要用experimental_distribute_dataset來建制,也可以用 tf.distribute.Strategy.make_experimental_numpy_dataset 如果你的 raw input 是 numpy ndarray 物件。

使用在 Estimator

若想使用 tf.distribute.Streategy 的 API 於 tf.Estimator ,比起 tf.keras.Model,可說是更為簡單。那就是我們可以建立一個 Strategy 物件,並在生成 tf.EstimatorRunConfig 物件時,藉由參數指定傳入 tf.Estimator 即可。程式碼如下:

# 先建立 Streagy 物件
mirrored_strategy = tf.distribute.MirroredStrategy()
# 建立 RunConfig 物件,我們可以將 train_distribute 和 eval_distribute 都指定為同一個 Strategy 
# 物件,若兩者使用同一種 Strategy 
config = tf.estimator.RunConfig(
    train_distribute=mirrored_strategy, eval_distribute=mirrored_strategy)
regressor = tf.estimator.LinearRegressor(
    feature_columns=[tf.feature_column.numeric_column('feats')],
    optimizer='SGD',
    config=config)

但因為在 tf.Estimator,除了能指定不同的 strategies 給 training 和 evaluating 外,使用者有相當大的自由度去決定資料如何分割和複製,而這個自由度則是靠著撰寫 input_fn達成。input_fn 需要回傳 PER_REPLICA_BATCH_SIZE 筆資料,若有 N 個 replica 則共可傳回 N*PER_REPLICA_BATCH_SIZE 的資料。

佛心

除了以上的 tf.distribute 方法,另外還有一個方法可以獲得精度較小的控制,那就是提供 cluster config 以 json 格式記錄為檔案。只要將該檔案指派給環境變數 TF_CONFIG,就會依檔案來分派計算工作。除此之外,Tensorflow 還佛心來著,提供 Kubernetes template,並將 TF_CONFIG 設好,供大家享用。


上一篇
Day 22: Tensorflow 2.0: 再造訪 `tf.estimator` 和 `tf.data`
下一篇
Day 24 插播 PyTorch 的 Distributed Training
系列文
深度學習裡的冰與火之歌 : Tensorflow vs PyTorch31
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言