上一篇講了一些測試心得,這篇就來講個偏整合性的測試方法吧。
這是官方提供的方法,用起來也很簡單:
# command layout: command subcommand [dag_id] [task_id] [(optional) date]
# testing print_date
airflow tasks test demo_dag task_print_date 2015-06-01
# testing sleep
airflow tasks test demo_dag task_sleep 2015-06-01
只要你知道 dag_id (demo_dag
) 跟 task id (task_pring_date
),就可以指定執行那個 DAG 內的 task,並給予它一個執行日期 (非必要)。
這種方式執行並不會檢查依賴關係,也不會將狀態寫入 database 內,因為它就是測試。但如果這個 task 有要寫入資料的話,還是要注意不要弄髒資料庫。
這邊先假設大家都有一定基礎,就先不談為何不用 Unittest,以及 Pytest 如何安裝了,再講下去廢話太多。
import pytest
from airflow.models import DagBag
@pytest.fixture()
def dagbag():
return DagBag()
def test_dag_loaded(dagbag):
dag = dagbag.get_dag(dag_id="hello_world")
assert dagbag.import_errors == {}
assert dag is not None
assert len(dag.tasks) == 1
這段測試是載入一個 “hello_world” 的 DAG,然後確認沒有 import 錯誤,並確實存在該 DAG,最後驗證 task 數量為1。
前兩個點都是 ok 的,你寫任何一個 DAG 的測試都應該加上,但要注意兩件事:
import datetime
import pendulum
import pytest
from airflow import DAG
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType
DATA_INTERVAL_START = pendulum.datetime(2021, 9, 13, tz="UTC")
DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1)
TEST_DAG_ID = "my_custom_operator_dag"
TEST_TASK_ID = "my_custom_operator_task"
@pytest.fixture()
def dag():
with DAG(
dag_id=TEST_DAG_ID,
schedule="@daily",
start_date=DATA_INTERVAL_START,
) as dag:
MyCustomOperator(
task_id=TEST_TASK_ID,
prefix="s3://bucket/some/prefix",
)
return dag
def test_my_custom_operator_execute_no_trigger(dag):
dagrun = dag.create_dagrun(
state=DagRunState.RUNNING,
execution_date=DATA_INTERVAL_START,
data_interval=(DATA_INTERVAL_START, DATA_INTERVAL_END),
start_date=DATA_INTERVAL_END,
run_type=DagRunType.MANUAL,
)
ti = dagrun.get_task_instance(task_id=TEST_TASK_ID)
ti.task = dag.get_task(task_id=TEST_TASK_ID)
ti.run(ignore_ti_state=True)
assert ti.state == TaskInstanceState.SUCCESS
# Assert something related to tasks results.
這段主要是為了測試定自義的 operator 用的,測試它在 DAG 下是否可以正常運作。
所以首先先建一個 DAG 以及裡面的 task 流程,再透過 create_dagrun
來呼叫 Airflow 執行,而非直接呼叫 operator 的 execute()
。並且在最後檢查狀態是否有變成 SUCCESS
不過這種做法,很接近於元件測試甚至是整合測試了,我是認為不夠單元,但它跟上面的 DAG 測試一樣,算是好寫又基礎的測項,所以寫一寫是無礙的。
def test_dag_loaded(dagbag):
dag = dagbag.get_dag(dag_id="hello_world")
task1 = dag.get_task("task1")
task2 = dag.get_task("task2")
assert task1.downstream_task_ids == set("task2")
假設這個 dag 內有兩個 task,分別是 task1 跟 task2,並且 task1 >> task2
,那可以用 downstream_task_ids
來檢查它們的依賴關係。
但跟 task 數量的檢查一樣,這種測試見仁見智,至少我不愛這樣做。
明天再來講重點的單元測試吧。