iT邦幫忙

2024 iThome 鐵人賽

DAY 8
0
DevOps

我獨自升級:從水管工走向 DataOps系列 第 8

【Day 8】Airflow 卍解 -自訂 XCom 後端解除限制

  • 分享至 

  • xImage
  •  

卍解好像有點中二,但應該很好懂吧~死神應該算是很紅才對/images/emoticon/emoticon16.gif

先決條件

不使用標準 XCom 後端的原因

原因一:傳遞資料大小限制

預設 XCom 有空間限制,取決於 Metadata 資料庫,不同 DB 也有不同限制:

  • SQLite:2GB
  • Postgres:1 GB
  • MySQL:64 KB

Metadata 資料庫也稱作元資料庫,所有可以在 Airflow Web UI 上看到的資料,基本上都記錄在裡面,包含 Dag/Task名稱、每次執行紀錄和時間、任務間傳遞資料、儲存的連結和變數

理論上,在 Airflow 中不應用來傳遞大型數據,但...如果就是必須要在 task 之間傳遞超過 2GB 的資料呢?自訂 XCom(Custom XCom Backend) 就是你的好選擇,透過繼承原來的 XCom 類別,讓 task 之間傳遞的資料量沒有限制。

以我們要示範自訂 XCom 的 AWS S3 為例:

原先的 BaseXCom

A task => BaseXCom => B task

用自訂的類別繼承 BaseXCom

A task => class CustomXComBackend(BaseXCom) => B task
  • CustomXComBackend 當中,也可以依照不同資料類型傳遞到不同的暫存空間,等到下一個 task 要使用時在將資料拿回來。

原因二:資料類型限制

使用標準 XCom 後端的第二個限制是只能將資料序列化(serialize)成 JSON 格式,在傳遞上會比較麻煩,使用自訂的類別後,可以依照不同需求傳遞不同資料型態。

補充:Airflow version 2.6 之後,標準 XCom 後端也可以直接傳遞 pandas 的 DataFrame

Time to Get Your Hands Dirty

專案結構

{Airflow Project Root}
├── docker-compose.yaml
├── configs/
├── dags/
├── logs/
└── plugins/
    └── include/
        └── custom_xcom_backend.py

docker-compose.yaml 設定

  • docker-compose.yaml 模板可以執行下方程式取得
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.10.2/docker-compose.yaml'

來源:Airflow 官方文件

  • 只需要加一行 AIRFLOW__CORE__XCOM_BACKENDenvironment 區塊就可以了
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.10.1}
  # build: .
  environment:
    ...
    AIRFLOW__CORE__XCOM_BACKEND=include.custom_xcom_backend.CustomXComBackend
    ...

自訂類別: include/custom_xcom_backend.py

  • astronomer 有針對 custom backend 部分做很完整的說明,下方的範例程式也是參考自Set up a custom XCom backend using object storage by astronomer再作修改,如果有詳細看 astronomer 提供的範例會發現,astronomer 有針對不同資料類型存成不同資料,例如 pandas.DataFrame 會被存成 csv 再傳到 AWS S3。
  • 下方程式碼只有單純將資料上傳到 s3,原先轉換成 JSON 的部分也保留,實務上這個檔案也會依照不同需求而傳到不同的中繼站:Google 的 GCS 或是 Azure Blob Storage 等等。

類別程式碼範例

from airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
import json
import uuid
import os

class CustomXComBackend(BaseXCom):
    PREFIX = "xcom_s3://"
    BUCKET_NAME = "ithome-it30-demo" #YOUR_S3_BUCKET_NAME

    @staticmethod
    def serialize_value(
        value,
        key=None,
        task_id=None,
        dag_id=None,
        run_id=None,
        map_index= None,
        **kwargs
    ):
        hook = S3Hook(aws_conn_id="aws_conn") #YOUR_AWS_CONN_ID
        filename = "data_" + str(uuid.uuid4()) + ".csv"
        s3_key = f"xcom/dag_id={dag_id}/run_id={run_id}/task_id={task_id}/{filename}"
        with open(filename, 'a+') as f:
            json.dump(value, f)
        hook.load_file(
            filename=filename,
            key=s3_key,
            bucket_name=CustomXComBackend.BUCKET_NAME,
            replace=True
        )
        os.remove(filename)
        reference_string = CustomXComBackend.PREFIX + s3_key
        return BaseXCom.serialize_value(value=reference_string)

    @staticmethod
    def deserialize_value(result):
        result = BaseXCom.deserialize_value(result)
        hook = S3Hook(aws_conn_id="aws_conn")
        key = result.replace(CustomXComBackend.PREFIX, "")
        filename = hook.download_file(
            key=key,
            bucket_name=CustomXComBackend.BUCKET_NAME,
            local_path="/tmp"
        )
        with open(filename, 'r') as f:
            output = json.load(f)
        os.remove(filename)
        return output
  • 可以先從整體的類別架構去看,有兩個 function,分別是 serialize_value()deserialize_value(),功能可以很直白的從命名知道:
    • serialize_value():將 task 傳出的資料序列化並上傳到 AWS S3
    • deserialize_value():從 AWS S3 把資料取下來,並將序列化資料解開
  • 在兩個function 都可以看到 CustomXComBackend.PREFIX,目的是為了在舊有的 xcom 中進行辨識,加上 prefix 前綴之後在 Airflow Web UI 上可以看到資料在 S3 上的路徑 xcom_s3://xcom/dag_id=XXXX/run_id=XXXXXX/task_id=XXXX/data_xxxx-xxxx-xxxx-xxxx-xxxx.json
    https://ithelp.ithome.com.tw/upload/images/20240922/20135427GAY81i21Ns.png
  • 另外會發現兩段程式碼當中,運用 S3Hook 存取 S3 的部分,在前幾天的分享都有提到了,分別是load_filedownload_file,分別對應到上傳和下載
  • 透過這樣的方式,就能把標準 xcom 儲存 task 之間資料傳遞搬到 AWS S3,而原先 metadata 資料庫中的 xcom 就單純紀錄這筆資料在 s3 上的路徑,進而解放了資料傳遞的兩項限制(大小和格式)/images/emoticon/emoticon07.gif

結語

明天開始就要進入 DBT 了,最少會花到 4 天(也可能更多)去介紹,主要是 DBT Core 的基礎,預期會帶到經典案例 Jaffle Shop,大家一起來逛熱壓吐司店吧~

Jaffle = 熱壓吐司 ? /images/emoticon/emoticon19.gif 圖片是很像啦~


上一篇
【Day 7】Airflow 用 Slack 傳送通知 - Webhook 設定與建議
下一篇
【Day 9】數據轉換最悪の世代:DBT 簡介
系列文
我獨自升級:從水管工走向 DataOps30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言