iT邦幫忙

2023 iThome 鐵人賽

DAY 22
0
AI & Data

Airflow 是什麼? 能吃嗎 ? 數據水管工的超級蘑菇系列 第 22

[Day22] Airflow牙起來-Debug 和 Testing (1)

  • 分享至 

  • xImage
  •  

前言

知道為什麼標題會叫做「牙起來」嗎?真的開發就會知道了/images/emoticon/emoticon01.gifAirflow 的 debug 和 testing 都是比較麻煩的,會越用越抓狂/images/emoticon/emoticon03.gif,這兩天會整理我的方式,如果有更好的方式,拜託跟我說/images/emoticon/emoticon41.gif

依ㄎㄧ媽修~~

今日程式

import pendulum
from datetime import datetime
import requests
from airflow.decorators import dag, task

local_tz = pendulum.timezone("Asia/Taipei") 


@dag(
    dag_id="show_today_activity",
    schedule_interval="0 9 * * *",
    start_date=datetime(2023, 10, 1, tzinfo=local_tz),
    catchup=False
)
def run():
    @task
    def today_activity():
        res = requests.get("https://www.boredapi.com/api/activity")
        return now + ":" + res.text
    today_activity()

run()
  • pendulum 是能夠更方便使用 datetime 模組,這邊是用來處理時區問題
  • catchup=False 👍
  • requests.get({url}) requests 模組可以發送 http requests 然後拿到 response
  • https://www.boredapi.com/api/activity 是我去找的開源 API,就是設計給今天不知道要幹嘛,可以call api,他就會用 json 回傳一個推薦的活動,可以直接點擊網址,每次會不一樣,回傳格式如下:
{
    "activity":"Listen to a new podcast",
    "type":"relaxation",
    "participants":1,
    "price":0.05,
    "link":"",
    "key":"4124860",
    "accessibility":0.12
}

這次推薦我聽一個新的podcast,類別是 relax,還以有一些其他的資訊~

Debug

確認 DAG 和 Task

  • 列出目前所有的 DAGs
airflow dags list
  • 列出 這個 dag id 底下有哪些 task
airflow tasks list {dag id}
  • 加上 --tree 參數,可以看到 tasks 之間更詳細的階層關係(the hierarchy of tasks)
airflow tasks list {dag id} --tree

直接先用 python 執行一遍

python ~/airflow/dags/tutorial.py
  • 確認沒有基本的語法錯誤或是 library 忘記 import
  • 其實用 vscode 的話,裝一下 python extension 就可以達成了😹

Testing

用 airflow tasks test 測試單一任務

  • 進入 airflow webserver docker
  • 在 terminal 執行 airflow tasks test <dag_id> <task_id>
  • 後面可以帶時間 <date(e.g.20230101)>
  • 後面可以帶參數 -t <parameter: json TYPE"{"id":"1"}">
    有哪些參數可以帶,可以看這裡
  • 範例如下:
airflow tasks test <dag_id> <task_id> <date(e.g.20230101)> -t <parameter: json TYPE"{"id":"1"}">

如果想要拿到這些參數,可以這樣 access

def get_json_arg(**kwargs):
    id = kwargs["params"]["id"]

用 airflow dags test 測試整個 DAG

  • 測試每個任務的依賴性
  • 這樣就算是實際執行一次,可以從 Web UI 的 Grid View 上面看到
airflow dags test {dag id} {date}

上一篇
[Day21] 坐上 Airflow 時光機回到過去-Catchup 和 Backfill
下一篇
[Day23] Airflow牙起來-Debug 和 Testing (2)
系列文
Airflow 是什麼? 能吃嗎 ? 數據水管工的超級蘑菇30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言