iT邦幫忙

2025 iThome 鐵人賽

DAY 23
0
Build on AWS

動漫宅的 30 天 AWS Lakehouse 修行日誌系列 第 23

Day23 淬鍊之章-資料補跑機制 Backfill 實作篇

  • 分享至 

  • xImage
  •  

簡介

在上篇 Day22 淬鍊之章-多檔案上傳 ETL 流程-實作篇2 中,我們完成了完整的檔案檢查與 ETL 排程機制。

不過在實務中,難免會出現這些情況:

  • 某天的檔案上傳延遲
  • Glue Workflow 異常中斷
  • 想針對歷史資料重新回補

這時,我們就需要一個關鍵能力 —— Backfill(資料補跑機制)

今天我們將擴充原本的 Lambda,讓它除了每天自動偵測「今日檔案」外,也能手動指定任意日期重新檢查與觸發 Glue Workflow,進一步提升整個 Data Lakehouse Pipeline 的 可重入性(Idempotent)與可恢復性(Recoverable)


Lambda Function 預計調整內容

再調整一次 Lambda Function 使其可以做到以下內容:

  • 預設檢查「今日」的資料檔案
  • 可手動輸入日期進行 Backfill

架構設計概念

https://ithelp.ithome.com.tw/upload/images/20251007/201634438WEwfvItpN.png

說明:

  1. 每日自動觸發:預設由 EventBridge 呼叫 Lambda,檢查「今日」檔案。
  2. 手動補跑:可手動呼叫 Lambda 並傳入 date 參數(如 2025-10-06)。
  3. Lambda 檢查檔案完整性:若兩個檔案皆存在,觸發 Glue Workflow。
  4. Glue Workflow 接收參數:將日期以 --process_date 傳入 Glue Job 處理。
  5. SNS 通知:結果(成功、缺檔、錯誤)都會即時推播。

調整 Lambda Function

https://ithelp.ithome.com.tw/upload/images/20251007/20163443LOS7lncnVx.png

修改程式碼:

注意要改成自己的 account_id

import boto3
import datetime
import json

s3 = boto3.client("s3")
glue = boto3.client("glue")
sns = boto3.client("sns")

BUCKET_NAME = "anime-lake"
WORKFLOW_NAME = "wf_animes_summary"
SNS_TOPIC_ARN = "arn:aws:sns:ap-east-2:064083567781:anime-etl-alert"

def lambda_handler(event, context):
    # 🗓️ Step 1: 取得日期(預設今天)
    process_date = event.get("date") if event and "date" in event else datetime.date.today().strftime("%Y-%m-%d")
    print(f"🔍 Checking files for date: {process_date}")

    # 預期檔案位置
    expected_files = [
        f"Bronze/animes/{process_date}/animes.csv",
        f"Bronze/ratings/{process_date}/ratings.csv"
    ]

    missing_files = []

    # 🔎 Step 2: 檢查 S3 檔案是否存在
    for key in expected_files:
        try:
            s3.head_object(Bucket=BUCKET_NAME, Key=key)
            print(f"✅ Found: {key}")
        except s3.exceptions.ClientError:
            print(f"❌ Missing: {key}")
            missing_files.append(key)

    # ✅ Step 3: 若所有檔案皆存在 → 觸發 Glue Workflow
    if not missing_files:
        try:
            response = glue.start_workflow_run(
                Name=WORKFLOW_NAME,
                RunProperties={
                    "process_date": process_date
                }
            )
            run_id = response["RunId"]
            print(f"🚀 Started Glue Workflow: {run_id}")

            # SNS 成功通知
            message = {
                "status": "success",
                "date": process_date,
                "workflow_name": WORKFLOW_NAME,
                "workflow_run_id": run_id,
                "message": "All files found, Glue Workflow triggered successfully."
            }

            sns.publish(
                TopicArn=SNS_TOPIC_ARN,
                Subject=f"✅ Glue Workflow Triggered for {process_date}",
                Message=json.dumps(message, indent=2)
            )
            print("📨 SNS success notification sent.")

            return {"statusCode": 200, "body": f"Workflow started for {process_date}"}

        except Exception as e:
            # SNS 失敗通知
            error_message = {
                "status": "failed",
                "date": process_date,
                "workflow_name": WORKFLOW_NAME,
                "error": str(e)
            }

            sns.publish(
                TopicArn=SNS_TOPIC_ARN,
                Subject=f"❌ Glue Workflow Failed for {process_date}",
                Message=json.dumps(error_message, indent=2)
            )
            print(f"🛑 Error starting Glue Workflow: {e}")

            return {"statusCode": 500, "body": f"Error: {str(e)}"}

    # ⚠️ Step 4: 若有缺失檔案 → 跳過並通知 SNS
    else:
        warning_message = {
            "status": "skipped",
            "date": process_date,
            "missing_files": missing_files,
            "message": "Some files are missing. Workflow not triggered."
        }

        sns.publish(
            TopicArn=SNS_TOPIC_ARN,
            Subject=f"⚠️ Missing Files Detected for {process_date}",
            Message=json.dumps(warning_message, indent=2)
        )
        print("⚠️ SNS warning notification sent (missing files).")

        return {"statusCode": 200, "body": f"Missing files: {', '.join(missing_files)}"}

呼叫方式

1.每日自動排程(預設今天)

由 EventBridge Scheduler 自動觸發:

{}

Lambda 會自動檢查「今日」日期的檔案。

執行測試

https://ithelp.ithome.com.tw/upload/images/20251007/20163443QRQRxIXcxO.png

當 json = {} 時,會自動帶入當日日期

https://ithelp.ithome.com.tw/upload/images/20251007/20163443m4PJqf5Rzw.png

接著確認 Glue Job 有正常被觸發

https://ithelp.ithome.com.tw/upload/images/20251007/20163443PmzNxA6Vm8.png

2.手動 Backfill 指定日期

手動呼叫 Lambda(例如在 Console 或 CLI 中):

{
  "date": "2025-10-06"
}

https://ithelp.ithome.com.tw/upload/images/20251007/20163443qAfTJW6uD1.png

這樣會檢查:

s3://anime-lake/Bronze/animes/2025-10-06/animes.csv
s3://anime-lake/Bronze/ratings/2025-10-06/ratings.csv

若都存在,就會自動觸發 Glue Workflow 並帶入參數:

--process_date=2025-10-06

https://ithelp.ithome.com.tw/upload/images/20251007/20163443yhO6cLu2Xc.png

接著確認 Glue Job 有正常被觸發

https://ithelp.ithome.com.tw/upload/images/20251007/20163443Q6tVBSqnTX.png


Backfill 的關鍵設計理念

功能模組 說明 實作方式
日期輸入 可自動/手動指定處理日期 Lambda event 參數
資料驗證 確保當天所有檔案都存在 s3.head_object
參數化設計 Glue Job 接收 --process_date 支援分區處理與補跑

調整後效果

  1. 日常自動排程:無須人工干預,每日自動檢查與執行。
  2. 手動補跑 Backfill:可快速針對遺漏批次重新執行。
  3. Glue Workflow 可重入:同日期資料不重複、可覆蓋。

結論與建議

透過這次的 Backfill 實作,我們讓 ETL Pipeline:

  • 具備日期參數化能力,可針對任意日期補跑。
  • 提升可追蹤性與可恢復性,讓整個系統更接近企業級標準。

使用 Lambda + Glue Workflow + SNS 打造出一個「自動化 + 可重入 + 可追蹤」的雲端 ETL Pipeline。


下篇預告

在完成 Backfill 機制後,我們的資料管線已經具備了自動化與穩定性的基礎,也完成了淬鍊之章的所有內容。

但若要讓整個 Lakehouse 真正能在多人協作、跨部門使用的情境下安全運行,我們還需要引入一個關鍵角色 —— 資料權限治理(Data Governance)

在下篇 「Day24 視覺之章-Lake Formation 概念篇」 中,我們將正式揭開 AWS 的資料治理核心服務:AWS Lake Formation


參考資料

[1] AWS Lambda Developer Guide
[2] AWS Glue Workflow Documentation
[3] Amazon SNS Developer Guide
[4] AWS EventBridge Scheduler
[5] Amazon S3 head_object API Reference


上一篇
Day22 淬鍊之章-多檔案上傳 ETL 流程-實作篇2
系列文
動漫宅的 30 天 AWS Lakehouse 修行日誌23
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言