iT邦幫忙

2025 iThome Ironman Contest

DAY 27
AI & Data

ETL at Every Turn? From Data Warehouse to Lakehouse, with Trino as the Example, series part 27

Day 27 - Trino + Iceberg ELT in Practice (Part 3)


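Incremental Updates

There are many ways to implement incremental updates. I'll use the two approaches I rely on most in my own projects to illustrate what changes when moving from BigQuery to Trino:

  1. MERGE INTO the target table on a specific key
  2. UPDATE SET the columns to be refreshed on the target table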

Merge then Update

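Here is a brief outline of the incremental update flow I use in practice. It is easy to read, and combined with a layered ELT architecture it also makes debugging considerably easier: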

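  1. Extract the source data
  • Pull the records within a given time range from the bronze-layer log table.
  • Deduplicate with ROW_NUMBER(), based on the key and the update time.
  2. Transform and clean
  • Select the raw columns you need.
  • Add derived columns (e.g. total_cost = cost_cents * quantity).
  3. Compare the target with the source
  • Match source and target on the key and the update time.
  • WHEN MATCHED: the target already has the record and the source is newer → update it.
  • WHEN NOT MATCHED: the target has no matching record → insert it.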
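The three steps above can be sketched in plain Python to make the matching rules concrete. This is a minimal, hypothetical model and not the production pipeline. Rows are dataclasses with made-up fields that mirror the SQL. The "tables" are dicts keyed by (order_item_id, hour). `hour_bucket` stands in for TIMESTAMP_TRUNC(..., HOUR).

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class Row:
    # Hypothetical columns, loosely modelled on the MERGE example.
    order_item_id: str
    cost_cents: int
    quantity: int
    updated_at: datetime
    processed_at: datetime  # stands in for _processed_at
    total_cost: int = 0


def hour_bucket(ts: datetime) -> datetime:
    """Python stand-in for TIMESTAMP_TRUNC(ts, HOUR)."""
    return ts.replace(minute=0, second=0, microsecond=0)


def dedupe(log_rows):
    """Step 1: keep one row per (key, hour), like ROW_NUMBER() ... = 1."""
    latest = {}
    for row in log_rows:
        key = (row.order_item_id, hour_bucket(row.updated_at))
        best = latest.get(key)
        # ORDER BY updated_at DESC, _processed_at DESC: keep the largest pair.
        if best is None or (row.updated_at, row.processed_at) > (best.updated_at, best.processed_at):
            latest[key] = row
    return list(latest.values())


def transform(rows):
    """Step 2: add the derived column total_cost = cost_cents * quantity."""
    for row in rows:
        row.total_cost = row.cost_cents * row.quantity
    return rows


def merge(target, source_rows):
    """Step 3: match on (key, hour); insert new rows, update only if the source is newer."""
    for s in source_rows:
        key = (s.order_item_id, hour_bucket(s.updated_at))
        t = target.get(key)
        if t is None:
            target[key] = s  # WHEN NOT MATCHED THEN INSERT
        elif t.updated_at <= s.updated_at and t.processed_at <= s.processed_at:
            target[key] = s  # WHEN MATCHED AND source is newer THEN UPDATE
    return target


# Usage: two versions of item "a" in the same hour, one version of item "b".
t0 = datetime(2025, 1, 1, 10, 5)
log = [
    Row("a", 100, 2, t0, t0),
    Row("a", 120, 2, t0.replace(minute=30), t0.replace(minute=31)),
    Row("b", 50, 1, t0, t0),
]
target = merge({}, transform(dedupe(log)))

# A later batch carrying an older version of "b" does not overwrite it.
stale = Row("b", 999, 1, t0.replace(minute=1), t0.replace(minute=40))
merge(target, transform([stale]))
```

Deduplicating before the merge matters. It guarantees that at most one source row matches each target key, which is what lets the MERGE decide cleanly between update and insert.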
{%include 'bq/xxx_silver/xxxx_order_xxx_history.sql' %}

MERGE
     `{{ get_dataset_name('xxx_silver') }}.xxxx_order_xxx_history` AS T
USING (
    WITH hourly_records AS (
        SELECT
            serial_id AS order_xxx_id, 
            order_id,
            COALESCE(return_order_id, "") AS return_order_id,
            ...
            total.cents AS total_cents,
            ...
            _processed_at,
        FROM
            `{{ get_dataset_name('xxx_bronze') }}.xxx_order_items_log`
        WHERE
            _processed_at >= TIMESTAMP("{{ xxx_start }}")
            AND _processed_at < TIMESTAMP("{{ xxx_end }}")
        QUALIFY
            ROW_NUMBER() OVER (PARTITION BY serial_id, TIMESTAMP_TRUNC(updated_at, HOUR) ORDER BY updated_at DESC, _processed_at DESC) = 1
    )
    SELECT
        *,
        cost_cents * quantity AS total_cost
    FROM
        hourly_records
) AS S
ON
    T.order_item_id = S.order_item_id
    AND TIMESTAMP_TRUNC(T.updated_at, HOUR) = TIMESTAMP_TRUNC(S.updated_at, HOUR)
WHEN MATCHED AND T.updated_at <= S.updated_at AND T._processed_at <= S._processed_at THEN
UPDATE SET
    T.order_id = S.order_id,
    T.return_order_id = S.return_order_id,
    ...
    T.updated_at = S.updated_at,
    T._updated_at = CURRENT_TIMESTAMP(),
    T._processed_at = S._processed_at
WHEN NOT MATCHED THEN
INSERT (
    order_xxx_id,
    order_id,
    ...
    _updated_at,
    _processed_at
)
VALUES (
    S.order_xxx_id,
    S.order_id,
    ...
    CURRENT_TIMESTAMP(),
    S._processed_at
);

Syntax Differences

Trino supports this style of incremental update as well; you just need to watch out for a few small syntax differences:

-- MERGE
-- BigQuery: both MERGE and MERGE INTO work.
-- Trino: only MERGE INTO is accepted.

-- UPDATE
-- BigQuery: target columns carry the T. prefix
UPDATE SET
    T.order_id = S.order_id,

-- Trino: target columns are written without T.
UPDATE SET
    order_id = S.order_id,

-- Include
-- BigQuery: the include syntax is supported
{%include 'bq/xxx_silver/xxx_order_transactions.sql' %}

-- Trino: include is not supported; each file is one standalone SQL program

-- Macro
-- BigQuery: macro functions are supported
{{ macros.parse_cdc_xxx_table('xxx_order_transactions')|indent(8) }}

-- Trino: macro functions are not supported
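The MERGE and UPDATE SET differences are mechanical, so a small text rewrite can handle them when porting many files. The sketch below is a hypothetical regex-based helper (`to_trino_merge` is not part of any tool mentioned in this series) and not a SQL parser. It rests on these assumptions:

  • The target alias is T.
  • The assignment list follows UPDATE SET and ends at the next WHEN.
  • No CASE WHEN appears inside that list.

Standalone UPDATE ... FROM statements and the include/macro usage are out of its scope.

```python
import re

# MERGE at the start of a line that is not already followed by INTO.
_BARE_MERGE = re.compile(r"^(\s*)MERGE\b(?!\s+INTO\b)", re.IGNORECASE | re.MULTILINE)
# The assignment list after UPDATE SET, up to the next WHEN or the end of the text.
_UPDATE_SET = re.compile(r"(UPDATE\s+SET\b)(.*?)(?=\bWHEN\b|\Z)", re.IGNORECASE | re.DOTALL)


def to_trino_merge(sql, alias="T"):
    """Hypothetical helper: rewrite BigQuery-style MERGE text into Trino's form."""
    sql = _BARE_MERGE.sub(r"\g<1>MERGE INTO", sql)
    # An assignment target: start of the list or right after a comma, then alias.column =
    lhs = re.compile(rf"(^|,)(\s*){re.escape(alias)}\.(\w+)(\s*=)")

    def fix_assignments(match):
        # Drop "T." only on the left-hand side; S.xxx on the right stays as-is.
        body = lhs.sub(r"\g<1>\g<2>\g<3>\g<4>", match.group(2))
        return match.group(1) + body

    return _UPDATE_SET.sub(fix_assignments, sql)


bq = (
    "MERGE\n"
    "    target_table AS T\n"
    "USING source_table AS S\n"
    "ON\n"
    "    T.order_item_id = S.order_item_id\n"
    "WHEN MATCHED THEN\n"
    "UPDATE SET\n"
    "    T.order_id = S.order_id,\n"
    "    T._updated_at = CURRENT_TIMESTAMP()\n"
    "WHEN NOT MATCHED THEN\n"
    "INSERT (order_id) VALUES (S.order_id)"
)
trino = to_trino_merge(bq)
```

Only the SET list is touched. The ON clause keeps its T. qualifiers, because Trino still needs them there to tell target columns from source columns.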

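Another difference I'd like to highlight is subquery support. BigQuery is, after all, a commercial product, and it is more accommodating of complex query logic.

In the example below, the subquery's WHERE clause references columns from both the base table and the subquery. This is called a correlated subquery: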

-- Correlated subquery in BQ
...
(
    CASE WHEN (JSON_VALUE(item_xxxx, "$.variation_xxxx.key") IS NOT NULL) AND NOT SAFE_CAST(JSON_VALUE(xxxx_data, "$.same_price") AS BOOLEAN)
        THEN COALESCE(
            (
                SELECT
                    val
                FROM (
                    SELECT
                        JSON_VALUE(variation, '$.key') AS sub_id,
                        COALESCE(SAFE_CAST(JSON_VALUE(variation, '$.cost.cents') AS FLOAT64), 0) AS val
                    FROM
                        UNNEST(JSON_QUERY_ARRAY(object_xxxx, "$.variations")) AS variation
                )
                -- correlated subquery in where clause
                -- sub_id: column of subquery
                -- item_xxxx: column of base query
                WHERE
                    sub_id = JSON_VALUE(item_xxxx, "$.variation_xxxx.key")
                LIMIT 1
            ),
        0)
    ELSE
        COALESCE(SAFE_CAST(JSON_VALUE(object_xxxx, "$.cost.cents") AS FLOAT64), 0)
    END
) AS cost_cents,
...

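Trino does not support this kind of correlated subquery, so the only option is to write everything out as CTEs. The two CTEs on either side of the join key below (ve and vk, joined on ve.sub_id = vk.variation_xxxx_key) must be defined up front before they can be used: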

-- Subquery in Trino
...
(
    CASE
        WHEN JSON_VALUE(item_xxxx, 'strict $.variation_xxxx.key') IS NOT NULL
            AND NOT TRY_CAST(JSON_VALUE(object_xxxx, 'strict $.same_price') AS BOOLEAN)
        THEN COALESCE(
            (
                SELECT val
                FROM variations_extracted ve
                INNER JOIN variation_keys vk
                ON ve.sub_id = vk.variation_xxxx_key
                LIMIT 1
            ),
            0
        )
        ELSE COALESCE(TRY_CAST(JSON_VALUE(object_xxxx, 'strict $.cost.cents') AS REAL), 0)
    END
) AS cost_cents,
...
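A CTE-plus-join rewrite should return the same values as the correlated form, and running both against a toy dataset is a quick way to check. The sketch uses Python's built-in sqlite3 module, since SQLite accepts both forms and they can be compared side by side. The tables `items` and `variations` are hypothetical; `variations` stands in for the already-unnested variation array, and the same_price condition is left out. The key design point is that the CTE keeps `item_id` next to `sub_id`. That keeps the join tied to the current base row once the subquery is no longer correlated.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE items (item_id INTEGER, variation_key TEXT, base_cost REAL);
-- variations is the already-unnested form of each item's variation array.
CREATE TABLE variations (item_id INTEGER, sub_id TEXT, cost REAL);
INSERT INTO items VALUES (1, 'red', 500), (2, NULL, 300), (3, 'blue', 700);
INSERT INTO variations VALUES (1, 'red', 450), (1, 'green', 480), (3, 'green', 650);
""")

# Correlated form: the subquery reads columns of the outer row i.
CORRELATED = """
SELECT i.item_id,
       CASE WHEN i.variation_key IS NOT NULL
            THEN COALESCE((SELECT v.cost FROM variations v
                           WHERE v.item_id = i.item_id
                             AND v.sub_id = i.variation_key
                           LIMIT 1), 0)
            ELSE COALESCE(i.base_cost, 0)
       END AS cost_cents
FROM items i
ORDER BY i.item_id
"""

# Decorrelated form: build the lookup once as a CTE, then LEFT JOIN on explicit keys.
DECORRELATED = """
WITH variation_costs AS (
    SELECT item_id, sub_id, MIN(cost) AS cost
    FROM variations
    GROUP BY item_id, sub_id
)
SELECT i.item_id,
       CASE WHEN i.variation_key IS NOT NULL
            THEN COALESCE(vc.cost, 0)
            ELSE COALESCE(i.base_cost, 0)
       END AS cost_cents
FROM items i
LEFT JOIN variation_costs vc
       ON vc.item_id = i.item_id AND vc.sub_id = i.variation_key
ORDER BY i.item_id
"""

correlated = conn.execute(CORRELATED).fetchall()
decorrelated = conn.execute(DECORRELATED).fetchall()
```

MIN(cost) makes the pick deterministic when several variations share a key, whereas LIMIT 1 without ORDER BY returns an arbitrary match. With unique keys, as here, the two behave the same.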

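Finally, BigQuery and Trino also differ in how UPDATE assigns values. BigQuery can update several columns at once from a joined source (UPDATE ... FROM). Trino only supports assigning each column a single value, for example from a scalar subquery: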

-- BigQuery Update
UPDATE `{{ get_dataset_name('xxx') }}.xxxx_transactions` AS T
SET
    T.order_seller_id = S.order_seller_id,
    T.order_customer_id = COALESCE(S.order_customer_id, ''),
    T.order_type = S.order_type,
    ...
    T.order_order_source_id = COALESCE(S.order_order_source_id, ''),
    T._updated_at = CURRENT_TIMESTAMP()
FROM (
    SELECT
        order_id,
        agent_id AS order_agent_id,
        ...
        total.cents AS order_total_cents,
        type AS order_type
    FROM
        {{ get_dataset_name('xxx') }}.xxxx_orders AS orders
    WHERE
        -- Considering the recalculation, we increase the scanning range.
        _processed_at >= TIMESTAMP_SUB(TIMESTAMP("{{ xxx_start }}"), INTERVAL 24 HOUR)
) AS S
WHERE
    T.order_id = S.order_id
    AND (
        T.order_seller_id IS NULL
        OR T.order_customer_id IS NULL
        ...
        OR T.order_status IS DISTINCT FROM S.order_status
        OR T.order_agent_id IS DISTINCT FROM S.order_agent_id
    )
;

-- Trino Update
UPDATE
  new_hires
SET
  manager = (
    SELECT
      e.name
    FROM
      employees e
    WHERE
      e.employee_id = new_hires.manager_id
  );
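When several columns need refreshing, the Trino-style statement repeats one scalar subquery per assigned column. The sketch below runs that pattern with Python's sqlite3 on hypothetical `transactions` and `orders` tables. It avoids UPDATE ... FROM and uses only per-column scalar subqueries, mirroring the Trino example. WHERE EXISTS limits the update to rows whose values actually changed, in the spirit of the BigQuery WHERE clause above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE transactions (order_id INTEGER, order_seller_id TEXT, order_type TEXT);
CREATE TABLE orders (order_id INTEGER, seller_id TEXT, order_type TEXT);
INSERT INTO transactions VALUES (1, NULL, 'normal'), (2, 's2', 'normal'), (3, 's3', 'normal');
INSERT INTO orders VALUES (1, 's1', 'normal'), (2, 's2', 'return'), (3, 's3', 'normal');
""")

cur = conn.execute("""
UPDATE transactions
SET
    -- One scalar subquery per assigned column; no FROM clause.
    order_seller_id = (
        SELECT o.seller_id FROM orders o WHERE o.order_id = transactions.order_id
    ),
    order_type = (
        SELECT o.order_type FROM orders o WHERE o.order_id = transactions.order_id
    )
WHERE EXISTS (
    SELECT 1 FROM orders o
    WHERE o.order_id = transactions.order_id
      AND (transactions.order_seller_id IS NULL
           -- IS NOT is SQLite's null-safe inequality, like IS DISTINCT FROM
           OR transactions.order_type IS NOT o.order_type)
)
""")
changed = cur.rowcount
rows = conn.execute("SELECT * FROM transactions ORDER BY order_id").fetchall()
```

With many columns this gets repetitive. The MERGE INTO pattern shown earlier in this post may then be the tidier choice, since its UPDATE SET branch assigns several columns at once.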

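Aggregation

Next comes GROUP BY aggregation, which business metrics rely on most. Whether you are computing monthly revenue or counting monthly paying customers, you will need it.

In both BigQuery and Trino, its usage and definition stay close to standard SQL, so I won't go over it here and will go straight to the syntax differences.

First, BigQuery: a derived column can be referenced by its alias directly in the GROUP BY keys, as with partitioned_at below: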

-- BigQuery Group by
SELECT
    order_id,
    merchant_id,
    created_by,
    created_by_channel_id,
    order_source_id,
    total_currency_iso,
    removed,
    TIMESTAMP_TRUNC(created_at, HOUR) AS partitioned_at,
    ...
    SUM(total_cents) AS total_returned_amount,
    COUNT(DISTINCT return_order_id) AS total_returned_orders,
    -SUM(total_cost) AS total_cost
FROM
    `{{ get_dataset_name('xxx') }}.return_orders_other`
WHERE
    first_record
GROUP BY
    order_id,
    merchant_id,
    created_by,
    created_by_channel_id,
    order_source_id,
    total_currency_iso,
    removed,
    -- alias okay
    partitioned_at

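Trino doesn't allow that shortcut. A derived column has to exist already in the table you are reading from; otherwise you must write out the transformation function again in the GROUP BY key: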

-- Trino Group by
SELECT
    order_id,
    merchant_id,
    created_by,
    created_by_channel_id,
    order_source_id,
    total_currency_iso,
    removed,
    DATE_TRUNC('hour', created_at) AS partitioned_at,
    ...
    SUM(total_cents) AS total_returned_amount,
    COUNT(DISTINCT return_order_id) AS total_returned_orders,
    -SUM(total_cost) AS total_cost
FROM
    iceberg.{{ get_schema_name("xxx") }}.{{ get_table_name("return_xxx_other") }}
WHERE
    first_record
    AND partitioned_at IS NOT NULL
GROUP BY
    order_id,
    merchant_id,
    created_by,
    created_by_channel_id,
    order_source_id,
    total_currency_iso,
    removed,
    -- alias not allowed if using functions
    DATE_TRUNC('hour', created_at)
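Two Trino-friendly workarounds give identical results: repeating the expression in GROUP BY, or computing the derived column before grouping so that it exists in the table being read. The sketch below checks this using Python's sqlite3 with a hypothetical `return_orders` table. SQLite has no DATE_TRUNC, so strftime('%Y-%m-%d %H:00:00', created_at) stands in for DATE_TRUNC('hour', created_at).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE return_orders (order_id TEXT, return_order_id TEXT, "
    "created_at TEXT, total_cents INTEGER, total_cost INTEGER)"
)
conn.executemany(
    "INSERT INTO return_orders VALUES (?, ?, ?, ?, ?)",
    [
        ("o1", "r1", "2025-01-01 10:05:00", 100, 40),
        ("o1", "r2", "2025-01-01 10:45:00", 50, 20),
        ("o1", "r3", "2025-01-01 11:10:00", 70, 30),
    ],
)

# SQLite stand-in for DATE_TRUNC('hour', created_at).
HOUR = "strftime('%Y-%m-%d %H:00:00', created_at)"

# Form 1: repeat the expression in GROUP BY instead of using the alias.
repeated = conn.execute(f"""
SELECT order_id,
       {HOUR} AS partitioned_at,
       SUM(total_cents) AS total_returned_amount,
       COUNT(DISTINCT return_order_id) AS total_returned_orders,
       -SUM(total_cost) AS total_cost
FROM return_orders
GROUP BY order_id, {HOUR}
ORDER BY partitioned_at
""").fetchall()

# Form 2: compute partitioned_at first, so it is a real column of the input.
precomputed = conn.execute(f"""
SELECT order_id,
       partitioned_at,
       SUM(total_cents) AS total_returned_amount,
       COUNT(DISTINCT return_order_id) AS total_returned_orders,
       -SUM(total_cost) AS total_cost
FROM (SELECT *, {HOUR} AS partitioned_at FROM return_orders) AS src
GROUP BY order_id, partitioned_at
ORDER BY partitioned_at
""").fetchall()
```

Form 2 matches the "column must already exist" route. It is also what an upstream layer provides when it materializes partitioned_at, as the Trino query above relies on in its WHERE clause.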

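Coming Up Tomorrow

Tomorrow's post, Trino + Iceberg ELT in Practice (Part 4), will cover performance tuning techniques for migrating from BigQuery to Trino, along with practical methods for validating the data.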

Know me more

My Linkedin: https://www.linkedin.com/in/benny0624/
My Medium: https://hndsmhsu.medium.com/

