iT邦幫忙

2025 iThome 鐵人賽

DAY 26
1

26

ETL 搬遷

對專案背景做完總結後,再來便可以進入 Trino ELT 實作的重點 — ELT 排程搬遷,本節將針對 BigQuery vs. Trino 在 DDL 語法、Data type 以及 Query function 上的差異:

  1. DDL 語法:

下面以訂單相關的 CREATE TABLE 語法做舉例,比較 BigQuery 與 Trino 在下 DDL 語法之間的差異:

-- BQ script structure
CREATE TABLE IF NOT EXISTS `data_silver.mongo_shopline_order_items`
(
    order_item_id STRING NOT NULL OPTIONS(description="Order item ID"),
    order_id STRING OPTIONS(description="Refers to orders._id in Mongo"),
    ...
    created_at TIMESTAMP OPTIONS(description="Time of order_item created at in Mongo"),
    updated_at TIMESTAMP OPTIONS(description="Time of order_item updated at in Mongo"),
    _processed_at TIMESTAMP NOT NULL OPTIONS(description="Airflow data_interval_start")
)
PARTITION BY DATE(created_at)
CLUSTER BY order_id, updated_at;

-- Trino script structure
CREATE TABLE IF NOT EXISTS iceberg.datalakehouse_elt_silver.mongo_shopline_order_items (
    order_item_id VARCHAR NOT NULL COMMENT 'Order item ID',
    order_id VARCHAR COMMENT 'Refers to orders._id in Mongo',
    ...
    created_at TIMESTAMP WITH TIME ZONE COMMENT 'Time of order_item created at in Mongo',
    updated_at TIMESTAMP WITH TIME ZONE COMMENT 'Time of order_item updated at in Mongo',
    _processed_at TIMESTAMP WITH TIME ZONE NOT NULL COMMENT 'Airflow data_interval_start'
)
WITH (
    partitioning = ARRAY['_processed_at']
);

可以看出兩者語法上的差異,除了語法之外還額外補充 BigQuery 與 Trino 在 Clustering 與 Sorting 機制運作的差異:

比較值得注意的是,Trino Bloom filter 對於精準比對(=, IN) 操作很有效,但對於範圍查詢(>, <, BETWEEN)就比較沒輒了。

  1. Data type

再來則是 BigQuery 與 Trino 在描述 Data type 的差異,基本上都有能夠互相對照的 Data type,轉換上比較放心,不怕找不到相應的資料型態:

  1. Query function

最後,來說下 查詢引擎 轉換上最值得注意的部分 — Query function,很容易踩坑就是因為可能兩邊函數實作根本不同無法做直接對照轉換:

舉例來說,讓筆者踩坑最嚴重的便屬 JSON function ,BigQuery 基本上能對 JSON function 做簡單調用,而 Trino 還需在 JSON function 中對 Json path 的模式做另外的設定:

  • strict mode : 在 嚴格模式 下,輸入的 JSON 資料必須 嚴格符合 路徑表達式所要求的結構,如下面例子,若 JSON 資料沒有包含需被解析的物件 (這邊是 address) 就會報錯。
-- JSON document
WITH data AS (
    SELECT JSON '{"name": "Alice", "age": 25}' AS json_data
)
SELECT 
    JSON_VALUE(json_data, 'strict $.name') AS name, -- Works fine
    JSON_VALUE(json_data, 'strict $.address') AS address -- Throws an error because 'address' does not exist
FROM data;
  • lax mode : 相反來說,在 寬鬆模式 下,JSON 資料不一定要完全符合路徑表達式所要求的結構,如下面例子,若 JSON 資料沒包含需被解析的物件 (一樣是 address),程式會返回 NULL 值。
-- JSON document
WITH data AS (
    SELECT JSON '{"name": "Alice", "age": 25}' AS json_data
)
SELECT 
    JSON_VALUE(json_data, 'lax $.name') AS name, -- Works fine
    JSON_VALUE(json_data, 'lax $.address') AS address -- Returns NULL because 'address' does not exist
FROM data;

另外,BigQuery 與 Trino 在使用 JSON function 去建立巢狀的陣列物件 ARRAY[VARCHAR] 的做法差異也很大:

-- BQ
ARRAY(SELECT TRIM(item, '"') FROM UNNEST(JSON_QUERY_ARRAY(data, "$.item_ids")) AS item) AS item_ids

-- Trino
TRY_CAST(JSON_EXTRACT(json_data, '$.item_ids') AS ARRAY(VARCHAR)) AS item_ids

最後,Iceberg 本身不支援 JSON 欄位型別,所以如果原始資料表的欄位是 JSONARRAY[JSON],就必須在寫入 Iceberg 前轉換成:

  • VARCHAR ( JSON 字串)
  • ARRAY<VARCHAR> (每個元素都為 JSON 字串)
-- 可觀察下面 payments_formatted 欄位
-- 必須使用 ARRAY_AGG 轉換為 ARRAY[VARCHAR] 才能寫入 Iceberg
-- 等於比 BigQuery 多了一個 GroupBy 的步驟
formatted_payment AS (
    SELECT
        JSON_VALUE(json_data, 'strict $._id') AS order_transaction_id,
        TRY_CAST(JSON_EXTRACT(json_data, '$.payments') AS ARRAY(JSON)) AS payments_raw,
        -- Using UNNEST to extract individual elements from the payments array
        ARRAY_AGG(JSON_FORMAT(payment)) AS payments_formatted -- Aggregate the formatted payments into an ARRAY[VARCHAR]
    FROM
        json_data AS t
    CROSS JOIN UNNEST(
        TRY_CAST(JSON_EXTRACT(json_data, '$.payments') AS ARRAY(JSON))
    ) AS t(payment) -- Unnesting the payments array
    GROUP BY
        JSON_VALUE(json_data, 'strict $._id'),
        TRY_CAST(JSON_EXTRACT(json_data, '$.payments') AS ARRAY(JSON))
),
parsed AS (
    SELECT
        JSON_VALUE(json_data, 'strict $._id') AS order_transaction_id,
        ...
        s.payments_formatted AS payments,
        ...
        TRY_CAST(_processed_at AS TIMESTAMP(6) WITH TIME ZONE) AS _processed_at
    FROM
        json_data AS t
    LEFT JOIN
        formatted_payment AS s
    ON
        JSON_VALUE(json_data, 'strict $._id') = s.order_transaction_id
        AND TRY_CAST(JSON_EXTRACT(json_data, '$.payments') AS ARRAY(JSON)) = s.payments_raw
)

明日預告

明日《Trino + Iceberg ELT 實作 (三)》將針對增量更新、聚合計算這些 ELT 常用到的場景做搬遷實作的說明,相信對於實作派的讀者會更有感覺。

Know me more

My Linkedin: https://www.linkedin.com/in/benny0624/
My Medium: https://hndsmhsu.medium.com/


上一篇
Day 25 - Trino + Iceberg ELT實作(一)
下一篇
Day 27 - Trino + Iceberg ELT實作(三)
系列文
動不動就要 ETL? 以Trino為例-淺談從資料倉儲到湖倉27
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言