如果前面的鐵人賽每篇你都看過的話(希望你有),可能會發現我常常提到「測試」,或是說「為了測試」。測試是非常重要的,但這裡我就先不細談,有興趣的人歡迎看看我朋友 Kuma 出的書:你就是不寫測試才會沒時間:Kuma 的單元測試實戰 -- Java篇(iThome鐵人賽系列書) | 天瓏網路書店 (tenlong.com.tw),裡面解釋得非常詳細。
但我必須說,Airflow 並不是很方便測試的一個框架,我之後要談的 Flink 也是如此。這是因為這些框架並不直接執行你寫出來的 code,而是先經過一層轉譯。有一部份原因是為了擴充性跟 pipeline,當你的業務量增加,單台機器總有一天會遇到效能瓶頸,無論是數量或是單一 ETL 的資料量。所以這些框架都會以分佈式架構的概念去開發,當單台主機跑不動整個 DAG 時,還能將各個 task 丟到不同主機去執行。
不然你以為 XCom 那麼麻煩是為什麼?為什麼每個 Operator 都要繼承 BaseOperator ?
回到正題,Airflow 雖然難測試,但還是做得到的,只是你可能要好好思考你的測試項目。
要測 DAG 能不能正常被載入?可以,很簡單。
要測單一 Task 執行結果?可以,很簡單。
要測 Task 相依性?可以,很簡單。
但這些是你最重要的業務邏輯嗎?
我們都在說 ETL ,那麼 extract 資料跟 load 資料要測嗎?測試 DAG 時要 mock 到資料 I/O 嗎?
就我認為,大多數情況下其實是不用去測試資料存取的,除非你的 ETL 單純到就是從 A 資料庫複製到 B 資料庫去,這種需求的業務邏輯就只是「複製」而已,你唯一能做的就是確定資料中間沒有被改掉。這些存取大多是透過第三方套件,即使是你自己寫的,那也不是 dag 該測的內容。
大多數的 ETL 是會包含清洗的,而這個清洗就是你的商業邏輯所在。
舉例來說,假如我們有一個 ETL 是從 A 表聚合每月的用戶資料後,再存入另外一張 B 表。那麼你的聚合邏輯是需要測試的,而不必去管中間經過了幾個 task,每個 task 的依賴關係如何。最終我們在意的就只是 A 表的資料,正確的壓縮進了 B 表內。
做法有很多種,也許 A B 兩張表同在一個資料庫,你可以很輕鬆的下個 insert select
語法,一個 task 就完成。也許你會在第一個 task 下 select sum(xxx) from A
取得聚合後資料後,再寫第二個 task 將這些資料存進 B 表。又或許你就在一個 task 內將所有 A 表資料載下來後,再用 pandas 做 filter、group by 自己清洗資料。
別誤會,我並不反對白箱測試,也認同有時候需要對單一 method 測試。但是如果測試項目太細,當你的業務邏輯不變,純粹只是想調整寫法,或是重構架構時,你的測試很有可能要被迫改變。
在寫測試前,請先思考好你這個 ETL 要完成的核心目的是什麼,如果可以,TDD 會是一個很好的模式去強迫你自己這樣做,但即使你不熟 TDD,或是想不到如何測試業務邏輯,你還是要將現有的程式好好寫測試。
爛的測試好過不寫測試。