iT邦幫忙

2023 iThome 鐵人賽

DAY 24
0

雖然我說 Flink 的強項跟目的是處理 Streaming 的資料,但不代表它不能做 batch。最經典的例子,依然還是讀檔後計算字數。

public class BatchJobExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 讀取文件
        String inputFilePath = "path/to/your/input/file.txt";
        DataSource<String> input = env.readTextFile(inputFilePath);

        // 寫一個 Tokenizer 假裝我們有做事
        DataSink<Tuple2<String, Integer>> result = input
            .flatMap(new Tokenizer())
            .groupBy(0)
            .sum(1);

        // 寫入文件
        String outputFilePath = "path/to/your/output/file.txt";
        result.writeAsCsv(outputFilePath, "\n", " ");

        // 執行
        env.execute("Batch Job Example");
    }

		public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
		
		    @Override
		    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
		        // 統一大小寫並分割
		        String[] tokens = value.toLowerCase().split("\\W+");
		
		        // 消费二元组
		        for (String token : tokens) {
		            if (token.length() > 0) {
		                out.collect(new Tuple2<String, Integer>(token, 1));
		            }   
		        }
		    }
		}
}

如果你還記得前面一開始的 streaming 範例的話,可能會發現大同小異,最主要的差異是 ExecutionEnvironment env 的不同。在前面我們用的是 StreamExecutionEnvironment ,但這裡卻變成比較簡單的 ExecutionEnvironment

另一個就是 keyBy 變成 groupBy。但是在使用概念是接近的,也就是轉換 → 分組 → 聚合。

但是 Batch 一般來說,我們的定義是上有界有限的,所以不太會需要用到 TimeWindows。至於 CountWindow 或許有機會,但我自己目前是想不到這種例子。

至於用 Batch 讀 JDBC 呢?官方有個已廢棄的 DataSet API,裡面有一些複雜的寫法,有機會可以再說明。但應該不會寫出來,畢竟我也不知道什麼時候會被刪掉,到時候就會失去意義了。

當然聰明的讀者應該都可以參考上一篇的做法,自己寫一個 source,這裡就保留給大家了。


上一篇
Flink Streaming 與 JDBC - Day23
下一篇
Flink 的單元測試 20 - Day25
系列文
用 Airflow & Flink 來開發 ETL 吧30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言