感覺 pytest 的前世今生不太重要,已經存在很久了,有寫 python 的應該也都很熟,重點是看看怎麼應用在 Data Pipeline 當中
{project root}
├── dags/
├── dbt/
├── plugins/
│ ├── include/
│ ├── operators/
│ └── hooks/
├── tests/
│ ├── conftest.py
│ ├── dags/
│ ├── data/
│ ├── operators/
│ └── hooks/
├── docker-compose.yml
├── Dockerfile
└── requirements.txt
tests
會是獨立一個資料夾,其中會再細分針對 dags/
或是其他 plugins
做測試,通常會分一個 data
的資料夾,主要是負責測試 data quality 的部分
import pytest
from airflow.models import DagBag
from airflow.utils.dates import days_ago
from airflow.utils.state import State
@pytest.fixture
def dagbag():
return DagBag()
def test_dag_loaded(dagbag):
dag = dagbag.get_dag(dag_id="your_dag_id")
assert dagbag.import_errors == {}
assert dag is not None
def test_dag_structure(dagbag):
dag = dagbag.get_dag(dag_id="your_dag_id")
task_ids = [task.task_id for task in dag.tasks]
assert "task_1" in task_ids
assert "task_2" in task_ids
def test_task_dependencies(dagbag):
dag = dagbag.get_dag(dag_id="your_dag_id")
task_1 = dag.get_task("task_1")
task_2 = dag.get_task("task_2")
assert task_2.upstream_task_ids == {"task_1"}
def test_execute_dag(dagbag):
dag = dagbag.get_dag(dag_id="your_dag_id")
execution_date = days_ago(1)
dag.clear(start_date=execution_date, end_date=execution_date)
dag_run = dag.create_dagrun(
run_id=f"test_execute_dag_{execution_date.isoformat()}",
execution_date=execution_date,
start_date=execution_date,
state=State.RUNNING,
external_trigger=False,
)
dag_run.run()
assert dag_run.state == State.SUCCESS
最基本的會有這幾種測試
test_dag_loaded
測載入是否正常test_dag_structure
測試 task 是否存在test_task_dependencies
測試 task 之間的上下游依賴關係test_execute_dag
會測試 DAG 運行並檢查其執行狀態@pytest.fixture
如果直翻是「夾具」,其實就是預先定義好的內容,後續就省下許多重複的程式碼,通常也會直接寫在conftest.py
來創建一個 DagBag 對象,這樣我們可以在所有測試中重複使用它。
conftest.py
conftest.py
是 pytest 中的一個特殊文件,用於共享 fixtures、hooks 和其他設定 ,其中定義的東西可以跨多個測試文件,方便重複使用,另外可以針對 scope
設定作用的範圍,例如scope=session
的話就只有當下創建才有,scope=function
就是每個測試函數都會創一個新的 fixture 實例,透過conftest.py
可以集中管理共享的 fixtures(夾具?)
,維護起來比較方便,而且放在裡面之後也不用另外 import,讚
# conftest.py
import pytest
from airflow.models import DagBag
from airflow.models import Connection
from airflow.utils.db import provide_session
@pytest.fixture(scope="session")
def dagbag():
return DagBag()
@pytest.fixture(scope="function")
@provide_session
def mock_connection(session=None):
conn = Connection(
conn_id="test_conn",
conn_type="postgres",
host="localhost",
login="airflow",
password="airflow",
schema="airflow"
)
session.add(conn)
session.commit()
...
mock_connection
範例就是在裡面直接訂一個模擬的 db 連接~
還不知道 great expectations 的話,歡迎回到昨天的【Day 26】Data Pipeline 測試 - Data Quality feat. Great Expectations
import pytest
from great_expectations.data_context import DataContext
from great_expectations.core.batch import BatchRequest
@pytest.fixture(scope="module")
def ge_context():
return DataContext()
def test_dbt_model_output(ge_context):
#指定驗證的相關資料
batch_request = BatchRequest(
datasource_name="dbt_datasource",
data_connector_name="default_inferred_data_connector_name",
data_asset_name="my_dbt_table",
batch_identifiers={"default_identifier_name": "default_identifier"}
)
# 指定要使用的預期檢查
expectation_suite_name = "my_dbt_table_suite"
# 取得檢查
validator = ge_context.get_validator(
batch_request=batch_request, expectation_suite_name=expectation_suite_name
)
# 檢查和確認結果
results = validator.validate()
assert results["success"] is True, "數據未通過 Great Expectations 驗證"
ge_context()
DataContext 是 Great Expectations 的核心,負責管理整個驗證過程所需的設定
創 BatchRequest,指定要驗證的數據集,包含了數據來源、連接器、後續生成的表my_dbt_table
名稱。
my_dbt_table_suite
是預先定義好的 Data Quality 檢查規則,定義了您對數據的預期,例如欄位範圍、要是唯一值等等。
validator
(驗證器)使用指定的Expectation Suite 驗證檢查規則
對數據進行驗證。validate() 方法執行實際的驗證過程,return 一個包含結果的字典格式。
最後的斷言檢查驗證是否成功。如果失敗,測試將拋出一個帶有錯誤消息的異常。