接續 Day 15 的 tf.Tramsform 介紹,今日進行實作,先以TensorFlow Transform 預處理數據的入門範例 作為演示過程,官方 Colab 支援 , 您會發現 Apache Beam 的 pipeline 撰寫方式融入在程式中。
try:
import colab
!pip install --upgrade pip
except:
pass
!pip install -q -U tensorflow_transform==0.24.1
pprint
輸出比較漂亮。tempfile
生成暫存檔案所需。tensorflow
已經是2.x版需引入。tensorflow_transform
簡稱 tft
。tensorflow_transform.beam
實現使用 Apache Beam 。tensorflow_transform.tf_metadata
為紀錄數據所需要的 metadata 模組,引入 dataset_metadata
及 schema_utils
。import pprint
import tempfile
import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_transform.beam as tft_beam
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import schema_utils
raw_data = [
{'x': 1, 'y': 1, 's': 'hello'},
{'x': 2, 'y': 2, 's': 'world'},
{'x': 3, 'y': 3, 's': 'hello'}
]
raw_data_metadata = dataset_metadata.DatasetMetadata(
schema_utils.schema_from_feature_spec({
'y': tf.io.FixedLenFeature([], tf.float32),
'x': tf.io.FixedLenFeature([], tf.float32),
's': tf.io.FixedLenFeature([], tf.string),
}))
def preprocessing_fn(inputs):
"""Preprocess input columns into transformed columns."""
x = inputs['x']
y = inputs['y']
s = inputs['s']
x_centered = x - tft.mean(x)
y_normalized = tft.scale_to_0_1(y)
s_integerized = tft.compute_and_apply_vocabulary(s)
x_centered_times_y_normalized = (x_centered * y_normalized)
return {
'x_centered': x_centered,
'y_normalized': y_normalized,
's_integerized': s_integerized,
'x_centered_times_y_normalized': x_centered_times_y_normalized,
}
現在我們已準備好轉換我們的數據。我們將使用帶有直接運行器的 Apache Beam,輸入為:
raw_data
: 我們上面創建的原始輸入數據。raw_data_metadata
: 原始數據的 Schema。preprocessing_fn
: 預處理的特徵工程函數。關於 Apache Beam 的特殊語法用到了會在 Linux 使用的 |
(pipe 運算子),可以理解為:
=
左邊是最終結果 result。=
右邊第一個是輸入 pass_this,接續 |
是執行過程步驟。|
右邊先命名
,再>>
執行。也可以省略命名直接 to_this_call。result = pass_this | 'name this step' >> to_this_call
result = apache_beam.Pipeline() | 'first step' >> do_this_first() | 'second step' >> do_this_last()
在此範例,創建了暫時資料夾,將raw_data, raw_data_metadata
作為 tft_beam.AnalyzeAndTransformDataset( preprocessing_fn)
的輸入,執行結果輸出存入 transformed_dataset, transform_fn
。
最後將 transformed_dataset
拆成 transformed_data
, transformed_metadata
,並列印原始資料以及經過前處理的資料對照。
def main():
# Ignore the warnings
with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
transformed_dataset, transform_fn = (
(raw_data, raw_data_metadata) | tft_beam.AnalyzeAndTransformDataset(
preprocessing_fn))
transformed_data, transformed_metadata = transformed_dataset
print('\nRaw data:\n{}\n'.format(pprint.pformat(raw_data)))
print('Transformed data:\n{}'.format(pprint.pformat(transformed_data)))
if __name__ == '__main__':
main()