When it comes to data validation, unit tests (Unit test) are indispensable. In BigQuery, you can split a project into multiple datasets to isolate test cases. In Trino, however, a new schema means a new S3 bucket, which in turn means filing a request with the cloud team before you can get the permissions. So what can we do? Simple: carve out differently named tables under the same schema, and you get the same effect as in BigQuery. The two test structures compare as follows:
# Table structure for BQ
{project_name}.{dataset_name}.{table_name}
## Production table name for BQ
xxx-test.xxx_bronze.xxx_orders_log
## Unit test table name for BQ
xxx-test.xxx_bronze_20241227_WR9U8_TEST.xxx_orders_log
=================================================================================
# Table structure for Trino
{catalog_name}.{schema_name}.{table_name}
## Production table name for Trino
iceberg.xxx_bronze_production.xxx_orders_log
## Unit test table name for Trino
iceberg.xxx_bronze_staging.xxx_orders_log_20241227_WR9U8_TEST
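To make the naming scheme concrete, here is a minimal sketch of a suffix builder that follows the {YYYYMMDD}_{RANDOM}_TEST pattern above; the helper name and implementation are my assumptions rather than the series' actual code:
import random
import string
from datetime import datetime, timezone

def build_test_suffix() -> str:
    """Build a unique suffix such as 20241227_WR9U8_TEST for one test run."""
    date_part = datetime.now(timezone.utc).strftime("%Y%m%d")
    rand_part = "".join(random.choices(string.ascii_uppercase + string.digits, k=5))
    return f"{date_part}_{rand_part}_TEST"

suffix = build_test_suffix()
# BigQuery: the suffix isolates a whole dataset per test run
bq_dataset_name = f"xxx_bronze_{suffix}"
# Trino: the schema is shared, so the suffix goes on the table name instead
trino_table_name = f"xxx_orders_log_{suffix}"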
With the test structure settled, the next question is how to load the data. On the BigQuery side, bq_load_data calls load_table_from_json, provided natively by bigquery.Client, to import data into the test BigQuery table. On the Trino side, because every test case shares the same schema, the only option is to render the DDL with Jinja and bake a suffix into the table name to tell test cases apart, then push the test data in with INSERT INTO.
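Before the full setup code, a minimal sketch of that Jinja rendering on the Trino side; the template body and the column definitions are my assumptions:
from jinja2 import Template

# Hypothetical content of trino/xxx_bronze/xxx_orders_log.sql
ddl_template = """\
CREATE TABLE iceberg.xxx_bronze_staging.xxx_orders_log_{{ table_suffix }} (
    order_id VARCHAR,
    amount DOUBLE,
    created_at TIMESTAMP(6) WITH TIME ZONE
)
"""

# Rendering bakes the per-test-case suffix into the table name
ddl = Template(ddl_template).render(table_suffix="20241227_WR9U8_TEST")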
# BQ Create unit test dataset and table
## Use built-in functions schema_from_json and create_table to create the table from a schema json file
## Use built-in function delete_table to delete table after unit test is done
dataset = bq_create_dataset("xxx_bronze")
bq_table = bq_create_table(
    dataset=dataset,
    table_name="xxx_order_xxx_items_log",
    schema=bq_client.schema_from_json(
        "dags/dag_xxx_elt/schemas/bq/xxx_bronze/xxx_order_xxx_items_log.json"
    ),
)

# BQ Insert data into unit test table
## Use built-in function load_table_from_json to load json into unit test table
bq_load_data(
    table_reference="xxx_bronze.xxx_order_xxx_items_log",
    data=self.test_cases["data"]["xxx_order_xxx_items_log"],
)
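For reference, the bq_* helpers above can be thin wrappers over the native google-cloud-bigquery client. A minimal sketch under that assumption (the wrapper bodies are mine, and build_test_suffix comes from the earlier sketch):
from google.cloud import bigquery

bq_client = bigquery.Client()
TEST_SUFFIX = build_test_suffix()  # from the earlier sketch

def bq_create_dataset(dataset_name: str) -> bigquery.Dataset:
    # The suffix keeps this run's dataset apart from production data
    dataset_id = f"{bq_client.project}.{dataset_name}_{TEST_SUFFIX}"
    return bq_client.create_dataset(bigquery.Dataset(dataset_id), exists_ok=True)

def bq_create_table(dataset: bigquery.Dataset, table_name: str, schema) -> bigquery.Table:
    table_id = f"{dataset.project}.{dataset.dataset_id}.{table_name}"
    return bq_client.create_table(bigquery.Table(table_id, schema=schema), exists_ok=True)

def bq_load_data(table_reference: str, data: list[dict]) -> None:
    dataset_name, table_name = table_reference.split(".")
    table_id = f"{bq_client.project}.{dataset_name}_{TEST_SUFFIX}.{table_name}"
    # load_table_from_json streams python dicts straight into the table
    bq_client.load_table_from_json(data, table_id).result()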
# ===================================================================================
# Trino Create unit test table
## Create table from ddl file
## Use partial and jinja template to predefine table suffix
trino_create_table(
    ddl_paths=[
        "trino/xxx_bronze/xxx_orders_log.sql",
        "trino/xxx_bronze/xxx_order_transactions_log.sql",
        "trino/xxx_silver/orders_other.sql",
        "trino/xxx_silver/orders_pos.sql",
        "trino/xxx_silver/return_orders_other.sql",
        "trino/xxx_gold/order_sales.sql",
    ],
)
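The comments above mention functools.partial plus a Jinja template to pin the table suffix. A minimal sketch of what trino_create_table could look like under those assumptions (the connection settings and helper body are mine; build_test_suffix comes from the earlier sketch):
from functools import partial
from pathlib import Path

import trino
from jinja2 import Template

# Hypothetical connection settings for the iceberg catalog used above
conn = trino.dbapi.connect(
    host="trino.example.internal",
    port=8080,
    user="unit-test",
    catalog="iceberg",
)

def _create_tables(ddl_paths: list[str], table_suffix: str) -> None:
    cursor = conn.cursor()
    for ddl_path in ddl_paths:
        # Render the suffix into each CREATE TABLE statement before executing it
        ddl = Template(Path(ddl_path).read_text()).render(table_suffix=table_suffix)
        cursor.execute(ddl)
        cursor.fetchall()

# partial pins one suffix for the whole test run, so every DDL shares it
trino_create_table = partial(_create_tables, table_suffix=build_test_suffix())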
# Trino Insert data into unit test table
## Combine INSERT INTO query after transforming data type from BQ to iceberg
## Combine DROP TABLE query while inserting test data
insert_configs = [
    {
        "schema_table_name": "xxx_bronze_staging.xxx_orders_log",
        "test_data": self.test_cases["data"]["xxx_orders_log"],
    },
    {
        "schema_table_name": "xxx_bronze_staging.xxx_order_transactions_log",
        "test_data": self.test_cases["data"]["xxx_order_transactions_log"],
    },
]
for insert_config in insert_configs:
    trino_insert_many(
        schema_table_name=insert_config["schema_table_name"],
        test_data=insert_config["test_data"],
    )
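And a minimal sketch of trino_insert_many, reusing conn and TEST_SUFFIX from the sketches above; to_trino_literal stands in for the type-conversion function excerpted next and is likewise an assumption:
def trino_insert_many(schema_table_name: str, test_data: list[dict]) -> None:
    """Render one INSERT INTO ... VALUES statement covering all test rows."""
    # Append the shared suffix so the insert hits this run's test table
    target_table = f"{schema_table_name}_{TEST_SUFFIX}"
    columns = list(test_data[0].keys())
    rows = ", ".join(
        "(" + ", ".join(to_trino_literal(row[col]) for col in columns) + ")"
        for row in test_data
    )
    query = f"INSERT INTO {target_table} ({', '.join(columns)}) VALUES {rows}"
    cursor = conn.cursor()
    cursor.execute(query)
    cursor.fetchall()  # drain the result so the statement is fully executed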
Next comes the Data Type problem mentioned earlier in this series: the formats of the legacy test data have to be reconciled. We settle it in a fairly brute-force way, CAST AS [typing], but the conversion rules are copied straight from the official documentation, which keeps things on the safe side.
# type transforming conditions
...
# Pass the value through untouched when the Python type already matches the Trino column type
if isinstance(value, str) and trino_column_typing == TRINO_VARCHAR:
    return value
elif isinstance(value, float) and trino_column_typing == TRINO_DOUBLE:
    return value
elif isinstance(value, datetime) and trino_column_typing == TRINO_TIMESTAMP6_WITH_TIMEZONE:
    return value
...
# NULL -> CAST(NULL AS typing)
...
# The fixtures encode NULL as the Trino type-name string, so emit a typed NULL
# for scalar types as well as ROW and ARRAY types
if isinstance(python_data, str) and (
    python_data in [
        TRINO_VARCHAR,
        TRINO_DOUBLE,
        TRINO_BIGINT,
        TRINO_BOOLEAN,
        TRINO_TIMESTAMP6_WITH_TIMEZONE,
    ]
    or TRINO_ROW in python_data
    or TRINO_ARRAY in python_data
):
    return "CAST(NULL AS %s)" % python_data
...
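To see how those branches play together, a small worked example. The convention of encoding NULL as the Trino type-name string follows the excerpt above; to_trino_literal and the quoting rule for ordinary strings are my assumptions:
TRINO_VARCHAR = "VARCHAR"
TRINO_DOUBLE = "DOUBLE"

# The fixtures encode NULL as the Trino type name itself, so the converter
# knows which type the NULL should be cast to
row = {"order_id": "A123", "total": TRINO_DOUBLE}

def to_trino_literal(value) -> str:
    if isinstance(value, str) and value in (TRINO_VARCHAR, TRINO_DOUBLE):
        return "CAST(NULL AS %s)" % value  # typed-NULL branch from the excerpt
    if isinstance(value, str):
        return "'%s'" % value  # assumed quoting for ordinary strings
    raise TypeError(f"unhandled value: {value!r}")

print(", ".join(to_trino_literal(v) for v in row.values()))
# 'A123', CAST(NULL AS DOUBLE)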
Writing up to this point, the Ironman challenge (鐵人賽) is drawing to a close, which is honestly a little bittersweet.
The final part, 《監控你的湖倉，Grafana metrics 建立》 (Monitoring Your Lakehouse: Building Grafana Metrics), will cover how to build monitoring metrics with Grafana and configure alerts, as a basis for troubleshooting the Trino cluster.
My Linkedin: https://www.linkedin.com/in/benny0624/
My Medium: https://hndsmhsu.medium.com/