iT邦幫忙

2025 iThome 鐵人賽

DAY 18
0
AI & Data

五年前 IT 路人轉職資料分析,前進資料科學之路系列 第 18

資料工程 - airflow (2) 製作 DAG

  • 分享至 

  • xImage
  •  

今天來試寫一個 DAG 吧,我們來講講 DAG 的架構,這是我從官方文件一路摸索上來的,如果有更好的用法,歡迎交流 ~

DAG 可以分成3大塊 :

  1. dag decorator
    我看過用 with DAG() 跟 DAG() 方式建立的,我目前覺得 dag decorator 最好用
  2. 主要 function 包多個 task decorator
    我看過用 operator、用 sensor 的,目前我覺得用 task decorator 最好用
  3. 呼叫主要 function : 放在最後一行程式碼
import pendulum
import datetime

from airflow.decorators import dag
from airflow.decorators import task

# dag decorator
@dag(
    dag_id = "test_dag",
    schedule = "15 9 * * *",
    start_date = pendulum.datetime(2025, 4, 23, tz = "Asia/Taipei"),
    catchup = False,
    dagrun_timeout = datetime.timedelta(minutes = 30),  # currently set the expiry time for 30 mins
    default_args = {
        "email" : ["youremail@gmail.com"],
        "email_on_failure" : True,
        "email_on_retry" : True,
        "owner": "Livia"
    },
    tags = ["sales_ETL"]
)
def test_dag() :  # 主要 function
    # task decorator
    @task
    def remove_nan_data() :
        return "clean_data done"
    @task
    def change_type():
        return "change_type done"

    # materialization for the task
    remove_nan_data = remove_nan_data()
    change_type = change_type()

    # set the dependency
    remove_nan_data >> change_type

dag = test_dag()

做好 DAG 後,存成 .py 檔,放入你的 airflow 專案資料夾,同 docker-compose.yaml 檔那層有 dags 資料夾,把你的 DAG .py 檔放入這個資料夾後,等你架好的 airflow 偵測到檔案後就會加到介面上可以觀測了。

https://ithelp.ithome.com.tw/upload/images/20251002/20178684BUfivJYwbd.png

參考資料 - airlfow Dags : https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html


上一篇
資料工程 - airflow (1) 自架環境
下一篇
資料工程 - airflow (3) DAG 的切分方式
系列文
五年前 IT 路人轉職資料分析,前進資料科學之路19
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言