iT邦幫忙

2023 iThome 鐵人賽

DAY 16
0

前面提到 Synchronous Features 和 Asynchronous Features 的差異,換句話說也就是 Realtime 和 Batch Features 的差異,為了區分 Spark RDD 來實現 Near Realtime 和 Flink 完全使用 Streaming 的架構,這一篇文章我會用 Streaming 來表示 Realtime Features,今天要來講一下為什麼一個特徵我在 Experiment 時和我在 Serving 時,會有計算上的差異

Two Streaming Join

首先如果今天是一些不用複雜的 Aggregation 的特徵,如"用戶最後的登入時間"這一類的特徵就比較不會有差異,可以用 Two Streaming Join 來做聚合,下面是一個 Two Streaming Join 在 Flink 中的範例:

如果一個用戶的提款 ip 位置突然從美國變成了一個高風險國家,並且該用戶最近沒有瀏覽過任何與旅行相關的網站,那麼這筆提款就有很高的可能是一個ATO 案例

  1. 交易數據流(Transaction Stream):包含了每一筆提款的詳細信息,例如提款時間, 金額, ip 位置等。
  2. 用戶行為數據流(User Behavior Stream):包含了用戶近期的瀏覽歷史、購買行為、登入時間等。
DataStream<Transaction> transactionStream = ...
DataStream<Customer> customerStream = ...

DataStream<JoinedData> joinedStream = transactionStream
    .join(customerStream)
    .where(new TransactionKeySelector())
    .equalTo(new CustomerKeySelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(30)))
    .apply (new JoinFunction());

State Management

事實上上面的 Join 本身是透過 State Management 來達到,也就是說 Flink 內部有一個 Internal State 來幫你計算 user id 為 k, 相關資訊為 v 的 KV Storage,並幫你管理了這個 k-v instance 的生命週期 (多久後 expired),這一個包裝再塗上範例中的 .window() 內被實踐,你也可以透過 Redis 來作為 external state 做到一樣的效果

Asynchronous Mindset

上述 Flink 程式碼雖然看起來很像 SQL 的 Join,但其實他帶出了一個很重要的觀念就是 Asynchronous

  • 在 SQL Query 中,當我要一張 table 當 base,然後讓其他 table 計算出來的特徵 Left Join 到我的 base table 時,事實上兩張 table 都是 Ready的狀態,也就是擁有某一個間點前所有的資料
  • 但是在 Streaming 中,資料會一直進來,當你有一個 base event,需要去查詢另外一個 feature message queue event 所計算出來的特徵時,你不能在當下才去 collect feature message queue event 過去的資訊,而是你非同步的不斷的 collect 不斷流進來的資訊,我們用下圖來表示整個概念:

下圖想計算的特徵是"提款時檢查提款的裝置在過去一小時內被使用的次數"
https://ithelp.ithome.com.tw/upload/images/20230920/20161911h6Dav8ILVC.jpg

Batch Feature 時很單純,就是一個 LEFT JOIN
Steaming 時,不管你是用 Flink 內建的 Window Function 或是自己的 External State,在做的事情都是:

  1. 當有 Feature Event 進來的時候, 如圖上 L 圓圈表示 Login, 把需要的資料存到 KV Storage (每個 User 過去窗口內的登入次數)
  2. 當有 Base Event 時, 如圖上 W 點表示 提款 Event,會去查找 KV Storage 拿到 Feature Value (過去窗口內登入次數)
  3. 當超過 Expire Time 時 清除 KV Storage 資料
    這裡就可以先看出 Batch 和 Streaming 兩種方式在 Query 特徵的差異

Counting Features

這時會有一個問題,如果上面我們說的特徵"提款時檢查提款的裝置在過去一小時內被使用的次數"想要改為“提款時檢查用戶提款的裝置在過去一小時內被不同用戶使用的次數"這該怎麼做?

直觀地想,原本的 k 是 Device ID, v 是一個整數的 counting (或是一個 timestamp 更好做到 expire 機制),現在 v 會需要是一個 Pair,包含 user_id 以及 timestamp,這樣才能做到如 sql 中 Distinct user_id 的效果,而這個也可以很好的透過 Redis ZSet 去做實踐

上面的例子就是一個 Two Streaming Join 常見的 Pattern,我自己稱它為 "Counting Feature",這就算是比較複查的 Aggregation (聚合) Features 特徵呢,很快的我們會發現這類特徵隨的 Feature Space 越來越大,很容易會遇到空間瓶頸,這部分可以用到 Cardinality Estimation 相關 Algorithm如 LogLog or HyperLogLog 等演算法來做到近似值,這種方式雖然降低的 Space Complexity 但是因近似值特性會導致 Serving 和 Experiment 的數值有一些差異


上一篇
Day 15 Sync Features
下一篇
Day 17 Async Feature Airflow
系列文
踏上 MLOps 之路:從 Applied Data Scientist 到 MLOps 的轉變與建構30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言