2020 年 12 月發布的 Apache Airflow 2.0,其中很重要的新功能就是 Taskflow API
,可以讓我們更簡潔的撰寫 Airflow DAG,降低程式碼複雜度、減少行數。(相信是 Apache 聽到使用者的心聲了
在任務之間傳遞資料,不需要使用 ti.xcom_pull
和 ti.xcom_push
了,可直接用 Python 常用的 return 傳遞資料。
不需要用with DAG()
把 Operator 包起來定義 task, task 定義可以和 python function直接寫一起,每次都要重複兩次是真的煩。
只有提供的 Decorators
可以用,如果要使用的 Operator 沒提供,其他還是要用原來方式撰寫,或是只能把 Taskflow 和舊方法 Mix 再一起寫
@dag
: DAG 裝飾器 => 取代 with DAG()@task
: python task 裝飾器 => 取代 PythonOperator@task_group
: 昨天學的 task group 也有更方便的寫法@task.virtualenv
: python 虛擬環境@task.docker
: docker 環境今天要做的範例是假設拿到一筆 json
格式的訂單資料,要利用訂單金額
和訂單數量
計算出平均的訂單金額
,有故意把簡單的邏輯分成三個 tasks
,再次實作昨天的 TaskGroup
,接著運用 Taskflow API
的 task
、task_group
和 dag
,看能夠縮短多少行。
import json
from datetime import datetime
from airflow import DAG
from airflow.utils.task_group import TaskGroup
from airflow.operators.python_operator import PythonOperator
def extract(ti): #用json模組讀取資料
json_string = """
[
{
"order_id": "1001",
"order_item": "薯餅蛋餅",
"order_price": 45
},
{
"order_id": "1002",
"order_item": "大冰奶",
"order_price": 35
}
]
"""
order_data = json.loads(json_string)
ti.xcom_push(key='order_data', value=order_data)
def transform_sum(ti): #計算薯餅蛋餅和大冰奶的總金額
order_data_list = ti.xcom_pull(task_ids='extract', key='order_data')
order_total = 0
for order_dict in order_data_list:
order_total += order_dict['order_price']
ti.xcom_push(key='order_total', value=order_total)
def transform_count(ti): #json裡面有多少數量
order_data_list = ti.xcom_pull(task_ids='extract', key='order_data')
order_count = len(order_data_list)
ti.xcom_push(key='order_count', value=order_count)
def transform_averge(ti): #總金額/數量=平均金額
order_total = ti.xcom_pull(
task_ids='transform.transform_sum', key='order_total')
order_count = ti.xcom_pull(
task_ids='transform.transform_count', key='order_count')
order_average = order_total/order_count
ti.xcom_push(key='order_averge', value=order_average)
def load(ti): #將平均金額print出來
order_averge = ti.xcom_pull(
task_ids='transform.transform_averge', key='order_averge')
print(f"Average Order Price: {order_averge}")
with DAG(
dag_id='traditional_etl_dag',
schedule_interval=None,
start_date=datetime(2023, 9, 27)
) as dag:
extract = PythonOperator(
task_id='extract',
python_callable=extract
)
with TaskGroup(group_id='transform') as transform:
transform_sum = PythonOperator(
task_id='transform_sum',
python_callable=transform_sum
)
transform_count = PythonOperator(
task_id='transform_count',
python_callable=transform_count
)
transform_averge = PythonOperator(
task_id='transform_averge',
python_callable=transform_averge
)
[transform_sum, transform_count] >> transform_averge
load = PythonOperator(
task_id='load',
python_callable=load
)
extract >> transform >> load
注意在使用
TaskGroup
的時候,傳進裡面Xcoms
的task id
會變成group_id.task_id
,因為TaskGroup
能夠在不同的Group
也可以用相同的task_id
,所以前面要加上group_id
來辨識,如果覺得這樣很麻煩,可以在TaskGroup
的設定加上prefix_group_id=False
,就能維持原來的task_id
,記得這時候的task_id
就不能有重複了。
以上面舉例要改成:
with TaskGroup(group_id='transform',prefix_group_id=False) as transform_data:
這樣在
ti.xcom_pull
的時候 前面就不用加transform
了
哇~真的是落落長,整個 DAG 的流程如下:
import json
from airflow.decorators import dag, task, task_group
from datetime import datetime
@dag(schedule_interval=None, start_date=datetime(2023, 9, 27))
def taskflow_etl_dag():
@task()
def extract():
json_string = """
[
{
"order_id": "1001",
"order_item": "薯餅蛋餅",
"order_price": 45
},
{
"order_id": "1002",
"order_item": "大冰奶",
"order_price": 35
}
]
"""
order_data = json.loads(json_string)
return order_data
@task_group
def transform(order_data):
@task()
def transform_sum(order_data_json):
order_total = 0
for order_dict in order_data_json:
order_total += order_dict['order_price']
return order_total
@task()
def transform_count(order_data_json):
order_count = len(order_data_json)
return order_count
@task()
def transform_averge(order_total, order_count):
order_average = order_total/order_count
return order_average
order_average_result = transform_averge(
transform_sum(order_data), transform_count(order_data))
return order_average_result
@task()
def load(order_average):
print(f"Average Order Price: {order_average}")
load(transform(extract()))
taskflow_etl_dag()
>>
的方式設定相依性,會比用function
return
來說更方便今天就到這邊吧~明天就會去開始用 docker 了,應該會先從 docker 基礎開始介紹,接著說明 docker-compose ,最後才會安裝 airflow 的 docker,這個部分也是會 step by step 每個步驟都會截圖。
更新:docker要到後天了,突然發現 Taskflow API
好像沒有講 code 細節,明天還是再細講一下。