iT邦幫忙

2023 iThome 鐵人賽

DAY 12
0

上一篇講了一些測試心得,這篇就來講個偏整合性的測試方法吧。

CMD 測試

這是官方提供的方法,用起來也很簡單:

# command layout: command subcommand [dag_id] [task_id] [(optional) date]

# testing print_date
airflow tasks test demo_dag task_print_date 2015-06-01

# testing sleep
airflow tasks test demo_dag task_sleep 2015-06-01

只要你知道 dag_id (demo_dag) 跟 task id (task_pring_date),就可以指定執行那個 DAG 內的 task,並給予它一個執行日期 (非必要)。

這種方式執行並不會檢查依賴關係,也不會將狀態寫入 database 內,因為它就是測試。但如果這個 task 有要寫入資料的話,還是要注意不要弄髒資料庫。

Pytest

這邊先假設大家都有一定基礎,就先不談為何不用 Unittest,以及 Pytest 如何安裝了,再講下去廢話太多。

DAG 測試

import pytest

from airflow.models import DagBag

@pytest.fixture()
def dagbag():
    return DagBag()

def test_dag_loaded(dagbag):
    dag = dagbag.get_dag(dag_id="hello_world")
    assert dagbag.import_errors == {}
    assert dag is not None
    assert len(dag.tasks) == 1

這段測試是載入一個 “hello_world” 的 DAG,然後確認沒有 import 錯誤,並確實存在該 DAG,最後驗證 task 數量為1。

前兩個點都是 ok 的,你寫任何一個 DAG 的測試都應該加上,但要注意兩件事:

  1. dag_id 可以考慮定義成全域變數,而非 fixed 的 string。因為你或許會因為一些原因而更改 dag id,這不應該成為測試錯誤的原因。
  2. task 數量不是必要的檢查項目,請參考上一篇的心得。

客製 operator 測試

import datetime

import pendulum
import pytest

from airflow import DAG
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType

DATA_INTERVAL_START = pendulum.datetime(2021, 9, 13, tz="UTC")
DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1)

TEST_DAG_ID = "my_custom_operator_dag"
TEST_TASK_ID = "my_custom_operator_task"

@pytest.fixture()
def dag():
    with DAG(
        dag_id=TEST_DAG_ID,
        schedule="@daily",
        start_date=DATA_INTERVAL_START,
    ) as dag:
        MyCustomOperator(
            task_id=TEST_TASK_ID,
            prefix="s3://bucket/some/prefix",
        )
    return dag

def test_my_custom_operator_execute_no_trigger(dag):
    dagrun = dag.create_dagrun(
        state=DagRunState.RUNNING,
        execution_date=DATA_INTERVAL_START,
        data_interval=(DATA_INTERVAL_START, DATA_INTERVAL_END),
        start_date=DATA_INTERVAL_END,
        run_type=DagRunType.MANUAL,
    )
    ti = dagrun.get_task_instance(task_id=TEST_TASK_ID)
    ti.task = dag.get_task(task_id=TEST_TASK_ID)
    ti.run(ignore_ti_state=True)
    assert ti.state == TaskInstanceState.SUCCESS
    # Assert something related to tasks results. 

這段主要是為了測試定自義的 operator 用的,測試它在 DAG 下是否可以正常運作。

所以首先先建一個 DAG 以及裡面的 task 流程,再透過 create_dagrun 來呼叫 Airflow 執行,而非直接呼叫 operator 的 execute() 。並且在最後檢查狀態是否有變成 SUCCESS

不過這種做法,很接近於元件測試甚至是整合測試了,我是認為不夠單元,但它跟上面的 DAG 測試一樣,算是好寫又基礎的測項,所以寫一寫是無礙的。

Task 依賴

def test_dag_loaded(dagbag):
    dag = dagbag.get_dag(dag_id="hello_world")
    task1 = dag.get_task("task1")
    task2 = dag.get_task("task2")

    assert task1.downstream_task_ids == set("task2")

假設這個 dag 內有兩個 task,分別是 task1 跟 task2,並且 task1 >> task2 ,那可以用 downstream_task_ids 來檢查它們的依賴關係。

但跟 task 數量的檢查一樣,這種測試見仁見智,至少我不愛這樣做。

明天再來講重點的單元測試吧。


上一篇
Airflow 的單元測試 (一) - Day11
下一篇
Airflow 單元測試的注意事項 - Day13
系列文
用 Airflow & Flink 來開發 ETL 吧30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言