iT邦幫忙

2023 iThome 鐵人賽

DAY 23
0

我們在上一章的時候,Flink 的輸入 source 是使用 Kafka,它很好用,但我們常常面對的卻是各種 OLTP 資料庫。而如果你打開 Flink 官網,會看到在 DataStream Connectors 底下有 JDBC 的存在:

https://ithelp.ithome.com.tw/upload/images/20230923/201616256y8e1qEnwD.png

JDBC | Apache Flink

但是當你仔細一看,糟了個糕,裡面只有支援 sink 卻沒有 source。

https://ithelp.ithome.com.tw/upload/images/20230923/20161625vtkDI9Tmea.png

這是因為 OLTP 架構並不是設計給 streaming 這種情境使用的,你唯一能做的只有透過輪詢的方式,不停的去要最新的資料來模擬成 streaming source。

所以,我們要來客製化一個 source了。

public class CustomPostgresSource extends RichParallelSourceFunction<String> {
    private final String jdbcUrl;
    private final String username;
    private final String password;
    private final long pollingInterval;
    private transient Connection connection;
    private transient PreparedStatement preparedStatement;
    private transient ValueState<Long> lastQueryTimeState;

    public CustomPostgresSource(String jdbcUrl, String username, String password, long pollingInterval) {
        this.jdbcUrl = jdbcUrl;
        this.username = username;
        this.password = password;
        this.pollingInterval = pollingInterval;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = DriverManager.getConnection(jdbcUrl, username, password);
        preparedStatement = connection.prepareStatement("SELECT * FROM example_data WHERE last_modified > ?");
        lastQueryTimeState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastQueryTime", Long.class));
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (true) {
            long lastQueryTime = lastQueryTimeState.value() != null ? lastQueryTimeState.value() : 0L;
            preparedStatement.setTimestamp(1, new Timestamp(lastQueryTime));
            ResultSet resultSet = preparedStatement.executeQuery();
            while (resultSet.next()) {
                // Emit data to downstream
                String data = resultSet.getString("some_column");
                ctx.collect(data);
            }

            // Update the last query time
            long currentTimestamp = System.currentTimeMillis();
            lastQueryTimeState.update(currentTimestamp);

            // Sleep for pollingInterval milliseconds before the next query
            Thread.sleep(pollingInterval);
        }
    }

    @Override
    public void cancel() {
        try {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            // Handle exception
        }
    }
}

這是一個相對簡單的客製化 source function ,我們自己連線到 postgres 的表格,並每 n 秒查詢一次。同時 SQL 有一個 last_modified 欄位可以做為 where 條件,減少我們查詢的資料量,同時確保跟之前的資料沒有重覆。

這個方案可以提供你一個假 JDBC streaming source,但要注意以下幾點:

  1. 查詢的 last_modified 有正確被更新,並且有 update 到 ValueState 內,同時在 open 時也有被載入回來。這是為了保證 checkpoint/savepoint 能記錄到這個值,並在重啟時能還原到正確的值。
  2. 範例是簡化的,實際上最好加上一次 query 的最大時間範圍,不然在一啟動時就會開始讀 db 內所有資料,很有可能造成嚴重的負荷。
  3. cancel 時要將 connection 關閉,以免 MemoryLeak

以上,你就能自己簡單做出一個自己的 JDBC source 了。

Table API Connector

官方案例中,Table API 倒是有支援 JDBC 的 source。但我在研究的時候卡了不少的關,而且我並不喜歡 Table API,他強制要求開發者要使用他的 SQL 語法,而且 SQL 跟原本 Streaming 的觀念落差較大。儘管官方認為這樣可以將 batch 跟 streaming 的語法統一,但可能等之後再來深入研究吧。

讀者如果願意,也可以試試看使用 Table API 做為 JDBC 的 source 解法。


上一篇
來寫一個 Flink Streaming Job 吧 - Day22
下一篇
Flink 也能寫 Batch - Day24
系列文
用 Airflow & Flink 來開發 ETL 吧30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言