iT邦幫忙

2025 iThome 鐵人賽

DAY 16
1
AI & Data

「知其然,更知其所以然:什麼是 Real-time (Streaming) Pipeline?從造輪子到 Flink 與 RisingWave」系列 第 16

【知其然,更知其所以然】Day16: 固定時間窗口讓聚合更實用

  • 分享至 

  • xImage
  •  

昨天我們打造了強大的 SQL 風格多聚合系統,但有個重要問題:所有聚合都是「全局的」,從程式開始到現在的累積統計。在真實業務中,我們更關心「最近的」、「定期的」統計結果。

今天讓我們為聚合系統加上時間感知能力!

重要提醒
本文所有程式碼皆為教學與概念演示用的 Pseudo Code,可以幫助你理解並解決實際場景的問題,但這裡的程式碼不能直接使用,主要目的是用來講解 Stream Processing 的設計思路與核心概念。閱讀時建議把重點放在理解整體架構和設計邏輯上,程式碼細節部分可以輕鬆看過就好。

為什麼需要時間窗口?

全局聚合的局限性

# 目前的全局聚合
customer_stats = orders_df.group_by("customer_id").agg(total_orders="count")

# 結果:從程式開始到現在的累積統計
{
    "group_key": "C001", 
    "total_orders": 152  # 從早上9點到現在下午3點的累積
}

問題

  • 無法知道「最近一小時」的訂單量
  • 無法做「每小時」的定期報告
  • 無法發現「趨勢變化」(上午忙?下午閒?)

時間窗口的價值

# 有了時間窗口的聚合
hourly_stats = (orders_df
    .window(hours(1))           # 每小時一個窗口
    .group_by("customer_id")
    .agg(hourly_orders="count")
)

# 結果:每小時的獨立統計
{
    "group_key": "C001",
    "hourly_orders": 12,        # 這一小時的訂單數
    "window_start": "14:00:00",
    "window_end": "15:00:00"
}

價值

  • 趨勢分析:看出業務的週期性變化
  • 異常檢測:某時段突然暴增或下降
  • 即時決策:基於最新時段的數據調整策略

固定窗口的設計思路

核心概念

固定窗口(Tumbling Window)

  • 時間軸被切分成固定大小、不重疊的窗口
  • 每個事件只屬於一個窗口
  • 窗口到期時輸出最終結果
時間軸:09:00 -----> 10:00 -----> 11:00 -----> 12:00
窗口:  [09:00-10:00) [10:00-11:00) [11:00-12:00)
事件:     事件1,2,3      事件4,5       事件6
結果:       統計A        統計B        統計C

關鍵設計決策

  1. 時間基準:使用事件時間(event time),不是處理時間
  2. 窗口邊界[start, end) - 開始包含,結束不包含
  3. 觸發機制:窗口結束時觸發結果輸出
  4. 狀態管理:每個窗口維護獨立的聚合狀態

步驟一:時間窗口核心類別

這一步我們要打造時間窗口的基礎設施 - 就像建房子前要先打地基一樣。

1.1 TimeWindow 基礎類別設計

步驟 1.1.1:核心屬性定義

class TimeWindow:
    """時間窗口的基礎類別 - 定義一個時間區間"""
    
    def __init__(self, start_time: int, end_time: int):
        # 使用毫秒級時間戳,避免浮點數精度問題
        self.start_time = start_time  # 窗口開始時間(毫秒)
        self.end_time = end_time      # 窗口結束時間(毫秒)

步驟 1.1.2:時間包含判斷邏輯

    def contains(self, timestamp: int) -> bool:
        """判斷時間戳是否在窗口內"""
        # 使用半開區間 [start_time, end_time)
        # - start_time 包含:該時刻的事件屬於這個窗口
        # - end_time 不包含:該時刻的事件屬於下一個窗口
        return self.start_time <= timestamp < self.end_time
        
        # 例子:窗口 [14:00:00, 14:05:00)
        # - 14:00:00 包含 ✓
        # - 14:02:30 包含 ✓  
        # - 14:05:00 不包含 ✗ (屬於下個窗口)

步驟 1.1.3:窗口顯示和字串表示

    def __str__(self):
        """將窗口轉換為可讀的字串格式"""
        # 步驟1:毫秒時間戳轉換為 datetime 對象
        start = datetime.fromtimestamp(self.start_time / 1000)  # 除以1000:毫秒→秒
        end = datetime.fromtimestamp(self.end_time / 1000)
        
        # 步驟2:格式化為 HH:MM:SS 格式
        start_str = start.strftime('%H:%M:%S')  # "14:00:00"
        end_str = end.strftime('%H:%M:%S')      # "14:05:00"
        
        # 步驟3:組合成半開區間表示法 [start-end)
        return f"[{start_str}-{end_str})"

步驟 1.1.4:窗口相等性和哈希支持

    def __eq__(self, other):
        """判斷兩個窗口是否相等"""
        # 只有開始和結束時間都相同才認為是同一個窗口
        return (self.start_time == other.start_time and 
                self.end_time == other.end_time)
    
    def __hash__(self):
        """計算窗口的哈希值,支持作為字典鍵使用"""
        # 使用 (start_time, end_time) 元組的哈希
        return hash((self.start_time, self.end_time))
        
        # 用途:支持窗口對象作為字典鍵
        # window_states = {
        #     TimeWindow(start1, end1): aggregator1,
        #     TimeWindow(start2, end2): aggregator2
        # }

TimeWindow 使用範例:

# 創建一個 5分鐘窗口:14:00:00-14:05:00
window = TimeWindow(1635739800000, 1635740100000)

# 測試時間包含
print(window.contains(1635739980000))  # 14:03:00 -> True
print(window.contains(1635740100000))  # 14:05:00 -> False

# 窗口字串表示
print(str(window))  # "[14:00:00-14:05:00)"

# 作為字典鍵
window_data = {window: "some_aggregation_result"}

1.2 FixedWindowCalculator 窗口計算器

步驟 1.2.1:計算器初始化

class FixedWindowCalculator:
    """固定窗口計算器 - 負責時間對齊和窗口管理"""
    
    def __init__(self, window_size_ms: int):
        # 存儲窗口大小(毫秒)
        self.window_size_ms = window_size_ms
        
        # 例子:
        # - 5分鐘窗口:window_size_ms = 5 * 60 * 1000 = 300000
        # - 1小時窗口:window_size_ms = 60 * 60 * 1000 = 3600000

步驟 1.2.2:核心窗口對齊算法

    def get_window_for_timestamp(self, timestamp: int) -> TimeWindow:
        """根據時間戳計算對應的窗口 - 核心對齊算法"""
        
        # 例子數據:timestamp = 1635739980000 (14:03:00), window_size = 300000ms (5分鐘)
        
        # 步驟1:計算在當前窗口內的偏移量
        # 使用取模運算:1635739980000 % 300000 = 180000ms (3分鐘)
        # 意義:14:03:00 距離最近窗口邊界(14:00:00)的偏移是3分鐘
        offset = timestamp % self.window_size_ms
        
        # 步驟2:向後對齊到窗口開始時間  
        # 1635739980000 - 180000 = 1635739800000 (14:00:00)
        # 無論是 14:01:00, 14:03:30, 14:04:59 都會對齊到 14:00:00
        window_start = timestamp - offset
        
        # 步驟3:計算窗口結束時間
        # 1635739800000 + 300000 = 1635740100000 (14:05:00)
        # 形成窗口 [14:00:00 - 14:05:00)
        window_end = window_start + self.window_size_ms
        
        return TimeWindow(window_start, window_end)

步驟 1.2.3:窗口過期檢測機制

    def is_window_expired(self, window: TimeWindow, current_time: int, 
                         grace_period_ms: int = 0) -> bool:
        """判斷窗口是否已過期"""
        
        # 過期判斷邏輯:當前時間 >= (窗口結束時間 + 寬限期)
        expiry_time = window.end_time + grace_period_ms
        is_expired = current_time >= expiry_time
        
        return is_expired
        
        # 例子:
        # - 窗口:[14:00:00-14:05:00),結束時間 = 14:05:00
        # - 當前時間:14:06:30
        # - 無寬限期:14:06:30 >= 14:05:00,已過期 ✓
        # - 2分鐘寬限期:14:06:30 >= 14:07:00,未過期 ✗

1.3 時間工具函數

步驟 1.3.1:時間單位轉換工具

def hours(n: int) -> int:
    """小時轉毫秒"""
    # n小時 * 60分鐘/小時 * 60秒/分鐘 * 1000毫秒/秒
    return n * 60 * 60 * 1000
    
    # 例子:
    # hours(1) = 3600000ms (1小時)
    # hours(2) = 7200000ms (2小時)

def minutes(n: int) -> int:
    """分鐘轉毫秒"""
    # n分鐘 * 60秒/分鐘 * 1000毫秒/秒
    return n * 60 * 1000
    
    # 例子:
    # minutes(5) = 300000ms (5分鐘)
    # minutes(30) = 1800000ms (30分鐘)

def seconds(n: int) -> int:
    """秒轉毫秒"""
    # n秒 * 1000毫秒/秒
    return n * 1000
    
    # 例子:
    # seconds(30) = 30000ms (30秒)
    # seconds(120) = 120000ms (2分鐘)

使用範例:

# 創建不同大小的窗口
calculator_5min = FixedWindowCalculator(minutes(5))    # 5分鐘窗口
calculator_1hour = FixedWindowCalculator(hours(1))     # 1小時窗口
calculator_30sec = FixedWindowCalculator(seconds(30))  # 30秒窗口

# 比直接使用毫秒數更直觀:
# - 好:FixedWindowCalculator(minutes(5))
# - 差:FixedWindowCalculator(300000)

步驟二:窗口狀態管理

2.1 WindowState 狀態管理器

步驟 2.1.1:狀態管理核心設計

class WindowState:
    """窗口狀態管理器 - 管理所有窗口的聚合狀態"""
    
    def __init__(self, aggregator_factory):
        # 核心狀態存儲:(group_key, window) -> aggregator
        # 例:{("C001", Window[14:00-14:05)): CountAggregator(count=3)}
        self.window_aggregators = {}
        
        # 加速查詢:group_key -> Set[windows]  
        # 例:{"C001": {Window[14:00-14:05), Window[14:05-14:10)}}
        self.active_windows = defaultdict(set)
        
        # 追蹤最新時間戳:group_key -> latest_timestamp
        # 例:{"C001": 1635739980000}
        self.latest_timestamp = defaultdict(int)
        
        # 聚合器工廠函數
        self.aggregator_factory = aggregator_factory

步驟 2.1.2:獲取或創建聚合器

    def get_or_create_aggregator(self, group_key: str, window: TimeWindow):
        """獲取或創建指定群組和窗口的聚合器"""
        
        # 步驟1:組合複合鍵
        composite_key = (group_key, window)
        
        # 步驟2:檢查是否已存在
        if composite_key not in self.window_aggregators:
            # 步驟3:不存在則創建新聚合器
            new_aggregator = self.aggregator_factory()
            self.window_aggregators[composite_key] = new_aggregator
            
            # 步驟4:更新活躍窗口索引
            self.active_windows[group_key].add(window)
        
        # 步驟5:返回聚合器
        return self.window_aggregators[composite_key]

步驟 2.1.3:過期窗口檢測

    def get_expired_windows(self, group_key: str, current_timestamp: int, 
                           grace_period_ms: int) -> List[TimeWindow]:
        """找出已過期的窗口"""
        expired_windows = []
        
        # 遍歷該群組的所有活躍窗口
        for window in list(self.active_windows[group_key]):
            # 檢查是否過期:當前時間 >= (窗口結束 + 寬限期)
            expiry_time = window.end_time + grace_period_ms
            if current_timestamp >= expiry_time:
                expired_windows.append(window)
                
        return expired_windows
    
    def remove_expired_windows(self, group_key: str, expired_windows: List[TimeWindow]):
        """清理過期窗口的狀態"""
        for window in expired_windows:
            # 步驟1:從狀態存儲中移除
            composite_key = (group_key, window)
            if composite_key in self.window_aggregators:
                del self.window_aggregators[composite_key]
            
            # 步驟2:從活躍窗口索引中移除  
            self.active_windows[group_key].discard(window)

狀態管理流程範例:

# 初始化狀態管理器
window_state = WindowState(lambda: CountAggregator())

# 事件1:C001 在 14:03:00
window1 = TimeWindow(1635739800000, 1635740100000)  # [14:00-14:05)
agg1 = window_state.get_or_create_aggregator("C001", window1)
agg1.update(1)  # count = 1

# 狀態快照:
# window_aggregators = {("C001", Window[14:00-14:05)): CountAgg(1)}
# active_windows = {"C001": {Window[14:00-14:05)}}

# 事件2:C001 在 14:07:00(新窗口)
window2 = TimeWindow(1635740100000, 1635740400000)  # [14:05-14:10)
agg2 = window_state.get_or_create_aggregator("C001", window2)
agg2.update(1)  # count = 1

# 狀態快照:
# window_aggregators = {
#     ("C001", Window[14:00-14:05)): CountAgg(1),
#     ("C001", Window[14:05-14:10)): CountAgg(1)
# }
# active_windows = {"C001": {Window[14:00-14:05), Window[14:05-14:10)}}

步驟三:窗口化 GroupBy 操作

3.1 WindowedGroupByOperation 事件處理器

步驟 3.1.1:核心架構初始化

class WindowedGroupByOperation:
    """窗口化的 GroupBy 操作 - 整合所有窗口組件"""
    
    def __init__(self, group_key: Union[str, Callable], 
                 window_calculator: FixedWindowCalculator,
                 aggregator_factory: Callable[[], BaseAggregator],
                 grace_period_ms: int = 0):
        
        # 步驟1:存儲核心參數
        self.group_key = group_key                      # 分組欄位 ("customer_id")
        self.window_calculator = window_calculator       # 窗口計算器
        self.aggregator_factory = aggregator_factory    # 聚合器工廠
        self.grace_period_ms = grace_period_ms          # 寬限期(毫秒)
        
        # 步驟2:初始化狀態管理器
        self.window_state = WindowState(aggregator_factory)
        
        # 設計理念:
        # - window_calculator 負責時間對齊
        # - window_state 負責狀態管理  
        # - aggregator_factory 負責聚合邏輯
        # - grace_period_ms 提供延遲容忍度

步驟 3.1.2:事件處理核心流程

    def process_event(self, event: Dict[str, Any], timestamp_field: str = "timestamp") -> List[Dict[str, Any]]:
        """處理事件並返回觸發的窗口結果"""
        
        # 步驟1:提取和轉換時間戳
        timestamp = self._extract_timestamp(event, timestamp_field)
        
        # 步驟2:提取分組鍵值
        group_value = self._extract_group_key(event)
        
        # 步驟3:計算事件所屬窗口
        window = self.window_calculator.get_window_for_timestamp(timestamp)
        
        # 步驟4:更新聚合狀態
        aggregator = self.window_state.get_or_create_aggregator(group_value, window)
        aggregator.update(event.get("value", 1))
        
        # 步驟5:更新時間戳追蹤
        self.window_state.update_latest_timestamp(group_value, timestamp)
        
        # 步驟6:檢查並處理過期窗口
        return self._handle_expired_windows(group_value, timestamp)

步驟 3.1.3:過期窗口處理

    def _handle_expired_windows(self, group_key: str, current_timestamp: int) -> List[Dict[str, Any]]:
        """處理過期窗口並返回結果"""
        
        # 步驟1:找出已過期的窗口
        expired_windows = self.window_state.get_expired_windows(
            group_key, current_timestamp, self.grace_period_ms
        )
        
        # 步驟2:為每個過期窗口生成結果
        results = []
        for window in expired_windows:
            # 獲取該窗口的最終聚合結果
            final_result = self.window_state.get_window_result(group_key, window)
            
            if final_result is not None:
                # 組裝窗口結果
                window_result = {
                    "group_key": group_key,                    # 分組鍵
                    "value": final_result,                     # 聚合結果
                    "window_start": window.start_time,         # 窗口開始時間(毫秒)
                    "window_end": window.end_time,             # 窗口結束時間(毫秒)
                    "window_start_str": self._format_time(window.start_time),  # 可讀開始時間
                    "window_end_str": self._format_time(window.end_time)       # 可讀結束時間
                }
                results.append(window_result)
        
        # 步驟3:清理過期窗口狀態
        if expired_windows:
            self.window_state.remove_expired_windows(group_key, expired_windows)
        
        return results

處理流程完整範例:

# 事件處理追蹤:{"customer_id": "C001", "amount": 100, "timestamp": "14:03:00"}

# 步驟1:時間戳提取 -> 1635739980000 (14:03:00的毫秒時間戳)
# 步驟2:分組鍵提取 -> "C001" 
# 步驟3:窗口計算 -> TimeWindow[14:00:00-14:05:00) (5分鐘窗口)
# 步驟4:聚合更新 -> CountAggregator.update(1) -> count變為2
# 步驟5:時間追蹤 -> 更新C001的最新時間為1635739980000
# 步驟6:過期檢查 -> 檢查是否有過期窗口,如有則輸出結果並清理狀態

步驟四:DataFrame 窗口整合

4.1 DataFrame 窗口擴展設計

步驟 4.1.1:SimpleDataFrame 窗口方法

class SimpleDataFrame:
    """原有 DataFrame 類別擴展窗口功能"""
    
    def window(self, duration_ms: int, grace_period_ms: int = 0) -> 'WindowedDataFrame':
        """創建固定時間窗口 - 裝飾者模式入口"""
        # 步驟1:創建窗口裝飾器
        # 將原有 DataFrame 包裝成窗口化版本
        return WindowedDataFrame(self, duration_ms, grace_period_ms)

步驟 4.1.2:WindowedDataFrame

class WindowedDataFrame:
    """窗口化的 DataFrame - 裝飾原始 DataFrame"""
    
    def __init__(self, source_df: 'SimpleDataFrame', duration_ms: int, grace_period_ms: int = 0):
        # 步驟1:保存原對象
        self.source_df = source_df
        
        # 步驟2:存儲窗口配置
        self.duration_ms = duration_ms          # 窗口大小(毫秒)
        self.grace_period_ms = grace_period_ms  # 寬限期(毫秒)
        
        # 步驟3:創建窗口計算器
        self.window_calculator = FixedWindowCalculator(duration_ms)
        
        # 設計優勢:
        # - source_df:保持對原對象的引用,可以調用原有方法
        # - window_calculator:專門負責窗口時間計算
        # - 職責分離:窗口邏輯與原始邏輯解耦
    
    def group_by(self, key: Union[str, Callable]) -> 'WindowedGroupedDataFrame':
        """窗口化分組 - 返回窗口分組對象"""
        return WindowedGroupedDataFrame(self, key)

步驟 4.1.3:WindowedGroupedDataFrame 聚合介面

class WindowedGroupedDataFrame:
    """窗口化分組的 DataFrame - 提供聚合方法"""
    
    def __init__(self, windowed_df: WindowedDataFrame, group_key: Union[str, Callable]):
        # 步驟1:保存窗口化 DataFrame 引用
        self.windowed_df = windowed_df
        self.group_key = group_key
        
        # 步驟2:初始化配置狀態
        self._aggregation_configured = False  # 防止重複配置
    
    def count(self) -> 'SimpleDataFrame':
        """窗口化計數聚合"""
        return self._setup_windowed_aggregation(CountAggregator, "count")

4.2 窗口聚合設置流程

步驟 4.2.1:統一聚合設置方法

    def _setup_windowed_aggregation(self, aggregator_class, aggregation_name: str, 
                                   value_field: Optional[str] = None) -> 'SimpleDataFrame':
        """設置窗口化聚合 - 統一處理流程"""
        
        # 步驟1:防止重複配置
        if self._aggregation_configured:
            raise RuntimeError("Aggregation already configured")
        
        # 步驟2:創建窗口化 GroupBy 操作
        self.windowed_groupby_op = WindowedGroupByOperation(
            group_key=self.group_key,                               # 分組鍵
            window_calculator=self.windowed_df.window_calculator,   # 窗口計算器
            aggregator_factory=aggregator_class,                    # 聚合器類型
            grace_period_ms=self.windowed_df.grace_period_ms        # 寬限期
        )
        
        # 步驟3:存儲聚合配置
        self.value_field = value_field
        
        # 步驟4:創建結果 DataFrame
        result_df = SimpleDataFrame(
            f"{self.windowed_df.source_df.name}_windowed_{aggregation_name}"
        )
        
        # 步驟5:設置事件處理邏輯
        self._setup_windowed_processing(result_df)
        
        # 步驟6:標記為已配置並返回
        self._aggregation_configured = True
        return result_df

步驟 4.2.2:事件處理攔截機制

    def _setup_windowed_processing(self, result_df: 'SimpleDataFrame'):
        """設置窗口化處理邏輯 - 函數攔截模式"""
        
        # 步驟1:保存原始處理函數
        original_process = self.windowed_df.source_df.process_message
        
        # 步驟2:定義增強處理函數
        def enhanced_process(message: Dict[str, Any]) -> bool:
            try:
                # 子步驟2.1:先執行原本的處理流程
                original_result = original_process(message)
                
                # 子步驟2.2:只有原處理成功才執行窗口化聚合
                if original_result:
                    # 預處理:準備聚合數據
                    if self.value_field:
                        message = message.copy()
                        message["value"] = message.get(self.value_field, 0)
                    
                    # 執行窗口化聚合
                    window_results = self.windowed_groupby_op.process_event(message)
                    
                    # 將每個窗口結果發送到結果 DataFrame
                    for window_result in window_results:
                        result_df.process_message(window_result)
                
                return original_result
                
            except Exception as e:
                # 子步驟2.3:異常處理 - 退回原始處理
                logger.error(f"Error in windowed aggregation processing: {e}")
                return original_process(message)
        
        # 步驟3:替換處理函數 - 攔截機制的核心
        self.windowed_df.source_df.process_message = enhanced_process

4.3 API 使用流程展示

步驟 4.3.1:基本窗口聚合用法

# API 鏈式調用展示
windowed_counts = (orders_df                    # SimpleDataFrame
    .window(minutes(5))                         # -> WindowedDataFrame
    .group_by("customer_id")                    # -> WindowedGroupedDataFrame  
    .count()                                    # -> SimpleDataFrame (結果)
)

總結

今天我們完成了流式聚合系統最重要的進化:從全局累積統計轉向時間分段統計。這個看似簡單的改變,實際上是將分組鍵從一維擴展到二維的架構升級,讓我們的聚合系統具備了時間感知能力。

核心的技術突破在於窗口對齊算法。通過一個優雅的數學公式 timestamp % window_size,我們解決了事件時間到窗口邊界的映射問題,無論事件何時到達,都能精確計算出它所屬的時間窗口。這種設計既保證了計算效率,又確保了窗口邊界的一致性。

在狀態管理方面,我們建立了以 (group_key, window) 為複合主鍵的存儲結構,每個窗口維護獨立的聚合狀態。更重要的是,我們實現了完整的窗口生命週期管理機制:當窗口過期時,系統會自動輸出最終結果並清理相關狀態,避免記憶體洩漏的同時保證了結果的及時性。

API 設計保持了優雅的一致性,只需在原有的聚合調用前加上 .window(duration) 就能將全局聚合轉換為窗口化聚合。這種設計既維持了向後兼容性,又提供了強大的時間分析能力,讓開發者能夠輕鬆實現按時段的業務監控、異常檢測和趨勢分析。

窗口化聚合為實時數據分析開啟了新的可能性。我們不再只能看到「從開始到現在」的累積數據,而是能夠觀察「每個時段」的具體表現,這對於理解數據的時間模式和及時響應業務變化具有重要意義。

其他 SQL 風格算子的可能性

有了窗口化聚合的基礎,我們可以類似地實現更多 SQL 風格的算子:

  • TopN 排行榜:使用 min-heap 維護前 N 大元素,實現動態排行榜
  • DISTINCT 去重:使用 Set 或 BloomFilter 追蹤已見過的值
  • PERCENTILE 百分位數:使用分位數估算算法處理大數據流
  • FIRST_VALUE/LAST_VALUE:維護窗口內的首尾值
  • LAG/LEAD:訪問前後行的數據,實現時序比較

這些算子的實現模式都很相似:設計專門的聚合器,然後利用我們已有的窗口化框架。重點在於選擇合適的數據結構和算法來高效維護狀態。

Day 17 預告:狀態管理的挑戰

隨著聚合功能越來越豐富,我們面臨一個關鍵問題:如何保證狀態的可靠性?當程序崩潰重啟時,所有的聚合狀態都會丟失。下一章我們將探討狀態的持久化保存與故障恢復機制,讓流式處理系統從「功能完整」邁向「生產就緒」。


上一篇
【知其然,更知其所以然】Day15:GroupBy 的進化 - 多種聚合器
下一篇
【知其然,更知其所以然】Day17:狀態的生死之謎 - 持久化
系列文
「知其然,更知其所以然:什麼是 Real-time (Streaming) Pipeline?從造輪子到 Flink 與 RisingWave」18
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言