iT邦幫忙

2023 iThome 鐵人賽

DAY 29
0
AI & Data

30天認識主流大數據框架:Hadoop + Spark + Flink系列 第 29

Day29 - PyFlink (5):實時熱度分析

  • 分享至 

  • xImage
  •  

程式碼
這次參賽的程式碼都會放在 Big-Data-Framework-30-days,建議大家直接把整個 repo clone 下來,然後參考 README 進行基本設置,接著直接 cd 到今天的資料夾內。(記得 pull 最新的版本)

前幾天介紹了 PyFlink 的各種使用,今天我們來結合一下前幾天所學的內容,來做一個實時的熱度分析功能。

資料準備

我們一樣使用之前用過的 MovieLens Dataset,下載之後放到 /data 資料夾內。

$ cd data
$ wget https://files.grouplens.org/datasets/movielens/ml-latest-small.zip
$ unzip ml-latest-small.zip
$ rm ml-latest-small.zip

重要!!!
為了操作方便,請大家先 movies.csvratings.csv 兩個檔案中的 header 刪掉!(如下)

https://ithelp.ithome.com.tw/upload/images/20231014/20138939bMmAoiMRdq.png

實時熱度分析

1. Create TableEnvironment

這邊就不用多提了吧,創建一個 streaming table 的環境。

env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)

2. Create Source Table

雖說是實時熱度分析,但因為我們的資料來源都是 csv,所以都是直接讀完,實際上可能會透過 kafka connector 來接收即時的使用者行為,這裡我們就做個簡化,直接把 ratings.csv 視作即時資料流,記錄著使用者對電影的實時評分,而 movies.csv 則視為批資料,用來提供電影資訊。

t_env.execute_sql("""
    CREATE TABLE ratings (
        `userId` INT,
        `movieId1` INT,
        `rating` DOUBLE,
        `timestamp` BIGINT,
        `event_time` as TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
        WATERMARK FOR `event_time` AS `event_time`
    ) WITH (
        'connector' = 'filesystem',
        'path' = '../../data/ml-latest-small/ratings.csv',
        'format' = 'csv',
        'csv.ignore-parse-errors' = 'true'                             
    )
""")

t_env.execute_sql("""
    CREATE TABLE movies (
        `movieId2` INT,
        `title` STRING,
        `genres` STRING
    ) WITH (
        'connector' = 'filesystem',
        'path' = '../../data/ml-latest-small/movies.csv',
        'format' = 'csv',
        'csv.ignore-parse-errors' = 'true'    
    )
""")

3. Create Sink Table

這邊為了方便就直接印出結果,大家也可以照著前幾天的作法來儲存檔案或發送到 kafka 主題。

t_env.execute_sql("""
    CREATE TABLE output (
        `date` TIMESTAMP(3),
        `hot_movies` ARRAY<STRING>
    ) WITH (
        'connector' = 'print'                       
    )
""")

4. Query

我們按照順序來拆解一下程式:

  • where:篩選出好評 (rating > 4)
  • join:與電影資訊建立關聯
  • window:使用 sliding window 來劃分時間段,這裡的 window size 跟 window gap 都設的非常大,那是因為資料本身很稀疏,實際上實時熱度分析的 window 會小很多,可能就落在幾分鐘、幾秒鐘。
  • group_by:透過 window 來分群
  • select:這裡我們用到了 UDAF (User-Defined Aggregate Functions),顧名思義是將欄位進行聚合的一個自定義功能,概念有點像 col(a).sumcol(b).count,這裡我們計算每個 window 中電影出現的次數,並回傳出現次數最高的 5 的電影。
@udaf(result_type=DataTypes.ARRAY(DataTypes.STRING()), func_type="pandas")
def hot_movies(movie_ids):
    movie_counts = Counter(movie_ids)
    return sorted(movie_counts, key=lambda x: movie_counts[x], reverse=True)[:5]

results = t_env.from_path("ratings") \
    .where(col('rating') >= 4.0) \
    .join(t_env.from_path("movies"), col('movieId1') == col('movieId2')) \
    .window(Slide.over(lit(300).days).every(lit(100).days).on(col('event_time')).alias('w'))\
    .group_by(col('w'))\
    .select(col('w').end, hot_movies(col('title')))

5. Emit

輸出到 Sink Table,印出結果!

results.execute_insert('output') \
    .wait()

結果如下:
https://ithelp.ithome.com.tw/upload/images/20231014/20138939EqneRuYcf0.png

完整程式碼

from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.window import Slide
from pyflink.table import DataTypes
from pyflink.table.expressions import col, lit
from pyflink.table.udf import udaf

from collections import Counter

env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)

@udaf(result_type=DataTypes.ARRAY(DataTypes.STRING()), func_type="pandas")
def hot_movies(movie_ids):
    movie_counts = Counter(movie_ids)
    return sorted(movie_counts, key=lambda x: movie_counts[x], reverse=True)[:5]

# source tables
t_env.execute_sql("""
    CREATE TABLE ratings (
        `userId` INT,
        `movieId1` INT,
        `rating` DOUBLE,
        `timestamp` BIGINT,
        `event_time` as TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
        WATERMARK FOR `event_time` AS `event_time`
    ) WITH (
        'connector' = 'filesystem',
        'path' = '../../data/ml-latest-small/ratings.csv',
        'format' = 'csv',
        'csv.ignore-parse-errors' = 'true'                             
    )
""")

t_env.execute_sql("""
    CREATE TABLE movies (
        `movieId2` INT,
        `title` STRING,
        `genres` STRING
    ) WITH (
        'connector' = 'filesystem',
        'path' = '../../data/ml-latest-small/movies.csv',
        'format' = 'csv',
        'csv.ignore-parse-errors' = 'true'    
    )
""")

# sink table
t_env.execute_sql("""
    CREATE TABLE output (
        `date` TIMESTAMP(3),
        `hot_movies` ARRAY<STRING>
    ) WITH (
        'connector' = 'print'                       
    )
""")


results = t_env.from_path("ratings") \
    .where(col('rating') >= 4.0) \
    .join(t_env.from_path("movies"), col('movieId1') == col('movieId2')) \
    .window(Slide.over(lit(300).days).every(lit(100).days).on(col('event_time')).alias('w'))\
    .group_by(col('w'))\
    .select(col('w').end, hot_movies(col('title')))

results.execute_insert('output') \
    .wait()

預告

明天就是最後一天了,來分享一下參賽心得!


上一篇
Day28 - PyFlink (4):Time Windows
下一篇
Day30 - 完結灑花🎉🎉🎉
系列文
30天認識主流大數據框架:Hadoop + Spark + Flink30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言