iT邦幫忙

1

Python & Airflow 學習筆記_建立簡易 Dag

  • 分享至 

  • xImage
  •  

這邊記錄了該如何建立一個簡易的 Dag,如果有錯誤或更好的寫法,歡迎留言討論

一、Dag 定義

就筆者從官方文件的理解,每個 Dag 可以代表是一個要執行的任務,而每個 Dag 裡面會有許多個 Operator 可以進行各個流程該做的事情,簡單來講可以理解成 Task 底下有多個 Sub Task,而這些 Sub Task 串連起來就變成一個 Dag

二、利用 with 建立 Dag

(一)、import 套件

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

在 airflow 當中提供許多種 Operator 給我們進行各種功能操作,詳細種類的 Operator 可以參考 這個網址 本文主要使用 EmptyOperator 以及 BashOperator 來進行範例撰寫

(二)、EmptyOperator 以及 BashOperator

  1. EmptyOperator:顧名思義,就是一個空的流程,常用來製作開頭 or 結束
  2. BashOperator:可以協助我們對電腦發出 Linux 指令
  3. 補充:每個 Operator 都會有 task_id 屬性,此屬性可以指定該流程的名稱https://ithelp.ithome.com.tw/upload/images/20220809/20144024dpYaPorocl.png
start_task = EmptyOperator(task_id="start_task")
end_task = EmptyOperator(task_id="end_task")
first_task = BashOperator(task_id="first_task",
                          bash_command=f"echo execute time: {datetime.now()}")

(三)、利用 with 建立 Dag

按照如下圖的範例,我們可以建立一個名為 first_dag 的任務,並且指派 tags 屬性,方便我們在 UI 上進行查詢

dag = DAG(dag_id="first_dag", tags=['user'], start_date=datetime.today())
with dag:
    start_task = EmptyOperator(task_id="start_task")
    end_task = EmptyOperator(task_id="end_task")
    first_task = BashOperator(task_id="first_task",
                              bash_command=f"echo execute time: {datetime.now()}")

下圖為透過上述程式碼建立出的 Dag,如果是使用官方的 docker-compose.yaml 建立環境的話,會出現許多範例 Dag可以善用 tags 來進行過濾
注意,此時還沒有將任務流程進行串聯
https://ithelp.ithome.com.tw/upload/images/20220809/20144024HDAbQYf19d.jpg

(四)、串聯 Operator

我們可以透過 >> 符號來將各個 Operator 進行串聯,下方為完整程式碼,串聯完成後,即可回到 UI 介面進行重新整理,後面的步驟會教學如何運行任務以及查看 Log

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

dag = DAG(dag_id="first_dag", tags=['user'], start_date=datetime.today())
with dag:
    start_task = EmptyOperator(task_id="start_task")
    end_task = EmptyOperator(task_id="end_task")
    first_task = BashOperator(task_id="first_task",
                              bash_command=f"echo execute time: {datetime.now()}")

    start_task >> first_task >> end_task

三、執行 Dag 及查看 Log

(一)、執行 Dag

  1. 透過 UI 將 Dag 設定為 unpause 狀態
  2. 點選右方三角形的執行符號,選擇 Trigger DAG 選項
    https://ithelp.ithome.com.tw/upload/images/20220809/20144024xLKa9My0Vf.jpg
  3. 之後就可以在 Runs 欄位看到該 Dag 的運行狀態,從左至右分別為 queued、success、running 以及 failed,而上面的數字代表著數量
    https://ithelp.ithome.com.tw/upload/images/20220809/20144024GwwwG9hYLJ.jpg

(二)、查看 Log

  1. 在 home 頁面點選 Dag 可以查看該 Dag 的詳細狀況
  2. 左下角紅色框框部分會列出串聯在該 Dag 內的 Operator,按照上面的程式碼依序為 start_task > first_task > end_task
    https://ithelp.ithome.com.tw/upload/images/20220809/201440244YlkN277S6.jpg
  3. 沿著該圖表往右方看,可以看到一些有顏色的正方形方塊,成功為綠色,失敗則為紅色
  4. 點選該方塊後右手邊會出現詳細資料,可以在上方發現 Log 選項,點選即可查看 該 Operator 當次的執行紀錄的 Log
    https://ithelp.ithome.com.tw/upload/images/20220809/20144024FDhmhFqJo9.jpg
  5. 按照上面的範例程式,我們會讓該 Operator 在系統中印出 execute time: {datetime.now()}
    https://ithelp.ithome.com.tw/upload/images/20220809/20144024GLEuhjcYOQ.jpg

四、使用 decorator 建立 Dag

在 airflow 2.x 版後,提供了利用 decorator 的方式來建立 Dag,下方為簡易範例
特別注意:
使用此方法建立 Dag 後,需要於外部將該 Dag 的函式丟給一個變數,airflow 才可以抓到該 Dag

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

@dag(tags=['user'], start_date=datetime.today())
def simple_dag():
    start_task = EmptyOperator(task_id="start_task")
    end_task = EmptyOperator(task_id="end_task")
    first_task = BashOperator(task_id="first_task",
                              bash_command=f"echo execute time: {datetime.now()}")

    start_task >> first_task >> end_task


my_dag = simple_dag()

圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言