iT邦幫忙

2025 iThome 鐵人賽

DAY 18
0
Build on AWS

動漫宅的 30 天 AWS Lakehouse 修行日誌系列 第 18

Day18 淬鍊之章-使用 Lambda 呼叫 Glue Workflow

  • 分享至 

  • xImage
  •  

簡介

Day17 淬鍊之章-Glue 實作篇-2 中,我們已經介紹了 Glue Workflow,它能將多個 Glue Job 串接成一個完整的資料處理流程。

今天我們要進一步把 S3 事件觸發 EventBridge 與 Lambda 結合,當新檔案上傳到指定的 Bucket 時,就能自動觸發 Glue Workflow 做 ETL。

這樣的設計,能將手動操作完全自動化,實現事件驅動的數據管線 (Event-driven Data Pipeline)。

整體流程架構

當我們把所有組件串起來後,完整的流程會長這樣:

  1. 使用者 / 系統上傳檔案到 S3

    • 檔案通常會放在 Bronze 層,例如:
      s3://anime-lake/bronze/animes/
      s3://anime-lake/bronze/ratings/
  2. S3 事件通知 (EventBridge)

    • S3 偵測到新檔案後,會觸發一個事件,並傳送到 EventBridge
  3. Lambda Function

    • EventBridge 將事件交給 Lambda
    • Lambda 會解析檔名、資料夾位置,將 csv 檔案名稱後面的日期做解析後,使用時間產生新的路徑,並放入檔案。
    • 接著呼叫 Glue Workflow API
  4. Glue Workflow 執行

    • Workflow 負責依序啟動 ETL Job,包含:
      • 清理與轉換原始檔案,並轉成 Parquet/Iceberg 格式,寫入 Silver 儲存層。
      • 接著再將 Silver 層的資料表做 Join 後,寫入至 Gold 儲存層,做後續的資料分析。

了解大概的流程後,我們就來開始改寫 Lambda 讓它可以呼叫昨天建置好的 Glue Workflow。

新增 Glue Policy 至 Full_Lambda_Role

要讓 Lambda 可以使用 Glue,我們需要幫 Lambda 的 Role 也就是 Full_Lambda_Role 新增 Glue 的 Policy。

Step1:首先登入使用者 Joe,並至 IAM 找到 Full_Lambda_Role,點選「Add permissions」後選擇「Attach Policy」。
https://ithelp.ithome.com.tw/upload/images/20251002/20163443rXNS7HAcab.png

Step2:接著搜尋 Glue 的 Policy,選擇 AWSGlueConsoleFullAccess 後點選右下角 「Add permissions」。

https://ithelp.ithome.com.tw/upload/images/20251002/20163443r0cIDvmCwy.png

Step3:當頁面跳轉後,確認有正常新增了 AWSGlueConsoleFullAccess Policy,即完成新增 Policy 給 Full_Lambda_Role,這樣 Lambda 就可以透過這個 Role 來呼叫 Glue 了。
https://ithelp.ithome.com.tw/upload/images/20251002/20163443H2FViLTUJS.png

在 Lambda 中新增呼叫 Glue Workflow 的 Python Shell

添加完 Role 的 Policy 後,我們首先先切回使用者 Andy,然後我們要來調整 Lambda 的 Python Shell。

Step1:找到 Lambda 函式 anime-lake-bronze-partition,並點擊進入該函式頁面。

https://ithelp.ithome.com.tw/upload/images/20251002/20163443UsDnXkT0vg.png

Step2:接著來改寫該 Lambda 新增呼叫 Glue Workflow wf_animes_summary 的程式,接著做 Deploy。
https://ithelp.ithome.com.tw/upload/images/20251002/20163443BLn0usSw8L.png

  • 程式碼:
import boto3
import os
import re

s3 = boto3.client("s3")
glue = boto3.client("glue")

def lambda_handler(event, context):
    record = event["Records"][0]
    bucket = record["s3"]["bucket"]["name"]
    key = record["s3"]["object"]["key"]

    # 檔名 (不含路徑)
    filename = os.path.basename(key)

    # 正則解析 dataset 與日期 (YYYYMMDD)
    match = re.match(r"^(animes|ratings)_(\d{8})\.csv$", filename)
    if not match:
        return {
            "statusCode": 400,
            "body": f"Unsupported file format: {filename}"
        }

    dataset, file_date_raw = match.groups()
    file_date = f"{file_date_raw[0:4]}-{file_date_raw[4:6]}-{file_date_raw[6:8]}"

    # 生成新 S3 路徑
    new_key = f"Bronze/{dataset}/{file_date}/{dataset}.csv"

    # 搬移檔案
    s3.copy_object(
        Bucket=bucket,
        CopySource={"Bucket": bucket, "Key": key},
        Key=new_key
    )
    s3.delete_object(Bucket=bucket, Key=key)

    # 呼叫 Glue Workflow
    response = glue.start_workflow_run(
        Name="wf_animes_summary"
    )

    return {
        "statusCode": 200,
        "body": f"File {filename} moved to {new_key}, workflow started with RunId={response['RunId']}"
    }

Step3:Deploy 後,我們來重新上傳 S3 File 到 Bronze 資料夾,看看會不會自動觸發 Event。

  • 上傳 animes_20251002.csvanimes/
  • 上傳 ratings_20251002.csvratings/

https://ithelp.ithome.com.tw/upload/images/20251002/20163443swWI6DOJHu.png

Step4:接著我們可以至 Glue 的 「Job run monitoring」頁面,來確認是否有正確的觸發 Glue Workflow。

  • 我們可以看到 sliver_animessilver_ratingsgold_anime_summary 等 Job 都有被正常觸發。
  • 到此我們即完成從 S3 檔案上傳到觸發 Lambda ,透過 Lambda 呼叫 Glue Workflow 的整個 Pipeline。

https://ithelp.ithome.com.tw/upload/images/20251002/20163443BrWIRYIA6S.png

結論與建議

在本篇中,我們完成了 S3 → Lambda → Glue Workflow 的串接流程,讓資料從檔案上傳後即可自動進入 ETL Pipeline。整個過程中,我們將檔案依日期進行 Bronze 分層,並由 Glue Workflow 負責清理、轉換與最終寫入 Silver/Gold 層,達成 事件驅動的資料管線

然而,在實測中我們也觀察到一個問題:

  • 重複觸發 Workflow:由於 S3 EventBridge 會對每個檔案上傳各別觸發一次 Lambda,因此只要同時上傳多個檔案,Workflow 便可能被重複啟動,導致 Silver/Gold 的 Job 重複執行。
  • 這屬於一個應用場景的設計,我們將在後面的文章中來限制使用場景,並根據場景來設計適合的 Pipeline。

下篇預告

下篇我們將透過「 Day19 淬鍊之章-使用 Athena 驗證資料 」來介紹 Athena 是什麼,並實際應用此服務結合 Glue Catalog 來對 Data Lake 內的資料做 SQL Query 的操作。


參考資料

[1] AWS Glue Workflow 官方文件
[2] Using AWS Lambda with Amazon S3
[3] Amazon EventBridge 入門
[4] AWS Glue Job Triggers


上一篇
Day17 淬鍊之章-Glue 實作篇-2
下一篇
Day19 淬鍊之章-使用 Athena 驗證資料
系列文
動漫宅的 30 天 AWS Lakehouse 修行日誌19
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言