iT邦幫忙

2023 iThome 鐵人賽

DAY 27
0
AI & Data

30天認識主流大數據框架:Hadoop + Spark + Flink系列 第 27

Day27 - PyFlink Kafka Connector

  • 分享至 

  • xImage
  •  

前言
昨天介紹了 Kafka 的安裝、運行與基本操作,對 Kafka 還不太瞭解的人可以先參考這篇文章:Day26 - Kafka 介紹

程式碼
這次參賽的程式碼都會放在 Big-Data-Framework-30-days,建議大家直接把整個 repo clone 下來,然後參考 README 進行基本設置,接著直接 cd 到今天的資料夾內。(記得 pull 最新的版本)

Dependencies

Flink Kafka Connector 需要另外下載依賴才能使用,因為我們是使用 PyFlink,所以先不用管 Maven,直接下載 SQL Client JAR 就好。

Kafka Connector (Table API)

我們以 Table API 做示範,架構跟前幾天提到的基本上一模一樣,幫大家複習一下:

  1. Create TableEnvironment:創建 Table 環境
  2. Create Source Table:創建 Source Table
  3. Create Sink Table:創建 Sink Table
  4. Query:對 Source table 進行 Query
  5. Emit:將結果 Emit 到 Sink table

要注意的地方在有幾點,於創建環境時,我們要設置依賴檔的路徑:

t_env.get_config().get_configuration().set_string("pipeline.jars", "file://<your_dependency_path>")

接著針對 Source Table (與 Sink Table) 進行相關配置:

table = t_env.execute_sql("""
    CREATE TABLE my_source (
        `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
        `partition` BIGINT METADATA VIRTUAL,
        `offset` BIGINT METADATA VIRTUAL,
        `message` STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'ms-to-flink',
        'properties.bootstrap.servers' = 'localhost:9092',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'raw'                            
    )
""")

Metadata

除了消息內容外,我們可以直接選取消息的 MetaData,如事件時間、分區、偏移量等。

Connector Options

  • connector:使用 kafka 為連接器
  • topic (required):訂閱的 kafka 主題
  • properties.bootstrap.servers (required):kafka server 運行的位置
  • format (required):消息的格式,可以設定為 jsoncsv 等,詳細可參考 table formats
  • properties.group.id (optional):指定消費者群組
  • scan.startup.mode (optional):設置 flink 程式在初始化時對於主題中消息的處理模式 (Source Table):
    • latest-offset (default):從主題中最新的消息(偏移量)開始消費
    • earliest-offset:從主題中最早的消息(偏移量)開始消費
    • group-offsets:只有在使用消費者群組時可以使用,不同的消費者可以有不同的消息偏移量。

Example

Prerequisite

  • localhost:9092 上運行 Kafka (假設已初始化過 KRaft 集群)
    $ $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/kraft/server.properties
    
  • 建立 flink-sourceflink-sink 主題
    $KAFKA_HOME/bin/kafka-topics.sh --create --topic flink-source --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
    $KAFKA_HOME/bin/kafka-topics.sh --create --topic flink-sink --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
    

pyflink_kafka_table.py

下面的程式會讀取 flink-source 這個主題的消息,將消息的 metadata (事件時間、分區、偏移量) 以及轉換為大寫的消息內容傳到 flink-sink 主題中。

from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col


# 1. Create TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)

## Set Dependency (Download here: https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/)
t_env.get_config().get_configuration().set_string("pipeline.jars", f"file:///home/mengchiehliu/projects/big_data_30_days/flink-sql-connector-kafka-1.17.1.jar")


# 2. Create Source Table
table = t_env.execute_sql("""
    CREATE TABLE my_source (
        `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
        `partition` BIGINT METADATA VIRTUAL,
        `offset` BIGINT METADATA VIRTUAL,
        `message` STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'ms-to-flink',
        'properties.bootstrap.servers' = 'localhost:9092',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'raw'                            
    )
""")
table = t_env.from_path("my_source")


# 3. Create Sink Table
t_env.execute_sql("""
    CREATE TABLE my_sink (
        event_time TIMESTAMP(3),
        kafka_partition BIGINT,
        kafka_offset BIGINT, 
        message STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'ms-from-flink',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'csv'
    )
""")
                  
# 4. Query
sink_table = table.select(col('event_time'), col('partition'), col('offset'), col('message').upper_case)

# 5. Emit
sink_table.execute_insert("my_sink").wait()

Produce Events、Consume Events

開兩個新的 terminal,分別運行下面程式:

  • produce events
    $ $KAFKA_HOME/bin/kafka-console-producer.sh --topic flink-source --bootstrap-server localhost:9092
    
  • consume events
    $KAFKA_HOME/bin/kafka-console-consumer.sh --topic flink-sink --bootstrap-server localhost:9092
    

發佈消息到 flink-source 主題後,可以從 flink-sink 中取得處理後的消息:

https://ithelp.ithome.com.tw/upload/images/20231012/20138939E5OaDXICXU.png

https://ithelp.ithome.com.tw/upload/images/20231012/20138939rrrqVhYIHP.png

預告

明天會介紹 Flink 中 窗口 (Window) 的概念。


上一篇
Day26 - Kafka 介紹
下一篇
Day28 - PyFlink (4):Time Windows
系列文
30天認識主流大數據框架:Hadoop + Spark + Flink30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言