今天這段其實不是開發 Airflow 必要的部份,甚至會多增加一點麻煩。但我個人還是比較喜歡這種寫法,所以來寫一些。
前面有提到 PythonOperator 是 Airflow 內最泛用又彈性的 operator,但是它寫起來大概會像這樣:
def process_data():
#...do something
result = get_result()
push_result_to_xcom(result)
def load_data():
data = get_data_from_xcom()
#...do something
pass
task1 = PythonOperator(
task_id='process_data_task',
python_callable=process_data,
provide_context=True,
dag=dag,
)
task2 = PythonOperator(
task_id='process_data_task',
python_callable=load_data,
provide_context=True,
dag=dag,
)
task1 >> task2
這種寫法某種程度上,不是很 python,如果不熟悉 Airflow 框架的人可能一時間看不懂。而且資料的關係被隱藏在 XCom 裡 (假設兩個 task 需要傳遞),看起來也有一點累贅。所以 Airflow 後來出了新的 feature 叫 TaskFlow,可以解決一部份問題。
@task
def process_data():
#...do something
result = get_result()
return result
@task
def load_data(data):
#...do something
pass
data = process_data()
load_data(data)
第一段的程式作用可以完全被新的第二段給取代掉,看起來是不是乾淨,易讀許多?你要做的就是加上 @task
這個裝飾器,剩下的 Airflow 會處理掉,包含 xcom 的資料存取。
語法糖當然還是有代價的,首先它只能取代掉 PythonOperator,其他的 operator 還是要建立 instance。而且 @task
跟傳統的寫法雖然可以共存,但會增加指定關係的複雜度。我們上面的例子如果在中間要多經過一個 PostgresOperator 的話,寫起來會像這樣:
@task
def process_data():
#...do something
result = get_result()
return result
@task
def load_data(data):
result = get_from_xcom()
#...do something
pass
create_table_task = PostgresOperator(
task_id='create_table',
sql='CREATE TABLE IF NOT EXISTS my_table (id serial PRIMARY KEY, data text);',
postgres_conn_id='my_postgres_conn', # 使用之前配置的連接
dag=dag,
)
data = process_data()
data >> create_table_task >> load_data()
經過傳統的 operator 時,還是要使用 >> 符號來表示關係,下游則是依然要從 XCom 取值。如果對程式 style 一致性很注重的人,可能要注意一下。
function
這項見仁見智,但我必須先提醒一下。
加上 @task
裝飾之後,在測試裡要呼叫 function 時,不能直接丟參數,或是存取 return value
# This test function will fail
def test_process_data():
result = process_data()
assert result == 1
這個測項是會錯誤的,不是因為 result 不是 1 ,而是因為 result 其實已經被包裝進 XCom 物件裡。也是因為這樣,你要傳參數進去的時候也必須包裝進 XCom 物件裡。
解決方法第一個當然就照他的格式,但我個人會用第二種:
# This test function will pass
def test_process_data():
result = process_data.function()
assert result == 1
透過 .function()
,你就可以照一般的 python function 使用,無論是 input 參數或是 output 的 return value,都可以直接正常使用,如此一來在寫測試的時候會輕鬆許多。
除了一開始提到的部份,還有一些優點。有一些參數在傳統寫法內,你要從 context 內取出 (參考 Templates reference — Airflow Documentation (apache.org)),如此一來在程式內你就不得不將 context 做為一個參數,進而增加測試要 mock 掉的物件。
但改用 TaskFlow 的話,你可以寫成這樣:
# This test function will pass
def process_data(data_interval_start: None, data_interval_end:None):
get_data_in_time_range(data_interval_start, data_interval_end)
原本 data_interval_start 是要從 context 取得的,或是要辛苦一點在 op_karge 內用 template 定義並帶入 {{data_interval_start}}
。但用 TaskFlow 就可以再簡化這段,而且不用知道 context 這個上下文內容。
而且理所當然的,都已經盡量跟 airflow 的框架解耦合了,你的單元測試會更好寫
.function
來呼叫真實function。