iT邦幫忙

2025 iThome 鐵人賽

DAY 28
1
AI & Data

動不動就要 ETL? 以Trino為例-淺談從資料倉儲到湖倉系列 第 28

Day 28 - Trino + Iceberg ELT實作(四)

  • 分享至 

  • xImage
  •  

28

資料驗證

說到資料驗證可少不了單元測試 ( Unit test ):

  1. 在 BigQuery 上實作容易,因 BigQuery 同一個 project 底下可切分多個 dataset 來區隔測試案例。
  2. 在 Trino 上就沒那麼簡單了,因為一個新的 schema 代表一個新的 S3 bucket,這代表你必須跟 cloud team 申請新的權限才能取得。

那該怎麼辦呢?很簡單,只要在同一個 schema 底下切出不同的 table 就能達成和 BigQuery 一樣的效果了,對照的測試架構如下:

# Table structure for BQ
{project_name}.{dataset_name}.{table_name}

## Production table name for BQ
xxx-test.xxx_bronze.xxx_orders_log

## Unit test table name for BQ
xxx-test.xxx_bronze_20241227_WR9U8_TEST.xxx_orders_log

=================================================================================

# Table structure for Trino
{catelog_name}.{schema_name}.{table_name}

## Production table name for Trino
iceberg.xxx_bronze_production.xxx_orders_log

## Unit test table name for Trino
iceberg.xxx_bronze_staging.xxx_orders_log_20241227_WR9U8_TEST

解決了測試架構的問題,接下來就可以思考要怎麼塞資料了:

  1. 在 BigQuery 上,我們可以使用自定義的函數 bq_load_data ,引用bigquery.Client 原生提供的 load_table_from_json 將資料導入測試用 BigQuery table。
  2. Trino 的部分,因為沒辦法自己建立新的 schema 所以只能在表的名稱用 Jinja 渲染 DDL 語法並帶入後綴,以此區分不同測試案例,再使用 INSERT INTO 將測資塞入。
# BQ Create unit test dataset and table
## Use built-in function create_table to create table from ddl file
## Use built-in function delete_table to delete table after unit test is done

dataset = bq_create_dataset("xxx_bronze")
bq_table = bq_create_table(
    dataset=dataset,
    table_name="xxx_order_xxx_items_log",
    schema=bq_client.schema_from_json(
        "dags/dag_xxx_elt/schemas/bq/xxx_bronze/xxx_order_xxx_items_log.json"
    ),
)
        
# BQ Insert data into unit test table
## Use built-in function load_table_from_json to load json into unit test table

bq_load_data(
    table_reference="xxx_bronze.xxx_order_xxx_items_log",
    data=self.test_cases["data"]["xxx_order_xxx_items_log"],
)
# ===================================================================================

# Trino Create unit test table 
## Create table from ddl file 
## Use partial and jinja template to predefine table suffix 

trino_create_table(
    ddl_paths=[
        "trino/xxx_bronze/xxx_orders_log.sql",
        "trino/xxx_bronze/xxx_order_transactions_log.sql",
        "trino/xxx_silver/orders_other.sql",
        "trino/xxx_silver/orders_pos.sql",
        "trino/xxx_silver/return_orders_other.sql",
        "trino/xxx_gold/order_sales.sql",
    ],
)

# Trino Insert data into unit test table
## Combine INSERT INTO query after transforming data type from BQ to iceberg
## Combine DROP TABLE query while inserting test data

insert_configs = [
    {
        "schema_table_name": "xxx_bronze_staging.xxx_orders_log",
        "test_data": self.test_cases["data"]["xxx_orders_log"],
    },
    {
        "schema_table_name": "xxx_bronze_staging.mongo_shopline_order_transactions_log",
        "test_data": self.test_cases["data"]["xxx_order_transactions_log"],
    },
]

for insert_config in insert_configs:
    trino_insert_many(
        schema_table_name=insert_config["schema_table_name"],
        test_data=insert_config["test_data"],
    )

再來就是系列文前面有提到的 Data Type 問題,必須要解決舊版測資的資料格式問題,這時候我們使用一個比較暴力的方式解決 — CAST AS [typing],但轉換函數是照抄官方文檔的東西,比較穩妥。

# type transforming conditions
...
if isinstance(value, str) and trino_column_typing == TRINO_VARCHAR:
    return value
elif isinstance(value, float) and trino_column_typing == TRINO_DOUBLE:
    return value
elif isinstance(value, datetime) and trino_column_typing == TRINO_TIMESTAMP6_WITH_TIMEZONE:
    return value
...

# NULL -> CAST(NULL AS typing)
...
if isinstance(python_data, str) and python_data in [
    TRINO_VARCHAR,
    TRINO_DOUBLE,
    TRINO_BIGINT,
    TRINO_BOOLEAN,
    TRINO_TIMESTAMP6_WITH_TIMEZONE,
]:
    return "CAST(NULL AS %s)" % python_data
if isinstance(python_data, str) and TRINO_ROW in python_data:
    return "CAST(NULL AS %s)" % python_data
if isinstance(python_data, str) and TRINO_ARRAY in python_data:
    return "CAST(NULL AS %s)" % python_data
...

明日預告

寫捯這裡,鐵人賽也接近尾聲,說來也是有點感傷。

最後一部分《監控你的湖倉,Grafafa metrics 建立》將介紹如何透過 Grafana 來建立監控指標,並且設定告警,以做 Trino cluster 故障排除的依據。

Know me more

My Linkedin: https://www.linkedin.com/in/benny0624/
My Medium: https://hndsmhsu.medium.com/


上一篇
Day 27 - Trino + Iceberg ELT實作(三)
系列文
動不動就要 ETL? 以Trino為例-淺談從資料倉儲到湖倉28
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言