我們在上一章的時候,Flink 的輸入 source 是使用 Kafka,它很好用,但我們常常面對的卻是各種 OLTP 資料庫。而如果你打開 Flink 官網,會看到在 DataStream Connectors 底下有 JDBC 的存在:

但是當你仔細一看,糟了個糕,裡面只有支援 sink 卻沒有 source。

這是因為 OLTP 架構並不是設計給 streaming 這種情境使用的,你唯一能做的只有透過輪詢的方式,不停的去要最新的資料來模擬成 streaming source。
所以,我們要來客製化一個 source了。
public class CustomPostgresSource extends RichParallelSourceFunction<String> {
    private final String jdbcUrl;
    private final String username;
    private final String password;
    private final long pollingInterval;
    private transient Connection connection;
    private transient PreparedStatement preparedStatement;
    private transient ValueState<Long> lastQueryTimeState;
    public CustomPostgresSource(String jdbcUrl, String username, String password, long pollingInterval) {
        this.jdbcUrl = jdbcUrl;
        this.username = username;
        this.password = password;
        this.pollingInterval = pollingInterval;
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = DriverManager.getConnection(jdbcUrl, username, password);
        preparedStatement = connection.prepareStatement("SELECT * FROM example_data WHERE last_modified > ?");
        lastQueryTimeState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastQueryTime", Long.class));
    }
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (true) {
            long lastQueryTime = lastQueryTimeState.value() != null ? lastQueryTimeState.value() : 0L;
            preparedStatement.setTimestamp(1, new Timestamp(lastQueryTime));
            ResultSet resultSet = preparedStatement.executeQuery();
            while (resultSet.next()) {
                // Emit data to downstream
                String data = resultSet.getString("some_column");
                ctx.collect(data);
            }
            // Update the last query time
            long currentTimestamp = System.currentTimeMillis();
            lastQueryTimeState.update(currentTimestamp);
            // Sleep for pollingInterval milliseconds before the next query
            Thread.sleep(pollingInterval);
        }
    }
    @Override
    public void cancel() {
        try {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            // Handle exception
        }
    }
}
這是一個相對簡單的客製化 source function ,我們自己連線到 postgres 的表格,並每 n 秒查詢一次。同時 SQL 有一個 last_modified 欄位可以做為 where 條件,減少我們查詢的資料量,同時確保跟之前的資料沒有重覆。
這個方案可以提供你一個假 JDBC streaming source,但要注意以下幾點:
last_modified 有正確被更新,並且有 update 到 ValueState 內,同時在 open 時也有被載入回來。這是為了保證 checkpoint/savepoint 能記錄到這個值,並在重啟時能還原到正確的值。cancel 時要將 connection 關閉,以免 MemoryLeak以上,你就能自己簡單做出一個自己的 JDBC source 了。
官方案例中,Table API 倒是有支援 JDBC 的 source。但我在研究的時候卡了不少的關,而且我並不喜歡 Table API,他強制要求開發者要使用他的 SQL 語法,而且 SQL 跟原本 Streaming 的觀念落差較大。儘管官方認為這樣可以將 batch 跟 streaming 的語法統一,但可能等之後再來深入研究吧。
讀者如果願意,也可以試試看使用 Table API 做為 JDBC 的 source 解法。