iT邦幫忙

2025 iThome 鐵人賽

DAY 17
0

簡介

在上篇 Day16 淬鍊之章-Glue 實作篇-1 中,我們實作了 Glue 所需的 IAM Role 建立和指派,以及實際建立一支 Glue PySpark Job。

本篇我們要來建立另一支 Glue PySpark Job 來處理 Silver animes 與 ratings 的 Table Join,並使用 Glue Workflow 將 silver Job 與 gold job 串接成一個 Pipeline。

Iceberg 在 S3 的儲存結構

首先,我們先來確認當我們在 Glue + Iceberg 中建立表格時,S3 會產生的兩個主要目錄,例如:

s3://anime-lake/gold.db/animes_summary/
├── data/ # 實際資料檔案 (Parquet, ORC, Avro)
└── metadata/ # Iceberg 表的元資料 (JSON, Avro)

📂 data/ 目錄

  • 用途:存放實際的資料檔案(通常是 Parquet)。
  • 檔案結構
    • 每個 partition 對應到一個子目錄,例如:
      data/year=2021/genres=Action/part-0000-xxxx.parquet
      data/year=2021/genres=Comedy/part-0001-yyyy.parquet
      data/year=0/genres=Unknown/part-0002-zzzz.parquet
      
    • 檔名裡帶有唯一 ID,保證不可變(immutable file)。
  • 重點:Iceberg 採用 append-only 模型,不會覆蓋檔案,而是寫新檔再更新 metadata。

📂 metadata/ 目錄

  • 用途:追蹤表的狀態、欄位 schema、partition 設定、歷史版本。
  • 檔案類型
    • v0001.metadata.json
      定義 schema、partition 規則,以及目前有哪些 snapshot。
    • snapshots.avro
      每次寫入會建立一個 snapshot,指向當前活躍的 data files。
    • manifest.avro / manifest-list.avro
      記錄有哪些 parquet 檔屬於這個 snapshot。
  • 重點
    • metadata 檔非常小,但極度重要。
    • Athena、Spark 查詢 Iceberg 表時,會先讀取 metadata/,再決定要掃哪些 data/ 檔案。
    • 這就是 Iceberg 能夠支援 Schema Evolution(欄位新增/刪除)和 Time Travel(查歷史版本)的基礎。

小結

  • data/ = 真正的資料檔案(依 partition 存放)。
  • metadata/ = Iceberg 的腦袋,記錄「哪些檔案屬於這張表、schema 是什麼、目前最新版本是哪個 snapshot」。

這也是為什麼有時候 Glue Job 寫入失敗會看到 metadata 殘留 → commit abort,因為 Iceberg 在設計上強調 一致性 (Consistency),metadata 一旦不完整,就會中止寫入。

建立 Gold Job

了解完 Iceberg 的分區儲存設計後,我們來實際建立 Glue Job

Step1:

  • 首先我們一樣來建立一支 Glue Job 並命名為 gold_anime_summary
  • 並將 code 填入下方的 script 區塊內
  • 本次我們在 gold 的 job 設計上將 silver.animes 和 silver.rating 做 Join,並採用 year + genres 作為 GroupBy 條件和 Partition 設定
  • 接著取得 anime_countavg_anime_scoretotal_ratingsavg_user_rating 等指標

https://ithelp.ithome.com.tw/upload/images/20251001/201634436nexR2DoZP.png

from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql import functions as F
from awsglue.job import Job

# 初始化 Glue Job
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

# Iceberg Catalog 設定
spark.conf.set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.glue_catalog.warehouse", "s3://anime-lake/")
spark.conf.set("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
spark.conf.set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")

# 建立 Gold Database
spark.sql("CREATE DATABASE IF NOT EXISTS gold LOCATION 's3://anime-lake/gold.db/'")

# 讀取 Silver 層資料
silver_animes = spark.table("glue_catalog.silver.animes")
silver_ratings = spark.table("glue_catalog.silver.ratings")

# ====== year 欄位清理:確保 partition 安全 ======
# year:非 4 位數 → 0
silver_animes = silver_animes.withColumn(
    "year",
    F.when(F.col("year").rlike("^[0-9]{4}$"), F.col("year").cast("int"))
     .otherwise(F.lit(0))
)

# genres:NULL 或空字串 → 'Unknown'
silver_animes = silver_animes.withColumn(
    "genres",
    F.when(F.col("genres").isNull() | (F.trim(F.col("genres")) == ""), F.lit("Unknown"))
     .otherwise(F.trim(F.col("genres")))
)

# ====== Join + 聚合 ======
gold_summary = (
    silver_animes.join(
        silver_ratings,
        silver_animes["animeID"] == silver_ratings["anime_id"],
        "left"
    )
    .groupBy("year", "genres")
    .agg(
        F.countDistinct("animeID").alias("anime_count"),
        F.avg("score").alias("avg_anime_score"),
        F.count("rating").alias("total_ratings"),
        F.avg("rating").alias("avg_user_rating")
    )
)

# ====== 寫入 Gold (Iceberg Table) ======
gold_summary.write \
    .format("iceberg") \
    .partitionBy("year", "genres") \
    .mode("overwrite") \
    .saveAsTable("glue_catalog.gold.animes_summary")

job.commit()

Step2:設定 Job detials

  • 設定與昨天的 Job 內容完全一致

https://ithelp.ithome.com.tw/upload/images/20251001/20163443t9kuZJKjk9.png

https://ithelp.ithome.com.tw/upload/images/20251001/20163443Mtw02q8iGd.png

https://ithelp.ithome.com.tw/upload/images/20251001/20163443J6dMi11Z0d.png

Step3:接著於右上角執行 gold_anime_summary Job,然後確定是否執行成功
https://ithelp.ithome.com.tw/upload/images/20251001/20163443coJA5WULNx.png

Step4:當 Job 成功後,我們去 Glue Database 與 Table 確認是否有正常建立
https://ithelp.ithome.com.tw/upload/images/20251001/20163443g4bhurqaDe.png

https://ithelp.ithome.com.tw/upload/images/20251001/20163443asp3feMXVO.png

Step5:接著確認一下建立好的 Table Schema 是否為我們所需
https://ithelp.ithome.com.tw/upload/images/20251001/20163443thFmQHn5BO.png

Step6:確認無誤後,我們來確認一下 S3 Data Lake 的儲存結構,首先我們先來確認一下 iceberg 的設計

  • 可以看到 anime-lake 內多了一個 gold.db 的資料夾
    https://ithelp.ithome.com.tw/upload/images/20251001/20163443e0Vck9p8nx.png

  • 進入 gold.db 可以看到剛剛使用 Glue 建立的 animes_summary 資料夾
    https://ithelp.ithome.com.tw/upload/images/20251001/20163443YoSG9U4L7Q.png

  • 接著進入 animes_summary 我們可以看到:

  • /data 與 metadata/

https://ithelp.ithome.com.tw/upload/images/20251001/20163443ICkMZ2HSYI.png

  • /data 內的子資料夾第一層是根據動漫的發行年份 year 做分區
    https://ithelp.ithome.com.tw/upload/images/20251001/20163443TMi7IzXv6N.png

  • 進入 year 子資料夾後,可以看到第二層是使用 genres 來做分區
    https://ithelp.ithome.com.tw/upload/images/20251001/201634439Xlo5Thvkz.png

  • 接著不同 genres 會分別有一個 parquet 彙總檔案
    https://ithelp.ithome.com.tw/upload/images/20251001/20163443SU6jqkT6gf.png

  • 再來是 metadata/,有正常建立 metadata.json 與相關的 .avro 檔案
    https://ithelp.ithome.com.tw/upload/images/20251001/20163443WMNQBNQRli.png

以上即完成 Gold Glue Job 搭配 iceberg 的設計

Glue Workflow 建置

接下來我們要把 silver job 和 gold job 使用 Glue Workflow 做串接建置成一個 pipeline

Step1:首先我們找到 Glue Workflow,並點選右上角的「Add Workflow」按鈕
https://ithelp.ithome.com.tw/upload/images/20251001/20163443FuidFc0Bld.png

Step2:建置 Workflow 並命名為 wf_animes_summary

  • 並填寫描述說明(可選):串接 silver glue + gold glue,建置 anime pipeline
  • 接著按右下角的「Create Workflow」
    https://ithelp.ithome.com.tw/upload/images/20251001/20163443wJpYpUOJpG.png

Step3:待跳轉後,確認有實際成功建立 wf_animes_summary
https://ithelp.ithome.com.tw/upload/images/20251001/20163443ivmLYrnWhh.png

Step4:進入 wf_animes_summary 後,點選下方的 Trigger 按鈕
https://ithelp.ithome.com.tw/upload/images/20251001/20163443oqI6rYjwVP.png

Step5:接著我們來實際建立 Trigger 條件

  • 點選 「Add New」因為我們沒有建好的 Trigger 故需要做新增
  • Trigger Type 我們先選擇 「On Demand」,有手動做觸發才會執行
  • 接著按「新增」即可完成建立
    https://ithelp.ithome.com.tw/upload/images/20251001/20163443JMBKLPpfwu.png

Step6:建立完 Trigger 後,我們要來設定 Pipeline,點選下方的「Add node」
https://ithelp.ithome.com.tw/upload/images/20251001/20163443QaYiABh1kv.png

Step7:點進去後會跳出可以選擇前面建立好的 Glue Job 作為 node
https://ithelp.ithome.com.tw/upload/images/20251001/20163443lBfogHMAh3.png

Step8:我們首先先選擇兩個 silver Job 作為前面的 node
https://ithelp.ithome.com.tw/upload/images/20251001/20163443P68g6jrzU4.png

Step9:接著當兩個 silver 都完成後,我要們接續執行 gold job

  • 首先要建立一個 Event Trigger 條件,來偵測兩個 silver job 都有完成
  • 點選其中一個 silver job 後會跳出 Add Trigger 的圖形
    https://ithelp.ithome.com.tw/upload/images/20251001/20163443NR0NUafBhg.png

Step10:點選 Add Trigger 圖形,進入 Trigger 的設定

  • 建立一個新的 Trigger tr_job_event_all
  • 並選擇 Event Trigger
  • 接著選擇 Start After ALL watch event
    https://ithelp.ithome.com.tw/upload/images/20251001/20163443D6adTmI5s4.png

Step11:成功建立新的 trigger 後,我們接續建立 gold job 的 node
https://ithelp.ithome.com.tw/upload/images/20251001/20163443oVIYgJFKd6.png

  • 選擇 Gold Job
    https://ithelp.ithome.com.tw/upload/images/20251001/201634430neVDFm0Er.png

Step12:調整 Trigger Event

  • 點選 tr_job_event_all
  • 點選 Add Jobs to Trigger
    https://ithelp.ithome.com.tw/upload/images/20251001/20163443yuy4jlT8C7.png

Step13:將 silver animes job 也新增進觸發的 Event 條件
https://ithelp.ithome.com.tw/upload/images/20251001/20163443HqbBnkRW1Z.png

Step14:回到 UI 確認兩個 Silver Job 的線都有關連到 tr_job_event_all 上,即完成 Pipeline 的建置
https://ithelp.ithome.com.tw/upload/images/20251001/20163443922xrs89LH.png

Step15:接著我們來實際執行這個 Workflow,點選頁面右上角的 「Run Workflow」按鈕
https://ithelp.ithome.com.tw/upload/images/20251001/20163443ezhIhP9uk0.png

Step16:接著我們可以到「 Job run monitoring 」的頁面,來查看各個 Job 的運行狀況

  • 可以看到有按照順序觸發我們設定的 Glue Job
    https://ithelp.ithome.com.tw/upload/images/20251002/201634434OWvNXRLQo.png

Step17:最後我們回到 Glue Workflow 頁面,來查看排程狀態是否正常完成
https://ithelp.ithome.com.tw/upload/images/20251002/20163443JiIJRViHem.png

透過以上流程,我們就完成了一個簡易的 Glue Workflow 建置!

總結與建議

今天我們完成了:

  • gold_anime_summary 的 Glue Job 建立
  • 了解 Glue 執行後會於 S3 建立什麼儲存結構
  • 如何建立 Glue Workflow 來建立 pipeline 串接 Silver 和 Gold 的 Glue Job

透過以上操作,我們可以手動觸發 Glue Workflow 按照設定好的排程設計,來執行多支 Glue Job

下篇預告

下篇我們將透過「 Day18 淬鍊之章-使用 Lambda 呼叫 Glue Workflow 」來接前面介紹的 S3 Event 觸發和本篇介紹的 Glue Workflow 做串接,變成一個從檔案上傳到 Glue ETL 資料清理並存入 Data Lake 的自動化 Pipeline。

參考資料


上一篇
Day16 淬鍊之章-Glue 實作篇-1
下一篇
Day18 淬鍊之章-使用 Lambda 呼叫 Glue Workflow
系列文
動漫宅的 30 天 AWS Lakehouse 修行日誌19
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言