iT邦幫忙

2023 iThome 鐵人賽

DAY 28
0
AI & Data

30天認識主流大數據框架:Hadoop + Spark + Flink系列 第 28

Day28 - PyFlink (4):Time Windows

  • 分享至 

  • xImage
  •  

程式碼
這次參賽的程式碼都會放在 Big-Data-Framework-30-days,建議大家直接把整個 repo clone 下來,然後參考 README 進行基本設置,接著直接 cd 到今天的資料夾內。(記得 pull 最新的版本)

時間窗口

時間窗口 (Time Windows) 是 Apache Flink 中一個重要個概念,在前面幾天的範例中,我們在處理資料時都是直接對流入的資料進行資料轉換,但很多時候我們不只希望處理單筆資料,而是希望處理一個時間段,對時間段內的資料進行聚合、分析與處理。

時間窗口即是用來幫我們將無限數據流劃分為有限的時間段,Flink 提供了多類型的時間窗口,分別是 Tumbling WindowsSliding WindowsSession Windows 以及 Global Windows

而為了使用 Time Windows,我們必須為資料源定義水印 (Watermark),前面有介紹過,水印是一個帶有時間戳的特殊事件,用來標明事件時間 (event time) 的順序。

窗口類型

Tumbling Windows

按照窗口大小來切分時間窗口,每個時間窗口之間不會重疊。

https://ithelp.ithome.com.tw/upload/images/20231013/20138939FzPteo7xSr.png

我們以 Table API 來做示範,其使用方法如下:

t_env.from_path('my_source')\
    .window(Tumble.over(lit(10).seconds).on(col('event_time')).alias('w'))\
    .group_by(col("w")) \
    .select(col("w").start.alias('window_start_time'), col('value').avg.alias('avg_value'))

Sliding Windows

每個窗口之間存在一間隔 (小於窗口大小),因此窗口之間會有重疊。
https://ithelp.ithome.com.tw/upload/images/20231013/20138939xkBR2hRqaH.png

我們以 Table API 來做示範,其使用方法如下:

t_env.from_path('my_source')\
    .window(Slide.over(lit(10).seconds).every(lit(5).seconds).on(col('event_time')).alias('w'))\
    .group_by(col("w")) \
    .select(col("w").start.alias('window_start_time'), col('value').avg.alias('avg_value'))

Session Windows

每個窗口並沒有固定的大小,資料持續流入時窗口保持開啟,當事件間隔大於session gap 時則關閉窗口,並將事件分配到新的窗口。
https://ithelp.ithome.com.tw/upload/images/20231013/20138939qJe2JlMurS.png

我們以 Table API 來做示範,其使用方法如下:

t_env.from_path('my_source')\
    .window(Session.with_gap(lit(5).seconds).on(col('event_time')).alias('w'))\
    .group_by(col("w")) \
    .select(col("w").start.alias('window_start_time'), col('value').avg.alias('avg_value'))

Global Windows

將所有事件都分配到同一個窗口中,通常被用在全局性的分析當中,只有搭配 trigger 時才能使用。

https://ithelp.ithome.com.tw/upload/images/20231013/201389395c7b98Wd1W.png

而事實上在 Flink table API 中並沒有 Global Window 這個分類,我們可以簡單地使用無窗口的計算來達到 Global Windows 的效果,

Example

Prerequisite

確保當前目錄中有 test_data.csvwindow_tumble.pywindow_slide.pywindow_session.py 等檔案

window_tumble.py

from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.window import Tumble
from pyflink.table.expressions import lit, col


env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().get_configuration().set_string("execution.rowtime-timestamp-type", "from-source")

t_env.execute_sql("""
    CREATE TABLE my_source (
        event_time TIMESTAMP(3),
        `value` DOUBLE,
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'test_data.csv',
        'format' = 'csv'
    )
""")

t_env.execute_sql("""
    CREATE TABLE my_avg (
        window_start_time TIMESTAMP(3),
        avg_value DOUBLE
    ) WITH (
        'connector' = 'print'
    )
""")

t_env.from_path('my_source')\
    .window(Tumble.over(lit(10).seconds).on(col('event_time')).alias('w'))\
    .group_by(col("w")) \
    .select(col("w").start.alias('window_start_time'), col('value').avg.alias('avg_value'))\
    .execute_insert("my_avg")\
    .wait()
  • 執行結果
    我們把 window size 設為 10 秒,因此可以看到每個 time window 之間的間隔都是 10 秒
    https://ithelp.ithome.com.tw/upload/images/20231013/20138939ZME97wM71u.png

window_sliding.py

from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.window import Slide
from pyflink.table.expressions import lit, col

env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().get_configuration().set_string("execution.rowtime-timestamp-type", "from-source")

t_env.execute_sql("""
    CREATE TABLE my_source (
        event_time TIMESTAMP(3),
        `value` DOUBLE,
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'test_data.csv',
        'format' = 'csv'
    )
""")

t_env.execute_sql("""
    CREATE TABLE my_avg (
        window_start_time TIMESTAMP(3),
        avg_value DOUBLE
    ) WITH (
        'connector' = 'print'
    )
""")

t_env.from_path('my_source')\
    .window(Slide.over(lit(10).seconds).every(lit(5).seconds).on(col('event_time')).alias('w'))\
    .group_by(col("w")) \
    .select(col("w").start.alias('window_start_time'), col('value').avg.alias('avg_value'))\
    .execute_insert("my_avg")\
    .wait()
  • 執行結果
    我們把 window size 設為 10 秒、sliding gpa 設為 5 秒,因此可以看到第一個時間窗口的結果跟 tumbling window 的結果一樣,但從第二個時間窗口開始,其間隔變為 5 秒。
    https://ithelp.ithome.com.tw/upload/images/20231013/20138939v7z9a55Zz1.png

window_session.py

from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.window import Session
from pyflink.table.expressions import lit, col

env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().get_configuration().set_string("execution.rowtime-timestamp-type", "from-source")

t_env.execute_sql("""
    CREATE TABLE my_source (
        event_time TIMESTAMP(3),
        `value` DOUBLE,
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'test_data.csv',
        'format' = 'csv'
    )
""")

t_env.execute_sql("""
    CREATE TABLE my_avg (
        window_start_time TIMESTAMP(3),
        avg_value DOUBLE
    ) WITH (
        'connector' = 'print'
    )
""")

t_env.from_path('my_source')\
    .window(Session.with_gap(lit(5).seconds).on(col('event_time')).alias('w'))\
    .group_by(col("w")) \
    .select(col("w").start.alias('window_start_time'), col('value').avg.alias('avg_value'))\
    .execute_insert("my_avg")\
    .wait()
  • 執行結果
    我們把 seesion gap 設為 5 秒,因此可以看到窗口之間的間隔並不一定但都大於 5 秒。
    https://ithelp.ithome.com.tw/upload/images/20231013/20138939gi4yA5frOj.png

預告

下一篇文章我們會基於目前所學到的 Flink 內容,來做一個簡單的 Flink 應用。

參考資料

Windows | Apache Flink


上一篇
Day27 - PyFlink Kafka Connector
下一篇
Day29 - PyFlink (5):實時熱度分析
系列文
30天認識主流大數據框架:Hadoop + Spark + Flink30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言