iT邦幫忙

2025 iThome 鐵人賽

DAY 15
0
Build on AWS

AWS架構師的自我修養:30天雲端系統思維實戰指南系列 第 17

Day 11-3 | 資料庫設計哲學:需求解析、技術選型與 Schema 設計策略(三) - 核心設計策略AWS實戰解析:CQRS命令查詢責任分離 - 以世界盃足球賽為例

  • 分享至 

  • xImage
  •  

3. CQRS(命令查詢責任分離)

讓我們放過圖書館可憐的管理員與累癱的工讀生,現在我們將我們的興致延燒到全世界將近三分之一的人同時觀看、萬萬眾矚目且緊張刺激的世界盃決賽足球場。

球場上的事件瞬息萬變,一毫秒之差就有可能跪倒在冠軍台階前又或是誕生出了新的足球先生。然而,足球場就這麼大,假如全世界的運動員、媒體與愛好者都集中在同一個球場中,那勢必在物理法則上是不可能的 - 我們是人,不是樂高積木。所以絕大多數我們還是透過 媒體(紀錄行為)報導(紀錄的影響)

當一名球員犯規,裁判吹哨並掏出黃牌。他會拿出筆記本,嚴謹地記錄下:球員背號時間犯規類型。與此同時,場邊有各種媒體,廣播直播員立即在現場位席中轉述誰領到了黃牌,線上快報記者也立即發布了走馬燈消息呈現在電視台既有節目中,現場 Live 記者祖業立即對焦裁判與焦急反駁的犯規球員等待後續事件的發生。

看到了嗎?事實只有一個(裁判記錄了一張黃牌),但資料的呈現方式卻有千百種。如果強迫所有媒體都去讀裁判那本潦草、只有關鍵字的筆記本,那將是一場災難。

球場上正在發生的事,是唯一的「事實」。

命令端 (Command Side) - 球場上的裁判
裁判吹哨並掏出黃牌。他會拿出筆記本,嚴謹地記錄... 這個動作就是一個「命令」。它改變了比賽的官方狀態。裁判的筆記本就是「寫入模型」(Write Model),它的設計目標是絕對的準確性與規則一致性。它不關心報導是否精彩,只關心記錄是否無誤。

查詢端 (Query Side) - 轉播單位與媒體

與此同時,場邊有各種媒體,他們的需求(查詢)截然不同:

  • 電視/廣播直播員:他們需要即時的數據流來進行播報。他們的「查詢模型」是一個包含目前比分比賽時間控球方即時事件(如:射門、角球)的儀表板。它為速度和敘事而優化。
  • 快報記者:他們需要立即發布最關鍵的資訊。他們的「查詢模型」可能只是一個極簡的事件列表:「梅西進球!阿根廷 1:0 領先!」。它為時效性而優化。
  • 晚報分析師:他們需要賽後的完整統計數據來撰寫深度報導。他們的「查詢模型」是一個包含球員跑動距離傳球成功率射門分佈圖的複雜聚合報表。它為深度與分析而優化。
  • 社群媒體小編:他們需要精彩的片段來吸引眼球。他們的「查詢模型」是進球的 GIF 動圖、爭議判罰的短影片。它為互動性與傳播性而優化。

CQRS 的核心哲學就是:讓裁判專心做好記錄(命令),同時允許各家媒體根據自己的需求,建立最適合自己的觀賽筆記(查詢)。

抽象概念:寫入優化的結構 ≠ 查詢優化的結構

設計原則

  • 命令端(Command)

    • 高度正規化
    • 強一致性要求
    • 寫入效能優化
  • 查詢端(Query)

    • 大量反正規化
    • 最終一致性
    • 讀取效能優化

實際效益

  • 讀寫負載分離
  • 各自獨立擴展
  • 複雜查詢不影響核心業務

應用場景

CQRS 不是萬靈丹。它會引入額外的複雜性,因此只應在收益明顯大於成本的場景中使用。以下是 CQRS 發揮最大價值的幾個經典場景:

  1. 高流量讀取密集型系統(讀取需求是寫入的平方根以上)

    • 情境:電商平台的商品目錄、新聞媒體的報導頁面、社交平台的內容流。在這些系統中,讀取操作(瀏覽商品、閱讀文章)的數量遠遠超過寫入操作(下訂單、發布文章)。
    • 為何選擇 CQRS:可以獨立擴展查詢端。例如,我們可以部署一個由 10 個節點組成的讀取資料庫叢集和一個高性能快取層,來應對數百萬用戶的瀏覽請求,而寫入端可能只需要一個單一、穩定的資料庫實例。這避免了讀取流量衝垮核心交易系統的風險。
  2. 複雜的查詢與多樣化的資料視圖

    • 情境:商業智慧(BI)儀表板、數據分析平台、管理後台。這些系統需要從同一個核心數據中,產生多種完全不同的視圖(View)。例如,一個銷售數據,需要同時被呈現為「按地區的銷售額趨勢圖」、「按產品類別的利潤分析報表」和「高價值客戶的活躍度列表」。
    • 為何選擇 CQRS:可以為每種查詢需求建立一個專門優化的「讀取模型」(Read Model)。趨勢圖的模型可能是一個預先計算好的時間序列數據表;利潤分析報表可能是一個反正規化的寬表;而客戶列表則可能是一個儲存在搜尋引擎(如 OpenSearch)中的索引。這避免了在單一、正規化的資料庫上執行大量複雜、緩慢的 JOIN 和聚合操作。
  3. 與事件溯源(Event Sourcing)的結合

    • 情境:金融交易系統、審計日誌系統、任何需要完整歷史追溯能力的領域。在這些場景中,系統的核心是不可變的事件日誌。
    • 為何選擇 CQRS:CQRS 是實現事件溯源的天然搭檔。
      • 命令端:負責驗證命令並將其轉化為事件,然後將事件儲存到事件日誌中(如 DynamoDB 或 Kinesis)。這是唯一的寫入操作。
      • 查詢端:透過訂閱事件流,建立和更新各種「投影」(Projections),也就是讀取模型。系統的「當前狀態」本身就是一個由所有歷史事件投影而來的讀取模型。
  4. 高併發協作領域

    • 情境:線上協作文件(如 Google Docs)、多人共享的甘特圖、共同編輯的設計工具。多個用戶同時對同一個資源進行操作。
    • 為何選擇 CQRS:可以將衝突的處理範圍縮小。所有的「寫入」操作(Commands)可以被序列化處理(例如通過一個 FIFO 佇列),確保狀態變更的一致性和順序性。同時,每個用戶的「讀取」操作可以從為他們自己優化的讀取模型中快速獲取數據,而不會被其他人的寫入操作阻塞。

挑戰與複雜性

  1. 最終一致性:這是 CQRS 最大的挑戰。用戶更新資料後,立即查詢可能看到的還是舊資料,這需要前端 UI/UX 設計來妥善處理(例如,顯示「更新中...」或使用樂觀更新)。
  2. 雙倍的模型:開發者需要維護命令模型和一個或多個查詢模型,增加了程式碼的複雜度和開發工作量。
  3. 基礎設施複雜度:需要引入事件匯流排(如 EventBridge)、訊息佇列(如 SQS)等中介軟體來同步兩端資料,增加了系統的維運成本和潛在故障點。
  4. 資料同步與回放:如果查詢端的讀取模型損壞或需要變更結構,可能需要一個機制來「重播」(Replay)歷史事件以重建讀取模型,這是一個複雜的過程。

AWS 實現:足球賽事事件的 CQRS

graph TD
    subgraph "事件發生 (事實來源)"
        A[⚽ 球場上的事件<br/>裁判吹哨 + 黃牌]
    end

    subgraph "命令端 (Command Side) - 寫入模型"
        A --> B[📝 裁判記錄命令<br/>RecordYellowCardCommand]
        B --> C[🔍 業務邏輯驗證<br/>- 球員是否在場<br/>- 比賽是否進行中]
        C --> D[💾 寫入正規化資料<br/>Amazon RDS PostgreSQL]

        subgraph "正規化表結構"
            D1[📋 Bookings 表<br/>booking_id, player_id, referee_id]
            D2[👤 Players 表<br/>player_id, player_name, team_id]
            D3[🏆 Teams 表<br/>team_id, team_name, country]
            D4[⚠️ FoulTypes 表<br/>foul_type_id, description]
            D5[👨‍⚖️ Referees 表<br/>referee_id, referee_name]
        end

        D --> D1
        D --> D2
        D --> D3
        D --> D4
        D --> D5

        D --> E[📡 發布領域事件<br/>PlayerBookedEvent]
        E --> F[🌐 Amazon EventBridge<br/>事件匯流排]
    end

    subgraph "事件分發與路由"
        F --> G{🎯 事件路由規則}
        G -->|電視直播事件| H1[📺 SQS: tv-broadcast-queue]
        G -->|快報新聞事件| H2[📰 SQS: news-alert-queue]
        G -->|社群媒體事件| H3[📱 SQS: social-media-queue]
        G -->|賽後分析事件| H4[📊 SQS: analytics-queue]
    end

    subgraph "查詢端 (Query Side) - 讀取模型更新"
        H1 --> I1[⚡ Lambda: UpdateTVDashboard]
        H2 --> I2[⚡ Lambda: UpdateNewsAlert]
        H3 --> I3[⚡ Lambda: UpdateSocialFeed]
        H4 --> I4[⚡ Lambda: UpdateAnalytics]

        subgraph "反正規化讀取模型"
            J1[📺 TVLiveEvents 表<br/>DynamoDB<br/>event_time, player_name, team_name, description]
            J2[📰 NewsAlerts 表<br/>DynamoDB<br/>headline, urgency_level, formatted_text]
            J3[📱 SocialMediaFeed 表<br/>DynamoDB<br/>post_content, hashtags, media_urls]
            J4[📊 MatchAnalytics 表<br/>DynamoDB<br/>player_stats, team_performance, aggregated_data]
        end

        I1 --> J1
        I2 --> J2
        I3 --> J3
        I4 --> J4
    end

    subgraph "各媒體查詢 (極速讀取)"
        K1[📺 電視台直播員<br/>查詢即時事件] --> L1[🚀 直接讀取 TVLiveEvents<br/>毫秒級響應]
        K2[📰 快報記者<br/>查詢重要新聞] --> L2[🚀 直接讀取 NewsAlerts<br/>毫秒級響應]
        K3[📱 社群小編<br/>查詢熱門內容] --> L3[🚀 直接讀取 SocialMediaFeed<br/>毫秒級響應]
        K4[📊 分析師<br/>查詢統計數據] --> L4[🚀 直接讀取 MatchAnalytics<br/>毫秒級響應]

        J1 --> L1
        J2 --> L2
        J3 --> L3
        J4 --> L4
    end

    subgraph "快取加速層"
        M[⚡ ElastiCache Redis<br/>熱門查詢快取]
        L1 --> M
        L2 --> M
        L3 --> M
        L4 --> M
    end

    subgraph "監控與可觀測性"
        N1[📊 CloudWatch<br/>事件處理指標]
        N2[🔍 X-Ray<br/>分散式追蹤]
        N3[📝 CloudWatch Logs<br/>事件處理日誌]

        F --> N1
        I1 --> N1
        I2 --> N1
        I3 --> N1
        I4 --> N1

        B --> N2
        I1 --> N2

        E --> N3
        F --> N3
    end

    %% 樣式定義
    classDef command fill:#e3f2fd,stroke:#1976d2,stroke-width:2px,color:#000
    classDef query fill:#e8f5e8,stroke:#388e3c,stroke-width:2px,color:#000
    classDef event fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px,color:#000
    classDef storage fill:#fff3e0,stroke:#f57c00,stroke-width:2px,color:#000
    classDef media fill:#fce4ec,stroke:#c2185b,stroke-width:2px,color:#000

    class B,C,D,D1,D2,D3,D4,D5 command
    class I1,I2,I3,I4,J1,J2,J3,J4,L1,L2,L3,L4 query
    class E,F,G,H1,H2,H3,H4 event
    class M,N1,N2,N3 storage
    class K1,K2,K3,K4 media
class FootballMatchCQRS:
    """足球賽事的 CQRS 實現"""

    def __init__(self):
        # 命令端:正規化的關聯式資料庫
        self.command_db = boto3.client('rds-data', database='football_command')

        # 查詢端:反正規化的 NoSQL 資料庫
        self.query_db = boto3.client('dynamodb')

        # 事件匯流排
        self.event_bridge = boto3.client('events')

        # 快取層
        self.cache = boto3.client('elasticache')

    async def record_yellow_card_command(self, match_id, player_id, referee_id, foul_type_id, minute):
        """命令端:記錄黃牌事件(正規化存儲)"""

        async with self.command_db.begin_transaction() as tx:
            # 1. 業務邏輯驗證
            await self.validate_yellow_card_business_rules(match_id, player_id, minute)

            # 2. 寫入正規化的關聯表
            booking_id = await tx.execute("""
                INSERT INTO bookings (match_id, player_id, referee_id, foul_type_id, game_minute, timestamp)
                VALUES (:match_id, :player_id, :referee_id, :foul_type_id, :minute, NOW())
                RETURNING booking_id
            """, {
                'match_id': match_id,
                'player_id': player_id,
                'referee_id': referee_id,
                'foul_type_id': foul_type_id,
                'minute': minute
            })

            # 3. 更新球員累計統計(在同一事務中)
            await tx.execute("""
                UPDATE player_match_stats
                SET yellow_cards = yellow_cards + 1
                WHERE match_id = :match_id AND player_id = :player_id
            """, {'match_id': match_id, 'player_id': player_id})

            # 4. 發布領域事件
            await self.publish_player_booked_event(booking_id, match_id, player_id, foul_type_id, minute)

    async def publish_player_booked_event(self, booking_id, match_id, player_id, foul_type_id, minute):
        """發布球員被罰黃牌事件"""

        # 從正規化表中組合完整的事件資料
        event_data = await self.build_complete_event_data(match_id, player_id, foul_type_id)

        await self.event_bridge.put_events(
            Entries=[
                {
                    'Source': 'com.football.match',
                    'DetailType': 'PlayerBooked',
                    'Detail': json.dumps({
                        'booking_id': booking_id,
                        'match_id': match_id,
                        'player_name': event_data['player_name'],
                        'team_name': event_data['team_name'],
                        'foul_description': event_data['foul_description'],
                        'game_minute': minute,
                        'timestamp': datetime.utcnow().isoformat()
                    }),
                    'EventBusName': 'football-events'
                }
            ]
        )

    async def update_tv_dashboard_projection(self, event):
        """更新電視直播儀表板(查詢端投影)"""

        # 建立為電視直播優化的反正規化記錄
        tv_event = {
            'event_id': {'S': str(uuid.uuid4())},
            'match_id': {'S': event['match_id']},
            'display_text': {'S': f"{event['player_name']} ({event['team_name']}) - {event['foul_description']}"},
            'game_minute': {'N': str(event['game_minute'])},
            'urgency_level': {'S': 'MEDIUM'},
            'created_at': {'N': str(int(time.time()))},
            'ttl': {'N': str(int(time.time()) + 86400)}  # 24小時後自動清理
        }

        # 寫入專為電視台設計的查詢表
        await self.query_db.put_item(
            TableName='TVLiveEvents',
            Item=tv_event
        )

        # 同時更新快取,供電視台毫秒級查詢
        await self.cache.set(
            f"tv:live_events:{event['match_id']}",
            json.dumps(tv_event),
            ex=3600  # 1小時快取
        )

    async def update_news_alert_projection(self, event):
        """更新新聞快報投影"""

        # 建立新聞標題優化的資料結構
        news_alert = {
            'alert_id': {'S': str(uuid.uuid4())},
            'headline': {'S': f"⚠️ {event['player_name']} 獲得黃牌!"},
            'sub_headline': {'S': f"{event['team_name']} vs 對手 - 第{event['game_minute']}分鐘"},
            'urgency_score': {'N': '75'},
            'category': {'S': 'BOOKING'},
            'formatted_content': {'S': self.format_news_content(event)},
            'created_at': {'N': str(int(time.time()))}
        }

        await self.query_db.put_item(
            TableName='NewsAlerts',
            Item=news_alert
        )

    async def update_social_media_projection(self, event):
        """更新社群媒體投影"""

        social_post = {
            'post_id': {'S': str(uuid.uuid4())},
            'content': {'S': f"🟨 {event['player_name']} 第{event['game_minute']}分鐘吃牌!"},
            'hashtags': {'SS': ['#世界盃', f"#{event['team_name']}", '#黃牌']},
            'media_type': {'S': 'TEXT_WITH_EMOJI'},
            'engagement_score': {'N': '0'},
            'created_at': {'N': str(int(time.time()))}
        }

        await self.query_db.put_item(
            TableName='SocialMediaFeed',
            Item=social_post
        )

    async def update_analytics_projection(self, event):
        """更新分析師統計投影"""

        # 更新球員統計
        await self.query_db.update_item(
            TableName='MatchAnalytics',
            Key={
                'analysis_type': {'S': 'PLAYER_STATS'},
                'entity_id': {'S': f"player_{event['player_id']}"}
            },
            UpdateExpression='ADD yellow_cards :inc SET last_updated = :timestamp',
            ExpressionAttributeValues={
                ':inc': {'N': '1'},
                ':timestamp': {'N': str(int(time.time()))}
            }
        )

        # 更新團隊統計
        await self.query_db.update_item(
            TableName='MatchAnalytics',
            Key={
                'analysis_type': {'S': 'TEAM_STATS'},
                'entity_id': {'S': f"team_{event['team_id']}"}
            },
            UpdateExpression='ADD total_bookings :inc SET last_updated = :timestamp',
            ExpressionAttributeValues={
                ':inc': {'N': '1'},
                ':timestamp': {'N': str(int(time.time()))}
            }
        )

    # 各媒體的極速查詢方法
    async def get_tv_live_events(self, match_id):
        """電視台查詢:毫秒級響應"""

        # 先查快取
        cached = await self.cache.get(f"tv:live_events:{match_id}")
        if cached:
            return json.loads(cached)

        # 快取未命中,查詢 DynamoDB
        response = await self.query_db.query(
            TableName='TVLiveEvents',
            KeyConditionExpression='match_id = :match_id',
            ExpressionAttributeValues={':match_id': {'S': match_id}},
            ScanIndexForward=False,  # 最新事件在前
            Limit=10
        )

        events = [self.deserialize_tv_event(item) for item in response['Items']]

        # 回填快取
        await self.cache.set(f"tv:live_events:{match_id}", json.dumps(events), ex=300)

        return events

    async def get_breaking_news(self, urgency_threshold=70):
        """快報記者查詢:按急迫性排序"""

        response = await self.query_db.scan(
            TableName='NewsAlerts',
            FilterExpression='urgency_score >= :threshold',
            ExpressionAttributeValues={':threshold': {'N': str(urgency_threshold)}},
            Limit=5
        )

        return [self.deserialize_news_alert(item) for item in response['Items']]

    async def get_trending_social_content(self):
        """社群小編查詢:按互動度排序"""

        response = await self.query_db.scan(
            TableName='SocialMediaFeed',
            IndexName='EngagementScoreIndex',
            ScanIndexForward=False,  # 高互動度在前
            Limit=20
        )

        return [self.deserialize_social_post(item) for item in response['Items']]

    async def get_match_analytics(self, match_id):
        """分析師查詢:完整統計數據"""

        response = await self.query_db.query(
            TableName='MatchAnalytics',
            KeyConditionExpression='match_id = :match_id',
            ExpressionAttributeValues={':match_id': {'S': match_id}}
        )

        return [self.deserialize_analytics(item) for item in response['Items']]

上一篇
Day 11-2 | 資料庫設計哲學:需求解析、技術選型與 Schema 設計策略(二) - 核心設計策略AWS實戰解析:事件驅動架構
下一篇
Day 11-4 | 資料庫設計哲學:需求解析、技術選型與 Schema 設計策略(四) - 核心設計策略AWS實戰解析:多租戶架構 - 以Netflix S3架構為例
系列文
AWS架構師的自我修養:30天雲端系統思維實戰指南28
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言