在一開始學習程式的時候,覺得使用參數是件像是魔法一般的事。同樣類型的工作,編寫一次程式碼之後,就可以透過不同的參數產生不同的結果。如果有不同的維度的功能需要加入,則再增加一些參數的設計便可以應對不同狀況。而在 Airflow DAG 中我們同樣也可以透過參數控制 Data pipeline。
在 first_dag 中我們執行的方始是點擊 Trigger DAG 執行,意思就是不帶參數直接執行 DAG。
如果我們要帶入參數的話,使用圖形介面可以選擇 Trigger DAG w/ config
點擊進去之後可以看到下圖的介面,在這邊輸入想要傳入的參數。這邊需要以 dict 的結構輸入。輸入之後點擊下方藍色的 Trigger 按鈕就可以觸發 DAG 同時帶入所輸入的參數。
在輸入端傳入參數之後,我們要怎麼在 Airflow DAG 的 Task 中取得所接收到的參數呢?在Airflow 版本 2.0 之後,在 PythonOperator 執行時就會帶入相關系統參數,而我們在本來呼叫的 extract 函式中,加上 **kwargs 來取得相關參數, 之後擷取 'dag_run' 裡面的 conf 值,便可以抓到傳入的參數,並針對入的參數進行應用。
def extract(**kwargs):
print(kwargs['dag_run'].conf)
print("extract from data source")
我們打開 Task 的 log 就可以看到 Task 成功執行上面的程式碼,並把所傳入的參數進行顯示。
而在 Airflow2.0 之前的版本,我們需要在想要取得參數資訊 Task 的 PythonOperator 當中加入參數 "provide_context=True",我們拿 extract 這個 Task 當作範例,程式碼可以改寫成:
extract_task = PythonOperator(
task_id="extract", python_callable=extract, dag=dag,provide_context=True,
)
如此讓參數傳入執行的 Function。
由於在Airflow 當中許多參數是常常會使用到的,因此 Airflow 也有一些預設傳入的值,方便我們可以快速的應用。像是 Task 的執行日期 -- ds、Task 執行的時間點 -- ts、Dag_run 的 ID -- run_id等等,我們也都可以透過參數取得。
def extract(**kwargs):
print(kwargs['dag_run'].conf)
print(kwargs['ds'])
print(kwargs['ts'])
print(kwargs['run_id'])
print("extract from data source")
我們可以在 Task 的 log 中看到相關資訊。
如果想知道更多其他的參數可以參考以下官方給的文件:
https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
今天就先介紹到這邊了,我們明天繼續!
從 Airflow 2.0 開始已經沒有 provide_context=True
了唷。
感謝大大提點!
有時候不同案子使用不同版本時候會忽略