There are many ways to do incremental updates. I will use the approach I rely on in my own projects, listed as points below, to illustrate the differences when moving from BigQuery to Trino:

- MERGE INTO the target table
- UPDATE SET VALUE on the target table

Here is a brief outline of the incremental update flow I use in practice. It reads well, and paired with a layered ELT design it also makes debugging noticeably easier:

1. Pull the records for a given time range from the bronze-layer log table.
2. Use ROW_NUMBER() to deduplicate them by key and update time.
3. Compute derived columns (e.g. total_cost = cost_cents * quantity).
4. Compare source and target on the key and update time.
5. WHEN MATCHED: the target already has the record and the source is newer → update it.
6. WHEN NOT MATCHED: the target has no matching record → insert it.

{%include 'bq/xxx_silver/xxxx_order_xxx_history.sql' %}
MERGE
`{{ get_dataset_name('xxx_silver') }}.xxxx_order_xxx_history` AS T
USING (
WITH hourly_records AS (
SELECT
serial_id AS order_xxx_id,
order_id,
COALESCE(return_order_id, "") AS return_order_id,
...
total.cents AS total_cents,
...
_processed_at,
FROM
`{{ get_dataset_name('xxx_bronze') }}.xxx_order_items_log`
WHERE
_processed_at >= TIMESTAMP("{{ xxx_start }}")
AND _processed_at < TIMESTAMP("{{ xxx_end }}")
QUALIFY
ROW_NUMBER() OVER (PARTITION BY serial_id, TIMESTAMP_TRUNC(updated_at, HOUR) ORDER BY updated_at DESC, _processed_at DESC) = 1
)
SELECT
*,
cost_cents * quantity AS total_cost
FROM
hourly_records
) AS S
ON
T.order_xxx_id = S.order_xxx_id
AND TIMESTAMP_TRUNC(T.updated_at, HOUR) = TIMESTAMP_TRUNC(S.updated_at, HOUR)
WHEN MATCHED AND T.updated_at <= S.updated_at AND T._processed_at <= S._processed_at THEN
UPDATE SET
T.order_id = S.order_id,
T.return_order_id = S.return_order_id,
...
T.updated_at = S.updated_at,
T._updated_at = CURRENT_TIMESTAMP(),
T._processed_at = S._processed_at
WHEN NOT MATCHED THEN
INSERT (
order_xxx_id,
order_id,
...
_updated_at,
_processed_at
)
VALUES (
S.order_xxx_id,
S.order_id,
...
CURRENT_TIMESTAMP(),
S._processed_at
);
Trino's syntax also supports this style of incremental update; you just need to watch for a few small differences (a Trino sketch follows the list below):
-- MERGE
-- BigQuery: both MERGE and MERGE INTO are accepted.
-- Trino: only the MERGE INTO form is accepted.

-- UPDATE
-- BigQuery: target columns carry the T. prefix
UPDATE SET
T.order_id = S.order_id,
-- Trino: target columns must not carry the T. prefix
UPDATE SET
order_id = S.order_id,

-- Include
-- BigQuery: the include directive is supported
{%include 'bq/xxx_silver/xxx_order_transactions.sql' %}
-- Trino: include is not supported; one file is one SQL statement

-- Macro
-- BigQuery: macro functions are supported
{{ macros.parse_cdc_xxx_table('xxx_order_transactions')|indent(8) }}
-- Trino: macro functions are not supported
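Putting those differences together, here is a minimal sketch of how the earlier hourly MERGE could be written for Trino. It is illustrative only: the Iceberg table names reuse the get_schema_name / get_table_name helpers that appear later in this post, the column list is abbreviated, the TIMESTAMP '{{ xxx_start }}' literals assume the rendered strings match Trino's timestamp literal format, and the deduplication moves into a plain subquery because Trino has no QUALIFY clause.

-- Trino MERGE (sketch only: abbreviated columns, assumed Iceberg table names)
MERGE INTO iceberg.{{ get_schema_name('xxx_silver') }}.{{ get_table_name('xxxx_order_xxx_history') }} AS T
USING (
    SELECT
        *,
        cost_cents * quantity AS total_cost    -- derived column
    FROM (
        SELECT
            serial_id AS order_xxx_id,
            order_id,
            COALESCE(return_order_id, '') AS return_order_id,
            cost_cents,
            quantity,
            updated_at,
            _processed_at,
            -- no QUALIFY in Trino: rank here, filter in the outer query
            ROW_NUMBER() OVER (
                PARTITION BY serial_id, DATE_TRUNC('hour', updated_at)
                ORDER BY updated_at DESC, _processed_at DESC
            ) AS rn
        FROM iceberg.{{ get_schema_name('xxx_bronze') }}.{{ get_table_name('xxx_order_items_log') }}
        WHERE _processed_at >= TIMESTAMP '{{ xxx_start }}'
          AND _processed_at < TIMESTAMP '{{ xxx_end }}'
    ) AS deduped
    WHERE rn = 1
) AS S
ON T.order_xxx_id = S.order_xxx_id
AND DATE_TRUNC('hour', T.updated_at) = DATE_TRUNC('hour', S.updated_at)
WHEN MATCHED AND T.updated_at <= S.updated_at AND T._processed_at <= S._processed_at THEN
    UPDATE SET    -- no T. prefix on the target columns
        order_id = S.order_id,
        return_order_id = S.return_order_id,
        updated_at = S.updated_at,
        _updated_at = current_timestamp,
        _processed_at = S._processed_at
WHEN NOT MATCHED THEN
    INSERT (order_xxx_id, order_id, return_order_id, updated_at, _updated_at, _processed_at)
    VALUES (S.order_xxx_id, S.order_id, S.return_order_id, S.updated_at, current_timestamp, S._processed_at);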
Another thing worth highlighting is the difference in subquery syntax. BigQuery is a commercial product, after all, and its support for more complex logic is noticeably better. In the example below, an expression whose WHERE clause references columns from both the base table and the subquery table is called a correlated subquery:
-- Correlated subquery in BQ
...
(
CASE WHEN (JSON_VALUE(item_xxxx, "$.variation_xxxx.key") IS NOT NULL) AND NOT SAFE_CAST(JSON_VALUE(xxxx_data, "$.same_price") AS BOOLEAN)
THEN COALESCE(
(
SELECT
val
FROM (
SELECT
JSON_VALUE(variation, '$.key') AS sub_id,
COALESCE(SAFE_CAST(JSON_VALUE(variation, '$.cost.cents') AS FLOAT64), 0) AS val
FROM
UNNEST(JSON_QUERY_ARRAY(object_xxxx, "$.variations")) AS variation
)
-- correlated subquery in where clause
-- sub_id: column of subquery
-- item_xxxx: column of base query
WHERE
sub_id = JSON_VALUE(item_xxxx, "$.variation_xxxx.key")
LIMIT 1
),
0)
ELSE
COALESCE(SAFE_CAST(JSON_VALUE(object_xxxx, "$.cost.cents") AS FLOAT64), 0)
END
) AS cost_cents,
...
Trino, however, does not support this kind of correlated subquery, so the CTEs have to be spelled out in full: the two CTEs on either side of the join key below (ve.sub_id = vk.variation_xxxx_key), namely ve and vk, must be defined up front before they can be used (a hypothetical sketch of those definitions follows the snippet):
-- Subquery in Trino
...
(
CASE
WHEN JSON_VALUE(item_xxxx, 'strict $.variation_xxxx.key') IS NOT NULL
AND NOT TRY_CAST(JSON_VALUE(object_xxxx, 'strict $.same_price') AS BOOLEAN)
THEN COALESCE(
(
SELECT val
FROM variations_extracted ve
INNER JOIN variation_keys vk
ON ve.sub_id = vk.variation_xxxx_key
LIMIT 1
),
0
)
ELSE COALESCE(TRY_CAST(JSON_VALUE(object_xxxx, 'strict $.cost.cents') AS REAL), 0)
END
) AS cost_cents,
...
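The definitions of the two CTEs are elided above. Here is a hypothetical sketch of what they might look like, assuming object_xxxx and item_xxxx are JSON strings on an order_items table; the table name, columns, and JSON paths are placeholders, not the production definitions:

-- Hypothetical CTE definitions for the ve / vk join above
WITH variations_extracted AS (
    -- one row per variation: its key and its cost in cents
    SELECT
        JSON_EXTRACT_SCALAR(variation, '$.key') AS sub_id,
        COALESCE(TRY_CAST(JSON_EXTRACT_SCALAR(variation, '$.cost.cents') AS REAL), 0) AS val
    FROM order_items
    CROSS JOIN UNNEST(
        CAST(JSON_EXTRACT(object_xxxx, '$.variations') AS ARRAY(JSON))
    ) AS t (variation)
),
variation_keys AS (
    -- the variation key referenced by each order item
    SELECT
        JSON_VALUE(item_xxxx, 'strict $.variation_xxxx.key') AS variation_xxxx_key
    FROM order_items
)
SELECT ve.sub_id, ve.val
FROM variations_extracted ve
INNER JOIN variation_keys vk
    ON ve.sub_id = vk.variation_xxxx_key;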
Finally, plain UPDATE statements differ between the two as well. BigQuery's UPDATE takes a FROM clause, so many columns can be updated at once from a joined source table; Trino's UPDATE has no FROM clause, so each value has to be supplied on its own, typically through a scalar subquery (see the sketch after the two examples below):
-- BigQuery Update
UPDATE `{{ get_dataset_name('xxx') }}.xxxx_transactions` AS T
SET
T.order_seller_id = S.order_seller_id,
T.order_customer_id = COALESCE(S.order_customer_id, ''),
T.order_type = S.order_type,
...
T.order_order_source_id = COALESCE(S.order_order_source_id, ''),
T._updated_at = CURRENT_TIMESTAMP()
FROM (
SELECT
order_id,
agent_id AS order_agent_id,
...
total.cents AS order_total_cents,
type AS order_type
FROM
{{ get_dataset_name('xxx') }}.xxxx_orders AS orders
WHERE
-- Considering the recalculation, we increase the scanning range.
_processed_at >= DATE_SUB(TIMESTAMP("{{ xxx_start }}"), INTERVAL 24 HOUR)
) AS S
WHERE
T.order_id = S.order_id
AND (
T.order_seller_id IS NULL
OR T.order_customer_id IS NULL
...
OR T.order_status IS DISTINCT FROM S.order_status
OR T.order_agent_id IS DISTINCT FROM S.order_agent_id
)
;
-- Trino Update
UPDATE
new_hires
SET
manager = (
SELECT
e.name
FROM
employees e
WHERE
e.employee_id = new_hires.manager_id
);
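When several columns need values from another table, each assignment gets its own scalar subquery in the SET list. Here is a sketch reusing the new_hires / employees example above; the department column is a made-up addition for illustration:

-- Each target column is fed by its own correlated scalar subquery,
-- because Trino's UPDATE cannot join a source table via a FROM clause.
UPDATE
    new_hires
SET
    manager = (
        SELECT e.name
        FROM employees e
        WHERE e.employee_id = new_hires.manager_id
    ),
    -- hypothetical extra column, for illustration only
    department = (
        SELECT e.department
        FROM employees e
        WHERE e.employee_id = new_hires.manager_id
    );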
Next comes the aggregation that business metrics lean on most, GROUP BY: whether you are calculating monthly revenue or counting monthly customers, you will reach for it. In both BigQuery and Trino its usage and semantics stay close to standard SQL, so I will skip the basics and go straight to the syntax differences. First, BigQuery: a derived column can be referenced in the GROUP BY keys by its alias name, like partitioned_at below:
-- BigQuery Group by
SELECT
order_id,
merchant_id,
created_by,
created_by_channel_id,
order_source_id,
total_currency_iso,
removed,
TIMESTAMP_TRUNC(created_at, HOUR) AS partitioned_at,
...
SUM(total_cents) AS total_returned_amount,
COUNT(DISTINCT return_order_id) AS total_returned_orders,
-SUM(total_cost) AS total_cost
FROM
`{{ get_dataset_name('xxx') }}.return_orders_other`
WHERE
first_record
GROUP BY
order_id,
merchant_id,
created_by,
created_by_channel_id,
order_source_id,
total_currency_iso,
removed,
-- alias okay
partitioned_at
Trino does not let you be this lazy: the derived column must already exist in the table you are querying, or you have to dutifully repeat the transformation function when listing the GROUP BY keys (an alternative using ordinal positions follows the example below):
-- Trino Group by
SELECT
order_id,
merchant_id,
created_by,
created_by_channel_id,
order_source_id,
total_currency_iso,
removed,
DATE_TRUNC('hour', created_at) AS partitioned_at,
...
SUM(total_cents) AS total_returned_amount,
COUNT(DISTINCT return_order_id) AS total_returned_orders,
-SUM(total_cost) AS total_cost
FROM
iceberg.{{ get_schema_name("xxx") }}.{{ get_table_name("return_xxx_other") }}
WHERE
first_record
AND partitioned_at IS NOT NULL
GROUP BY
order_id,
merchant_id,
created_by,
created_by_channel_id,
order_source_id,
total_currency_iso,
removed,
-- alias not allowed if using functions
DATE_TRUNC('hour', created_at)
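If repeating the expression feels too verbose, Trino also accepts ordinal positions in GROUP BY, so a derived column can be referenced by its position in the SELECT list. A small sketch on a hypothetical orders table:

-- GROUP BY 1 refers to the first expression in the SELECT list,
-- so the DATE_TRUNC call does not have to be repeated.
SELECT
    DATE_TRUNC('hour', created_at) AS partitioned_at,
    SUM(total_cents) AS hourly_total_cents
FROM orders
GROUP BY 1;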
Tomorrow, in 《Trino + Iceberg ELT 實作 (四)》, I will continue with performance-tuning techniques for the BigQuery-to-Trino migration, plus hands-on approaches to data validation.
My Linkedin: https://www.linkedin.com/in/benny0624/
My Medium: https://hndsmhsu.medium.com/