iT邦幫忙

2023 iThome 鐵人賽

DAY 25
0

跟 Airflow 一樣,這種框架都不太好測,而 Flink 的流式處理以及他的複雜機制更是如此。

不過,我們還是可以從一些簡單的部份做起。

Source / Sink

如果你用內建的 Source 跟 sink,我不建議你特別測試它們。

反之,如果你有自己客製化 source 如上次講的 JDBC Streaming source 的話,倒是可以用一般的測試手法處理,這裡我就不多談了。

但是,我們在設計程式的時候,應該要將 source 跟 sink 設計成可注入的方式。這樣後續在做整合測試的時候會方便許多。
我是建議不要都寫在 main() 裡面,而是將 source 跟 sink 在 main() 裡面初始化後,建立一個 job 的實例並做為它的 parameters。

Operator

這個是很基本卻也很重要的部份。大多數狀況,Flink 內建的 operator 不見得能符合我們使用。像是前面範例用的 flatMap,裡面將一行文字拆解後再輸出。

public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // 依空格拆分,生成 (key, 1) 
        String[] words = value.split(" ");
        for (String word : words) {
            out.collect(new Tuple2<>(word, 1));
        }
    }
}

像這種元件,基本上會希望全部都有做到測試覆蓋。邏輯通常並不難測,因為 operator 可以的話,希望是獨立、簡單為主。太複雜的機制可能會需要用額外的狀態來保護。

以下是一個簡單的例子

public class TokenizerTest {

    private Tokenizer tokenizer;

    @Before
    public void setUp() {
        tokenizer = new Tokenizer();
    }

    @Test
    public void testTokenizer() {
        // 創建模擬的 Collector
        Collector<Tuple2<String, Integer>> collector = Mockito.mock(Collector.class);

        // 調用 flatMap 方法,這裡可以設定模擬的行為
        tokenizer.flatMap("Hello World", collector);

        // 在這裡添加斷言來檢查模擬的行為是否符合預期
        verify(collector, times(2)).collect(Mockito.any(Tuple2.class));

        // 如果有更具體的斷言,可以使用 ArgumentCaptor 來捕獲模擬方法的參數
        ArgumentCaptor<Tuple2<String, Integer>> argumentCaptor = ArgumentCaptor.forClass(Tuple2.class);
        verify(collector, times(2)).collect(argumentCaptor.capture());

        // 現在你可以檢查捕獲的參數是否符合預期
        // 例如,你可以檢查第一次 collect 的輸出
        Tuple2<String, Integer> firstCapture = argumentCaptor.getAllValues().get(0);
        assertEquals("Hello", firstCapture.f0);
        assertEquals(Integer.valueOf(1), firstCapture.f1);

    }
}

上一篇
Flink 也能寫 Batch - Day24
下一篇
Flink Service 與 jar 的關係 - Day26
系列文
用 Airflow & Flink 來開發 ETL 吧30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言