卍解好像有點中二,但應該很好懂吧~死神應該算是很紅才對
預設 XCom 有空間限制,取決於 Metadata 資料庫,不同 DB 也有不同限制:
Metadata 資料庫也稱作元資料庫,所有可以在 Airflow Web UI 上看到的資料,基本上都記錄在裡面,包含 Dag/Task名稱、每次執行紀錄和時間、任務間傳遞資料、儲存的連結和變數
理論上,在 Airflow 中不應用來傳遞大型數據,但...如果就是必須要在 task 之間傳遞超過 2GB 的資料呢?自訂 XCom(Custom XCom Backend) 就是你的好選擇,透過繼承原來的 XCom 類別,讓 task 之間傳遞的資料量沒有限制。
以我們要示範自訂 XCom 的 AWS S3 為例:
A task => BaseXCom => B task
A task => class CustomXComBackend(BaseXCom) => B task
CustomXComBackend
當中,也可以依照不同資料類型傳遞到不同的暫存空間,等到下一個 task 要使用時在將資料拿回來。使用標準 XCom 後端的第二個限制是只能將資料序列化(serialize)成 JSON 格式,在傳遞上會比較麻煩,使用自訂的類別後,可以依照不同需求傳遞不同資料型態。
補充:Airflow version 2.6 之後,標準 XCom 後端也可以直接傳遞 pandas 的 DataFrame
{Airflow Project Root}
├── docker-compose.yaml
├── configs/
├── dags/
├── logs/
└── plugins/
└── include/
└── custom_xcom_backend.py
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.10.2/docker-compose.yaml'
來源:Airflow 官方文件
AIRFLOW__CORE__XCOM_BACKEND
在 environment
區塊就可以了x-airflow-common:
&airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.10.1}
# build: .
environment:
...
AIRFLOW__CORE__XCOM_BACKEND=include.custom_xcom_backend.CustomXComBackend
...
include/custom_xcom_backend.py
from airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
import json
import uuid
import os
class CustomXComBackend(BaseXCom):
PREFIX = "xcom_s3://"
BUCKET_NAME = "ithome-it30-demo" #YOUR_S3_BUCKET_NAME
@staticmethod
def serialize_value(
value,
key=None,
task_id=None,
dag_id=None,
run_id=None,
map_index= None,
**kwargs
):
hook = S3Hook(aws_conn_id="aws_conn") #YOUR_AWS_CONN_ID
filename = "data_" + str(uuid.uuid4()) + ".csv"
s3_key = f"xcom/dag_id={dag_id}/run_id={run_id}/task_id={task_id}/{filename}"
with open(filename, 'a+') as f:
json.dump(value, f)
hook.load_file(
filename=filename,
key=s3_key,
bucket_name=CustomXComBackend.BUCKET_NAME,
replace=True
)
os.remove(filename)
reference_string = CustomXComBackend.PREFIX + s3_key
return BaseXCom.serialize_value(value=reference_string)
@staticmethod
def deserialize_value(result):
result = BaseXCom.deserialize_value(result)
hook = S3Hook(aws_conn_id="aws_conn")
key = result.replace(CustomXComBackend.PREFIX, "")
filename = hook.download_file(
key=key,
bucket_name=CustomXComBackend.BUCKET_NAME,
local_path="/tmp"
)
with open(filename, 'r') as f:
output = json.load(f)
os.remove(filename)
return output
serialize_value()
和 deserialize_value()
,功能可以很直白的從命名知道:
serialize_value()
:將 task 傳出的資料序列化並上傳到 AWS S3deserialize_value()
:從 AWS S3 把資料取下來,並將序列化資料解開CustomXComBackend.PREFIX
,目的是為了在舊有的 xcom 中進行辨識,加上 prefix 前綴之後在 Airflow Web UI 上可以看到資料在 S3 上的路徑 xcom_s3://xcom/dag_id=XXXX/run_id=XXXXXX/task_id=XXXX/data_xxxx-xxxx-xxxx-xxxx-xxxx.json
load_file
和 download_file
,分別對應到上傳和下載明天開始就要進入 DBT 了,最少會花到 4 天(也可能更多)去介紹,主要是 DBT Core 的基礎,預期會帶到經典案例 Jaffle Shop,大家一起來逛熱壓吐司店吧~
Jaffle = 熱壓吐司 ? 圖片是很像啦~