程式碼
這次參賽的程式碼都會放在 Big-Data-Framework-30-days,建議大家直接把整個 repo clone 下來,然後參考 README 進行基本設置,接著直接 cd 到今天的資料夾內。(記得 pull 最新的版本)
前幾天介紹了 PyFlink 的各種使用,今天我們來結合一下前幾天所學的內容,來做一個實時的熱度分析功能。
我們一樣使用之前用過的 MovieLens Dataset,下載之後放到 /data
資料夾內。
$ cd data
$ wget https://files.grouplens.org/datasets/movielens/ml-latest-small.zip
$ unzip ml-latest-small.zip
$ rm ml-latest-small.zip
重要!!!
為了操作方便,請大家先 把movies.csv
與ratings.csv
兩個檔案中的 header 刪掉!(如下)
這邊就不用多提了吧,創建一個 streaming table 的環境。
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
雖說是實時熱度分析,但因為我們的資料來源都是 csv,所以都是直接讀完,實際上可能會透過 kafka connector 來接收即時的使用者行為,這裡我們就做個簡化,直接把 ratings.csv
視作即時資料流,記錄著使用者對電影的實時評分,而 movies.csv
則視為批資料,用來提供電影資訊。
t_env.execute_sql("""
CREATE TABLE ratings (
`userId` INT,
`movieId1` INT,
`rating` DOUBLE,
`timestamp` BIGINT,
`event_time` as TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
WATERMARK FOR `event_time` AS `event_time`
) WITH (
'connector' = 'filesystem',
'path' = '../../data/ml-latest-small/ratings.csv',
'format' = 'csv',
'csv.ignore-parse-errors' = 'true'
)
""")
t_env.execute_sql("""
CREATE TABLE movies (
`movieId2` INT,
`title` STRING,
`genres` STRING
) WITH (
'connector' = 'filesystem',
'path' = '../../data/ml-latest-small/movies.csv',
'format' = 'csv',
'csv.ignore-parse-errors' = 'true'
)
""")
這邊為了方便就直接印出結果,大家也可以照著前幾天的作法來儲存檔案或發送到 kafka 主題。
t_env.execute_sql("""
CREATE TABLE output (
`date` TIMESTAMP(3),
`hot_movies` ARRAY<STRING>
) WITH (
'connector' = 'print'
)
""")
我們按照順序來拆解一下程式:
col(a).sum
、col(b).count
,這裡我們計算每個 window 中電影出現的次數,並回傳出現次數最高的 5 的電影。@udaf(result_type=DataTypes.ARRAY(DataTypes.STRING()), func_type="pandas")
def hot_movies(movie_ids):
movie_counts = Counter(movie_ids)
return sorted(movie_counts, key=lambda x: movie_counts[x], reverse=True)[:5]
results = t_env.from_path("ratings") \
.where(col('rating') >= 4.0) \
.join(t_env.from_path("movies"), col('movieId1') == col('movieId2')) \
.window(Slide.over(lit(300).days).every(lit(100).days).on(col('event_time')).alias('w'))\
.group_by(col('w'))\
.select(col('w').end, hot_movies(col('title')))
輸出到 Sink Table,印出結果!
results.execute_insert('output') \
.wait()
結果如下:
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.window import Slide
from pyflink.table import DataTypes
from pyflink.table.expressions import col, lit
from pyflink.table.udf import udaf
from collections import Counter
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
@udaf(result_type=DataTypes.ARRAY(DataTypes.STRING()), func_type="pandas")
def hot_movies(movie_ids):
movie_counts = Counter(movie_ids)
return sorted(movie_counts, key=lambda x: movie_counts[x], reverse=True)[:5]
# source tables
t_env.execute_sql("""
CREATE TABLE ratings (
`userId` INT,
`movieId1` INT,
`rating` DOUBLE,
`timestamp` BIGINT,
`event_time` as TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
WATERMARK FOR `event_time` AS `event_time`
) WITH (
'connector' = 'filesystem',
'path' = '../../data/ml-latest-small/ratings.csv',
'format' = 'csv',
'csv.ignore-parse-errors' = 'true'
)
""")
t_env.execute_sql("""
CREATE TABLE movies (
`movieId2` INT,
`title` STRING,
`genres` STRING
) WITH (
'connector' = 'filesystem',
'path' = '../../data/ml-latest-small/movies.csv',
'format' = 'csv',
'csv.ignore-parse-errors' = 'true'
)
""")
# sink table
t_env.execute_sql("""
CREATE TABLE output (
`date` TIMESTAMP(3),
`hot_movies` ARRAY<STRING>
) WITH (
'connector' = 'print'
)
""")
results = t_env.from_path("ratings") \
.where(col('rating') >= 4.0) \
.join(t_env.from_path("movies"), col('movieId1') == col('movieId2')) \
.window(Slide.over(lit(300).days).every(lit(100).days).on(col('event_time')).alias('w'))\
.group_by(col('w'))\
.select(col('w').end, hot_movies(col('title')))
results.execute_insert('output') \
.wait()
明天就是最後一天了,來分享一下參賽心得!