接續 Day 15 的 tf.Tramsform 介紹,今日進行實作,先以TensorFlow Transform 預處理數據的入門範例 作為演示過程,官方 Colab 支援 
, 您會發現 Apache Beam 的 pipeline 撰寫方式融入在程式中。
try:
  import colab
  !pip install --upgrade pip
except:
  pass
!pip install -q -U tensorflow_transform==0.24.1
pprint 輸出比較漂亮。tempfile 生成暫存檔案所需。tensorflow 已經是2.x版需引入。tensorflow_transform 簡稱 tft 。tensorflow_transform.beam 實現使用 Apache Beam 。tensorflow_transform.tf_metadata 為紀錄數據所需要的 metadata 模組,引入 dataset_metadata 及 schema_utils。import pprint
import tempfile
import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import schema_utils
raw_data = [
      {'x': 1, 'y': 1, 's': 'hello'},
      {'x': 2, 'y': 2, 's': 'world'},
      {'x': 3, 'y': 3, 's': 'hello'}
  ]
raw_data_metadata = dataset_metadata.DatasetMetadata(
    schema_utils.schema_from_feature_spec({
        'y': tf.io.FixedLenFeature([], tf.float32),
        'x': tf.io.FixedLenFeature([], tf.float32),
        's': tf.io.FixedLenFeature([], tf.string),
    }))
def preprocessing_fn(inputs):
    """Preprocess input columns into transformed columns."""
    x = inputs['x']
    y = inputs['y']
    s = inputs['s']
    x_centered = x - tft.mean(x)
    y_normalized = tft.scale_to_0_1(y)
    s_integerized = tft.compute_and_apply_vocabulary(s)
    x_centered_times_y_normalized = (x_centered * y_normalized)
    return {
        'x_centered': x_centered,
        'y_normalized': y_normalized,
        's_integerized': s_integerized,
        'x_centered_times_y_normalized': x_centered_times_y_normalized,
    }
現在我們已準備好轉換我們的數據。我們將使用帶有直接運行器的 Apache Beam,輸入為:
raw_data : 我們上面創建的原始輸入數據。raw_data_metadata : 原始數據的 Schema。preprocessing_fn : 預處理的特徵工程函數。關於 Apache Beam 的特殊語法用到了會在 Linux 使用的 | (pipe 運算子),可以理解為:
= 左邊是最終結果 result。= 右邊第一個是輸入 pass_this,接續 | 是執行過程步驟。| 右邊先命名,再>>執行。也可以省略命名直接 to_this_call。result = pass_this | 'name this step' >> to_this_call
result = apache_beam.Pipeline() | 'first step' >> do_this_first() | 'second step' >> do_this_last()
在此範例,創建了暫時資料夾,將raw_data, raw_data_metadata作為 tft_beam.AnalyzeAndTransformDataset( preprocessing_fn) 的輸入,執行結果輸出存入 transformed_dataset, transform_fn 。
最後將 transformed_dataset 拆成 transformed_data ,  transformed_metadata ,並列印原始資料以及經過前處理的資料對照。
def main():
  # Ignore the warnings
  with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
    transformed_dataset, transform_fn = (  
        (raw_data, raw_data_metadata) | tft_beam.AnalyzeAndTransformDataset(
            preprocessing_fn))
  transformed_data, transformed_metadata = transformed_dataset  
  print('\nRaw data:\n{}\n'.format(pprint.pformat(raw_data)))
  print('Transformed data:\n{}'.format(pprint.pformat(transformed_data)))
if __name__ == '__main__':
  main()
