程式碼
這次參賽的程式碼都會放在 Big-Data-Framework-30-days,建議大家直接把整個 repo clone 下來,然後參考 README 進行基本設置,接著直接 cd 到今天的資料夾內。(記得 pull 最新的版本)
時間窗口 (Time Windows) 是 Apache Flink 中一個重要個概念,在前面幾天的範例中,我們在處理資料時都是直接對流入的資料進行資料轉換,但很多時候我們不只希望處理單筆資料,而是希望處理一個時間段,對時間段內的資料進行聚合、分析與處理。
時間窗口即是用來幫我們將無限數據流劃分為有限的時間段,Flink 提供了多類型的時間窗口,分別是 Tumbling Windows、Sliding Windows、Session Windows 以及 Global Windows。
而為了使用 Time Windows,我們必須為資料源定義水印 (Watermark),前面有介紹過,水印是一個帶有時間戳的特殊事件,用來標明事件時間 (event time) 的順序。
按照窗口大小來切分時間窗口,每個時間窗口之間不會重疊。
我們以 Table API 來做示範,其使用方法如下:
t_env.from_path('my_source')\
.window(Tumble.over(lit(10).seconds).on(col('event_time')).alias('w'))\
.group_by(col("w")) \
.select(col("w").start.alias('window_start_time'), col('value').avg.alias('avg_value'))
每個窗口之間存在一間隔 (小於窗口大小),因此窗口之間會有重疊。
我們以 Table API 來做示範,其使用方法如下:
t_env.from_path('my_source')\
.window(Slide.over(lit(10).seconds).every(lit(5).seconds).on(col('event_time')).alias('w'))\
.group_by(col("w")) \
.select(col("w").start.alias('window_start_time'), col('value').avg.alias('avg_value'))
每個窗口並沒有固定的大小,資料持續流入時窗口保持開啟,當事件間隔大於session gap 時則關閉窗口,並將事件分配到新的窗口。
我們以 Table API 來做示範,其使用方法如下:
t_env.from_path('my_source')\
.window(Session.with_gap(lit(5).seconds).on(col('event_time')).alias('w'))\
.group_by(col("w")) \
.select(col("w").start.alias('window_start_time'), col('value').avg.alias('avg_value'))
將所有事件都分配到同一個窗口中,通常被用在全局性的分析當中,只有搭配 trigger 時才能使用。
而事實上在 Flink table API 中並沒有 Global Window 這個分類,我們可以簡單地使用無窗口的計算來達到 Global Windows 的效果,
確保當前目錄中有 test_data.csv
、window_tumble.py
、window_slide.py
、window_session.py
等檔案
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.window import Tumble
from pyflink.table.expressions import lit, col
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().get_configuration().set_string("execution.rowtime-timestamp-type", "from-source")
t_env.execute_sql("""
CREATE TABLE my_source (
event_time TIMESTAMP(3),
`value` DOUBLE,
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'filesystem',
'path' = 'test_data.csv',
'format' = 'csv'
)
""")
t_env.execute_sql("""
CREATE TABLE my_avg (
window_start_time TIMESTAMP(3),
avg_value DOUBLE
) WITH (
'connector' = 'print'
)
""")
t_env.from_path('my_source')\
.window(Tumble.over(lit(10).seconds).on(col('event_time')).alias('w'))\
.group_by(col("w")) \
.select(col("w").start.alias('window_start_time'), col('value').avg.alias('avg_value'))\
.execute_insert("my_avg")\
.wait()
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.window import Slide
from pyflink.table.expressions import lit, col
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().get_configuration().set_string("execution.rowtime-timestamp-type", "from-source")
t_env.execute_sql("""
CREATE TABLE my_source (
event_time TIMESTAMP(3),
`value` DOUBLE,
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'filesystem',
'path' = 'test_data.csv',
'format' = 'csv'
)
""")
t_env.execute_sql("""
CREATE TABLE my_avg (
window_start_time TIMESTAMP(3),
avg_value DOUBLE
) WITH (
'connector' = 'print'
)
""")
t_env.from_path('my_source')\
.window(Slide.over(lit(10).seconds).every(lit(5).seconds).on(col('event_time')).alias('w'))\
.group_by(col("w")) \
.select(col("w").start.alias('window_start_time'), col('value').avg.alias('avg_value'))\
.execute_insert("my_avg")\
.wait()
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.window import Session
from pyflink.table.expressions import lit, col
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().get_configuration().set_string("execution.rowtime-timestamp-type", "from-source")
t_env.execute_sql("""
CREATE TABLE my_source (
event_time TIMESTAMP(3),
`value` DOUBLE,
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'filesystem',
'path' = 'test_data.csv',
'format' = 'csv'
)
""")
t_env.execute_sql("""
CREATE TABLE my_avg (
window_start_time TIMESTAMP(3),
avg_value DOUBLE
) WITH (
'connector' = 'print'
)
""")
t_env.from_path('my_source')\
.window(Session.with_gap(lit(5).seconds).on(col('event_time')).alias('w'))\
.group_by(col("w")) \
.select(col("w").start.alias('window_start_time'), col('value').avg.alias('avg_value'))\
.execute_insert("my_avg")\
.wait()
下一篇文章我們會基於目前所學到的 Flink 內容,來做一個簡單的 Flink 應用。