iT邦幫忙

2023 iThome 鐵人賽

DAY 24
0
AI & Data

30天認識主流大數據框架:Hadoop + Spark + Flink系列 第 24

Day24 - PyFlink (1):DataStream API

  • 分享至 

  • xImage
  •  

前言
昨天介紹了 Flink 的安裝與設置,還沒安裝 Flink 的人可以先看看這篇:Day23 - Flink 安裝與設置

程式碼
這次參賽的程式碼都會放在 Big-Data-Framework-30-days,建議大家直接把整個 repo clone下來,然後參考 README 進行基本設置,接著直接 cd 到今天的資料夾內。

安裝 PyFlink

$ pip install apache-flink

DataStream API

DataStream 的程式架構大致上由5個部分組成,分別是:

  1. Environment:Obtain an execution environment
  2. Source:Load/create the initial data
  3. Transform:Specify transformations on this data
  4. Sink:Specify where to put the results of your computations
  5. Execute:Trigger the program execution

1. Environment

DataStream API 是由創建執行環境開始的。

  • RuntimeExecutionMode:預設是 STREAMING,另外還有BATCH (批處理)、AUTOMATIC (根據資料的有界性自動辨識)。
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.execution_mode import RuntimeExecutionMode

# 創建環境  
env = StreamExecutionEnvironment.get_execution_environment()    
# 設置運行模式
env.set_runtime_mode(RuntimeExecutionMode.BATCH)
# 設置環境平行化
env.set_parallelism(2)

2. Source

輸入資料來源 (創建 DataStream 物件),要注意的是 DataStream 是一個不可修改的物件,一旦創建就不能新增或刪除內部的元素。一共有三種方法能夠創建 DataStream 物件,分別是:

  1. From Collection
    通常用於測試階段,使用 from_collection 取得 list 中的元素

    ds = env.from_collection([(1, "hadoop"), (2, "spark"), (3, "flink")])
    
  2. DataStream Connectors
    Flink 支援許多 Connectors 用來取得 DataStream,最常見的是消息佇列 (如:kafka、RabbitMQ),另外 FileSystem (如:HDFS)、DataBase (MongoDB)、FileFormat (如:CSV、JSON) 也都有相對應的 Connector,大部分 connectors 需要下載對應的 jars,另外 Flink 也提供 Data Generator:

    • NumberSequenceSource:序列生成器
    • Watermark:Watermark 字面上翻譯是水印,在 Flink 裡代表一個帶有 timestamp 的特殊事件,用來表示該事件之前的事件都已經到達,用來確保事件時間 (event time) 的順序。
    • WatermarkStrategy:用來定義水印如何生成,可以自己定義,也可以用內建的策略,下面程式中用的就是內建的單調遞增策略。
    from pyflink.datastream.connectors.number_seq import NumberSequenceSource
    from pyflink.common.watermark_strategy import WatermarkStrategy
    
    my_source = NumberSequenceSource(1, 10)
    ds = env.from_source(
        source=my_source, 
        source_name='my_source',
        watermark_strategy=WatermarkStrategy.for_monotonous_timestamps())
    
  3. Table & SQL Connectors (推薦用法)
    我們也可以使用更高階的 API 來建立 Connectors,概念跟 DataStream Connectors 差不多,但改成用 DDL (Data Definition Language) 寫 。

    • StreamTableEnvironment:顧名思義是在 DataStream 環境上創建一個 table 環境
    • datagen:隨機生成器,沒有設置 number-of-rows 的話就會是 unbounded data。
    from pyflink.table import StreamTableEnvironment
    from pyflink.common.typeinfo import Types
    
    # create table environment    
    t_env = StreamTableEnvironment.create(env)
    
    # DDL
    t_env.execute_sql("""
            CREATE TABLE my_source (
              a INT,
              b VARCHAR
            ) WITH (
              'connector' = 'datagen',
              'number-of-rows' = '10'
            )
        """)
    
    # data source
    ds = t_env.to_append_stream(
        t_env.from_path('my_source'),
        Types.ROW([Types.INT(), Types.STRING()]))
    

3. Transform

對數據流進行轉換,包括 filterreducewindowjoin 等操作,我們以 From Collection 的 DataStream 來示範,將英文字母轉換為大寫。

transformed_ds = ds.map(lambda x: (x[0], x[1].upper()))

4. Sink

輸出結果,跟 Source 一樣有多種 Connectors 可以選擇 (如:FS、Kafka 等),這邊示範最簡單的印出結果 (STDOUT)。

transformed_ds = ds.print()

5. Execute

執行環境。

env.execute("[Optional] My Job")

完整程式

大家可以自己執行看看有什麼差異~

  • From Collectionexample1.py

    from pyflink.datastream import StreamExecutionEnvironment
    
    # 1. Environment
    env = StreamExecutionEnvironment.get_execution_environment()
    
    # 2. DataStream
    ds = env.from_collection([(1, "hadoop"), (2, "spark"), (3, "flink")])
    
    # 3. Transform
    transformed_ds = ds.map(lambda x: (x[0], x[1].upper()))
    
    # 4. Sink
    transformed_ds.print()
    
    # 5. Execute
    env.execute("Example 1")
    
  • DataStream Connectorsexample2.py

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.number_seq import NumberSequenceSource
    from pyflink.common.watermark_strategy import WatermarkStrategy
    
    # 1. Environment
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(3)
    
    # 2. DataStream
    my_source = NumberSequenceSource(1, 10)
    ds = env.from_source(
        source=my_source, 
        source_name='my_source',
        watermark_strategy=WatermarkStrategy.for_monotonous_timestamps())
    
    # 3. Transform
    transformed_ds = ds.map(lambda x: (x, x * 10))
    
    # 4. Sink
    transformed_ds.print()
    
    # 5. Execute
    env.execute("Example 2")
    
  • Table & SQL Connectorsexample3.py

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.execution_mode import RuntimeExecutionMode
    from pyflink.table import StreamTableEnvironment
    from pyflink.common.typeinfo import Types
    
    # 1. Environment
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    t_env = StreamTableEnvironment.create(env)
    
    # 2. DataStream
    t_env.execute_sql("""
            CREATE TABLE my_source (
              a INT,
              b VARCHAR
            ) WITH (
              'connector' = 'datagen',
              'number-of-rows' = '10'
            )
        """)
    
    ds = t_env.to_append_stream(
        t_env.from_path('my_source'),
        Types.ROW([Types.INT(), Types.STRING()]))
    
    # 3. Transform
    transformed_ds = ds.map(lambda x: (x[0], x[1][:10]))
    
    # 4. Sink
    transformed_ds.print()
    
    # 5. Execute
    env.execute("Example 3")
    

預告

明天要介紹的是 PyFlink 的 Table APIs

參考資料

Apache Flink Documentation


上一篇
Day23 - Flink 安裝與設置
下一篇
Day25 - PyFlink (2):Table API
系列文
30天認識主流大數據框架:Hadoop + Spark + Flink30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言