卍解好像有點中二,但應該很好懂吧~死神應該算是很紅才對
預設 XCom 有空間限制,取決於 Metadata 資料庫,不同 DB 也有不同限制:
Metadata 資料庫也稱作元資料庫,所有可以在 Airflow Web UI 上看到的資料,基本上都記錄在裡面,包含 Dag/Task名稱、每次執行紀錄和時間、任務間傳遞資料、儲存的連結和變數
理論上,在 Airflow 中不應用來傳遞大型數據,但...如果就是必須要在 task 之間傳遞超過 2GB 的資料呢?自訂 XCom(Custom XCom Backend) 就是你的好選擇,透過繼承原來的 XCom 類別,讓 task 之間傳遞的資料量沒有限制。
以我們要示範自訂 XCom 的 AWS S3 為例:
A task => BaseXCom => B task
A task => class CustomXComBackend(BaseXCom) => B task
CustomXComBackend 當中,也可以依照不同資料類型傳遞到不同的暫存空間,等到下一個 task 要使用時在將資料拿回來。使用標準 XCom 後端的第二個限制是只能將資料序列化(serialize)成 JSON 格式,在傳遞上會比較麻煩,使用自訂的類別後,可以依照不同需求傳遞不同資料型態。
補充:Airflow version 2.6 之後,標準 XCom 後端也可以直接傳遞 pandas 的 DataFrame
{Airflow Project Root}
├── docker-compose.yaml
├── configs/
├── dags/
├── logs/
└── plugins/
    └── include/
        └── custom_xcom_backend.py
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.10.2/docker-compose.yaml'
來源:Airflow 官方文件
AIRFLOW__CORE__XCOM_BACKEND 在 environment 區塊就可以了x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.10.1}
  # build: .
  environment:
    ...
    AIRFLOW__CORE__XCOM_BACKEND=include.custom_xcom_backend.CustomXComBackend
    ...
include/custom_xcom_backend.pyfrom airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
import json
import uuid
import os
class CustomXComBackend(BaseXCom):
    PREFIX = "xcom_s3://"
    BUCKET_NAME = "ithome-it30-demo" #YOUR_S3_BUCKET_NAME
    @staticmethod
    def serialize_value(
        value,
        key=None,
        task_id=None,
        dag_id=None,
        run_id=None,
        map_index= None,
        **kwargs
    ):
        hook = S3Hook(aws_conn_id="aws_conn") #YOUR_AWS_CONN_ID
        filename = "data_" + str(uuid.uuid4()) + ".csv"
        s3_key = f"xcom/dag_id={dag_id}/run_id={run_id}/task_id={task_id}/{filename}"
        with open(filename, 'a+') as f:
            json.dump(value, f)
        hook.load_file(
            filename=filename,
            key=s3_key,
            bucket_name=CustomXComBackend.BUCKET_NAME,
            replace=True
        )
        os.remove(filename)
        reference_string = CustomXComBackend.PREFIX + s3_key
        return BaseXCom.serialize_value(value=reference_string)
    @staticmethod
    def deserialize_value(result):
        result = BaseXCom.deserialize_value(result)
        hook = S3Hook(aws_conn_id="aws_conn")
        key = result.replace(CustomXComBackend.PREFIX, "")
        filename = hook.download_file(
            key=key,
            bucket_name=CustomXComBackend.BUCKET_NAME,
            local_path="/tmp"
        )
        with open(filename, 'r') as f:
            output = json.load(f)
        os.remove(filename)
        return output
serialize_value() 和 deserialize_value(),功能可以很直白的從命名知道:
serialize_value():將 task 傳出的資料序列化並上傳到 AWS S3deserialize_value():從 AWS S3 把資料取下來,並將序列化資料解開CustomXComBackend.PREFIX,目的是為了在舊有的 xcom 中進行辨識,加上 prefix 前綴之後在 Airflow Web UI 上可以看到資料在 S3 上的路徑 xcom_s3://xcom/dag_id=XXXX/run_id=XXXXXX/task_id=XXXX/data_xxxx-xxxx-xxxx-xxxx-xxxx.json 
load_file 和 download_file,分別對應到上傳和下載 
明天開始就要進入 DBT 了,最少會花到 4 天(也可能更多)去介紹,主要是 DBT Core 的基礎,預期會帶到經典案例 Jaffle Shop,大家一起來逛熱壓吐司店吧~
Jaffle = 熱壓吐司 ?
圖片是很像啦~