在上一篇文章裡有提到 PythonOperator 可能是 Airflow 裡最重要的 operator 之一,那當然要花點時間來說明一下使用方法,不過其實很簡單,大都是通用的原則。
# 定義一個Python函數,這將是我們要運行的任務
def my_python_function():
print("Hello, Airflow!")
# 使用PythonOperator來執行我們的Python函數
task = PythonOperator(
task_id='my_task',
python_callable=my_python_function,
dag=dag,
)
透過 python_callable
可以指定要執行的 function reference,然後就可以執行各種彈性的事了。當然如果想用 lambda 式的寫法也沒問題 python_callable=lambda x: print("Hello, Airflow!")
,就效果來說是等同於上面的寫法的。
想必接下來會有人問,要怎麼代入參數。畢竟在現實生活中,很多時候我們要做的工作都是有條件的。
op_kwargs
我們稍微改寫一下範例的 operator
task = PythonOperator(
task_id='my_task',
python_callable=lambda x: print(x),
op_kwargs={"x": "Hello, Airflow"},
dag=dag,
)
Bingo! 這樣子我們的 task 就能 print 參數了。我知道你下一句話會說:「這樣子還是等於寫死的啊?」
Airflow 也有提供一些方便的元件,但我們晚點再討論那些,先來說參數化的好處。由於 PythonOperator 通常會指向另外一個 function,我們便可以也應該對那個 function 做測試。事實上由於 Dag 整個是由 Operator 的實例組成的,如果想測試的話需要有一些特殊的前置作業跟配合 Airflow Framework 的寫法。但對某個 function 做單元測試就不用這麼麻煩,無論是用 Unittest 或是 Pytest,大都照基本寫法就好。
Variables
實務上我們經常需要區分開發環境、測試環境、真實環境;最簡單的例子,就是在本機開發 ETL 時,你不會想也不應該去連到 production 資料庫,進而污染了資料。
同理,有一些參數也是依賴於環境的,或許在測試環境你只想洗出前三大的熱門商品,但在正式環境你需要 20 個。Airflow 提供了 Variables 的元件來讓我們做到這件事。
再用一次同樣的例子:
def my_python_function(x1):
x2 = Variable.get("DEMO_X2", "Default Value")
print(f"{x1}, {x2}!")
task = PythonOperator(
task_id='my_task',
python_callable=my_python_function,
op_kwargs={
"x1": {{var.value.DEMO_X1}}
},
dag=dag,
)
這裡展示了兩種寫法,第一種是在 Python Code 裡 import Variable,便可以透過它來取得 value,並設定 default 值。第二種則是在 op_kwargs 的 {{}} 寫法,用 var.value 開頭,Airflow 便會自己去找出對應的 key 並代入 value。
要注意的事,官方並不建議在 top-level 使用 Variable.get,這會對效能造成負擔。建議要寫在 def 內。