雖然我說 Flink 的強項跟目的是處理 Streaming 的資料,但不代表它不能做 batch。最經典的例子,依然還是讀檔後計算字數。
public class BatchJobExample {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 讀取文件
String inputFilePath = "path/to/your/input/file.txt";
DataSource<String> input = env.readTextFile(inputFilePath);
// 寫一個 Tokenizer 假裝我們有做事
DataSink<Tuple2<String, Integer>> result = input
.flatMap(new Tokenizer())
.groupBy(0)
.sum(1);
// 寫入文件
String outputFilePath = "path/to/your/output/file.txt";
result.writeAsCsv(outputFilePath, "\n", " ");
// 執行
env.execute("Batch Job Example");
}
public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// 統一大小寫並分割
String[] tokens = value.toLowerCase().split("\\W+");
// 消费二元组
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}
}
如果你還記得前面一開始的 streaming 範例的話,可能會發現大同小異,最主要的差異是 ExecutionEnvironment
env 的不同。在前面我們用的是 StreamExecutionEnvironment
,但這裡卻變成比較簡單的 ExecutionEnvironment
。
另一個就是 keyBy 變成 groupBy。但是在使用概念是接近的,也就是轉換 → 分組 → 聚合。
但是 Batch 一般來說,我們的定義是上有界有限的,所以不太會需要用到 TimeWindows。至於 CountWindow 或許有機會,但我自己目前是想不到這種例子。
至於用 Batch 讀 JDBC 呢?官方有個已廢棄的 DataSet API,裡面有一些複雜的寫法,有機會可以再說明。但應該不會寫出來,畢竟我也不知道什麼時候會被刪掉,到時候就會失去意義了。
當然聰明的讀者應該都可以參考上一篇的做法,自己寫一個 source,這裡就保留給大家了。