前言
昨天介紹了 Kafka 的安裝、運行與基本操作,對 Kafka 還不太瞭解的人可以先參考這篇文章:Day26 - Kafka 介紹。
程式碼
這次參賽的程式碼都會放在 Big-Data-Framework-30-days,建議大家直接把整個 repo clone 下來,然後參考 README 進行基本設置,接著直接 cd 到今天的資料夾內。(記得 pull 最新的版本)
Flink Kafka Connector 需要另外下載依賴才能使用,因為我們是使用 PyFlink,所以先不用管 Maven,直接下載 SQL Client JAR 就好。
我們以 Table API 做示範,架構跟前幾天提到的基本上一模一樣,幫大家複習一下:
要注意的地方在有幾點,於創建環境時,我們要設置依賴檔的路徑:
t_env.get_config().get_configuration().set_string("pipeline.jars", "file://<your_dependency_path>")
接著針對 Source Table (與 Sink Table) 進行相關配置:
table = t_env.execute_sql("""
CREATE TABLE my_source (
`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
`partition` BIGINT METADATA VIRTUAL,
`offset` BIGINT METADATA VIRTUAL,
`message` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'ms-to-flink',
'properties.bootstrap.servers' = 'localhost:9092',
'scan.startup.mode' = 'latest-offset',
'format' = 'raw'
)
""")
除了消息內容外,我們可以直接選取消息的 MetaData,如事件時間、分區、偏移量等。
json
、csv
等,詳細可參考 table formats
localhost:9092
上運行 Kafka (假設已初始化過 KRaft 集群)
$ $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/kraft/server.properties
flink-source
、flink-sink
主題
$KAFKA_HOME/bin/kafka-topics.sh --create --topic flink-source --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
$KAFKA_HOME/bin/kafka-topics.sh --create --topic flink-sink --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
下面的程式會讀取 flink-source
這個主題的消息,將消息的 metadata (事件時間、分區、偏移量) 以及轉換為大寫的消息內容傳到 flink-sink
主題中。
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col
# 1. Create TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
## Set Dependency (Download here: https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/)
t_env.get_config().get_configuration().set_string("pipeline.jars", f"file:///home/mengchiehliu/projects/big_data_30_days/flink-sql-connector-kafka-1.17.1.jar")
# 2. Create Source Table
table = t_env.execute_sql("""
CREATE TABLE my_source (
`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
`partition` BIGINT METADATA VIRTUAL,
`offset` BIGINT METADATA VIRTUAL,
`message` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'ms-to-flink',
'properties.bootstrap.servers' = 'localhost:9092',
'scan.startup.mode' = 'latest-offset',
'format' = 'raw'
)
""")
table = t_env.from_path("my_source")
# 3. Create Sink Table
t_env.execute_sql("""
CREATE TABLE my_sink (
event_time TIMESTAMP(3),
kafka_partition BIGINT,
kafka_offset BIGINT,
message STRING
) WITH (
'connector' = 'kafka',
'topic' = 'ms-from-flink',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'csv'
)
""")
# 4. Query
sink_table = table.select(col('event_time'), col('partition'), col('offset'), col('message').upper_case)
# 5. Emit
sink_table.execute_insert("my_sink").wait()
開兩個新的 terminal,分別運行下面程式:
$ $KAFKA_HOME/bin/kafka-console-producer.sh --topic flink-source --bootstrap-server localhost:9092
$KAFKA_HOME/bin/kafka-console-consumer.sh --topic flink-sink --bootstrap-server localhost:9092
發佈消息到 flink-source
主題後,可以從 flink-sink
中取得處理後的消息:
明天會介紹 Flink 中 窗口 (Window) 的概念。