前言
昨天介紹了 Flink 的安裝與設置,還沒安裝 Flink 的人可以先看看這篇:Day23 - Flink 安裝與設置。
程式碼
這次參賽的程式碼都會放在 Big-Data-Framework-30-days,建議大家直接把整個 repo clone下來,然後參考 README 進行基本設置,接著直接 cd 到今天的資料夾內。
$ pip install apache-flink
DataStream 的程式架構大致上由5個部分組成,分別是:
DataStream API 是由創建執行環境開始的。
STREAMING
,另外還有BATCH
(批處理)、AUTOMATIC
(根據資料的有界性自動辨識)。from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.execution_mode import RuntimeExecutionMode
# 創建環境
env = StreamExecutionEnvironment.get_execution_environment()
# 設置運行模式
env.set_runtime_mode(RuntimeExecutionMode.BATCH)
# 設置環境平行化
env.set_parallelism(2)
輸入資料來源 (創建 DataStream 物件),要注意的是 DataStream 是一個不可修改的物件,一旦創建就不能新增或刪除內部的元素。一共有三種方法能夠創建 DataStream 物件,分別是:
From Collection
通常用於測試階段,使用 from_collection
取得 list 中的元素
ds = env.from_collection([(1, "hadoop"), (2, "spark"), (3, "flink")])
DataStream Connectors
Flink 支援許多 Connectors 用來取得 DataStream,最常見的是消息佇列 (如:kafka、RabbitMQ),另外 FileSystem (如:HDFS)、DataBase (MongoDB)、FileFormat (如:CSV、JSON) 也都有相對應的 Connector,大部分 connectors 需要下載對應的 jars
,另外 Flink 也提供 Data Generator:
from pyflink.datastream.connectors.number_seq import NumberSequenceSource
from pyflink.common.watermark_strategy import WatermarkStrategy
my_source = NumberSequenceSource(1, 10)
ds = env.from_source(
source=my_source,
source_name='my_source',
watermark_strategy=WatermarkStrategy.for_monotonous_timestamps())
Table & SQL Connectors (推薦用法)
我們也可以使用更高階的 API 來建立 Connectors,概念跟 DataStream Connectors 差不多,但改成用 DDL (Data Definition Language) 寫 。
from pyflink.table import StreamTableEnvironment
from pyflink.common.typeinfo import Types
# create table environment
t_env = StreamTableEnvironment.create(env)
# DDL
t_env.execute_sql("""
CREATE TABLE my_source (
a INT,
b VARCHAR
) WITH (
'connector' = 'datagen',
'number-of-rows' = '10'
)
""")
# data source
ds = t_env.to_append_stream(
t_env.from_path('my_source'),
Types.ROW([Types.INT(), Types.STRING()]))
對數據流進行轉換,包括 filter
、reduce
、window
、join
等操作,我們以 From Collection 的 DataStream 來示範,將英文字母轉換為大寫。
transformed_ds = ds.map(lambda x: (x[0], x[1].upper()))
輸出結果,跟 Source 一樣有多種 Connectors 可以選擇 (如:FS、Kafka 等),這邊示範最簡單的印出結果 (STDOUT)。
transformed_ds = ds.print()
執行環境。
env.execute("[Optional] My Job")
大家可以自己執行看看有什麼差異~
From Collection:example1.py
from pyflink.datastream import StreamExecutionEnvironment
# 1. Environment
env = StreamExecutionEnvironment.get_execution_environment()
# 2. DataStream
ds = env.from_collection([(1, "hadoop"), (2, "spark"), (3, "flink")])
# 3. Transform
transformed_ds = ds.map(lambda x: (x[0], x[1].upper()))
# 4. Sink
transformed_ds.print()
# 5. Execute
env.execute("Example 1")
DataStream Connectors:example2.py
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.number_seq import NumberSequenceSource
from pyflink.common.watermark_strategy import WatermarkStrategy
# 1. Environment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(3)
# 2. DataStream
my_source = NumberSequenceSource(1, 10)
ds = env.from_source(
source=my_source,
source_name='my_source',
watermark_strategy=WatermarkStrategy.for_monotonous_timestamps())
# 3. Transform
transformed_ds = ds.map(lambda x: (x, x * 10))
# 4. Sink
transformed_ds.print()
# 5. Execute
env.execute("Example 2")
Table & SQL Connectors:example3.py
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.execution_mode import RuntimeExecutionMode
from pyflink.table import StreamTableEnvironment
from pyflink.common.typeinfo import Types
# 1. Environment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.BATCH)
t_env = StreamTableEnvironment.create(env)
# 2. DataStream
t_env.execute_sql("""
CREATE TABLE my_source (
a INT,
b VARCHAR
) WITH (
'connector' = 'datagen',
'number-of-rows' = '10'
)
""")
ds = t_env.to_append_stream(
t_env.from_path('my_source'),
Types.ROW([Types.INT(), Types.STRING()]))
# 3. Transform
transformed_ds = ds.map(lambda x: (x[0], x[1][:10]))
# 4. Sink
transformed_ds.print()
# 5. Execute
env.execute("Example 3")
明天要介紹的是 PyFlink 的 Table APIs。