iT邦幫忙

2025 iThome 鐵人賽

DAY 15
0
Build on AWS

AWS架構師的自我修養:30天雲端系統思維實戰指南系列 第 16

Day 11-2 | 資料庫設計哲學:需求解析、技術選型與 Schema 設計策略(二) - 核心設計策略AWS實戰解析:事件驅動架構

  • 分享至 

  • xImage
  •  

2. 事件驅動架構(Event-Driven Design)

再次回到我們的圖書館比喻。

傳統的資料庫設計,就像一個只關心「現在」的圖書館員。

我們問他:「《夜間飛行》在哪裡?」他會直接告訴我們:「在 A-7 書架上。」但如果我們追問:「那昨天下午三點,這本書在哪裡?」

他會一臉茫然,因為他只記錄書的最終狀態,過程中的變化都遺失了。

//TODO:《夜間飛行》是一本我非常喜歡的書,非常推薦大家看看:))

概念核心

事件驅動的哲學:記錄「發生了什麼」,而非「變成了什麼」,事件驅動架構,尤其是事件溯源,採用了一種截然不同的哲學。它像一位一絲不苟的歷史學家,他維護的不是一份書單,而是一本永不修改的流水帳 (Immutable Ledger)。

程式語言有一個概念來管理這樣子的抽象具體 Link (Node-List)。所有的事件都必然有其連接的前後事件,透過重播歷程,來重建任何一個時間點的狀態

所以,回到我們的圖書館吧檯,當我們檢視書籍的歷程記錄事件列時,我們會看到:

  • 09:15:會員 Alice 在 圖書館 C 歸還了《夜間飛行》。
  • 11:30:管理員將《夜間飛行》從 圖書館 C 移至 圖書館 B。
  • 15:10:工讀生將《夜間飛行》從還書車上架到 A-7。

這樣子的事件流。

看到了嗎?我們不儲存書的「位置」(狀態),我們只儲存書的每一次「移動」(事件)。

傳統 CRUD 是「破壞性」的,UPDATE users SET status = 'inactive'時,我們就永遠失去了這個用戶之前的狀態 - 資訊在更新中被破壞了。

事件溯源則是「建構性」記錄下 UserDeactivated 事件。所有的歷史資訊節點都被保留下來

快速摘要

  • 不儲存狀態,只儲存事件
  • 現在的狀態 = 所有歷史事件的重播結果

應用場景

  1. 微服務整合 (Microservices Integration):
  • 這是 EDA 最核心的應用。
  • 在電商平台中,當「訂單服務」產生一筆 OrderCreated 事件後,它可以完全不知道下游有哪些服務需要這個資訊。 「庫存服務」 可以訂閱此事件來扣減庫存,「物流服務」 可以訂閱它來準備出貨,「通知服務」 可以訂閱它來發送確認郵件。服務之間完全解耦,可以獨立開發、部署和擴展。
  1. 異步工作流程 (Asynchronous Workflows):
  • EDA 擅長於處理 耗時的連續性背景任務。例如 : 用戶上傳一段影片後,後端立即發布一個 VideoReceived 事件並馬上回應「上傳成功,轉檔中」。一個或多個專門的「轉檔服務」會接收此事件,在背景進行耗時的影片壓縮、加水印等工作。用戶無需在原地等待,體驗極佳。
  1. 即時數據處理與物聯網 (Real-time Data Processing & IoT):
  • 情境:處理源源不絕的數據流。來自成千上萬個 IoT 設備的溫度、濕度讀數,或是金融市場的即時股價波動,每一條數據都是一個事件。EDA 架構可以高效地對這些事件流進行過濾、轉換、聚合,並觸發即時警報或更新儀表板。
  1. 審計與合規 (Audit and Compliance):
  • 情境:當「發生了什麼」比「現在是什麼」更重要時。在金融、醫療或法律領域,擁有每一筆交易、每一次病歷修改的不可變紀錄,是合規性的基本要求。系統的任何狀態都可以從這些歷史事件中重現出來。

優勢

  • 完整的操作軌跡
  • 支援時光回溯
  • 天然的審計功能

然而,這種「把任務寫在白板上就走人」的溝通方式,也帶來了它獨有的複雜性與挑戰。

  1. 心智模型的複雜性:系統的控制流程變得隱晦。一個事件發布後,我們很難直觀地追蹤「接下來會發生什麼?」,這讓除錯變得極具挑戰性。
  2. 最終一致性:由於服務是異步處理的,系統在不同部分會存在短暫的狀態不一致。例如,訂單已成立,但庫存可能還沒被扣減。這對開發和用戶體驗設計都提出了更高的要求。
  3. 事件結構的演進與版本控制:如果「任務白板」上的訊息格式改變了(例如,訂單事件增加了「折扣碼」欄位),所有下游服務都必須能兼容新舊兩種格式,否則就會崩潰。這通常需要引入額外的工具來管理。
  4. 訊息中介層的可靠性:那個「任務白板」(如 Kafka、EventBridge)成為了系統的心臟。如果它故障,整個系統就會癱瘓。因此,它本身必須被設計成高可用、高可靠的,這是一項複雜且昂貴的工程。
  5. 測試的困難度:端到端的整合測試變得非常困難,因為我們需要驗證一個事件觸發後,所有相關的下游服務是否都正確地完成了它們各自的工作。

總結來說,EDA 提供了極致的靈活性和擴展性,但代價是增加了系統的整體複雜度和對「最終一致性」的容忍。選擇它,意味著我們必須準備好用管理一個複雜的「異步協作系統」來換取單個服務的「簡單和獨立」。

AWS 實現:投資交易系統的事件溯源

graph TD
    subgraph "寫入流程 store_trade_event"
        A[應用程式/服務] --> B{呼叫 store_trade_event}
        B --> C[1. 建立交易事件紀錄]
        C --> D["2. 寫入 DynamoDB<br/>(PortfolioEvents 表)"]
        D --> E["3. 發布事件到 Kinesis<br/>(portfolio-events 流)"]
        E --> F["下游消費者<br/>(如 Lambda, 其他微服務)"]
        F --> G["處理事件<br/>(如更新儀表板, 風險計算)"]
        D -- 強一致性寫入 --> C
    end

    subgraph "讀取/重建流程 rebuild_portfolio_state"
        H[應用程式/服務] --> I{呼叫 rebuild_portfolio_state}
        I --> J["1. 查詢 DynamoDB<br/>(PortfolioEvents 表)"]
        J --> K["2. 獲取特定 portfolio_id 的<br/>所有歷史事件"]
        K --> L["3. 在記憶體中<br/>按順序重播事件"]
        L --> M["4. 得到重建後的<br/>投資組合狀態"]
        M --> N[返回狀態給應用程式]
    end

寫入流程 (左側):

當一個交易行為發生時,應用程式會呼叫 store_trade_event
該函數首先將這個交易事件寫入 DynamoDB 的 PortfolioEvents 表中。這是我們的「事件日誌」,是不可變的、永久的紀錄。
寫入成功後,同一個事件會被發布到 Kinesis 數據流中。
其他關心此事件的下游服務(例如:更新報表的服務、計算風險的服務)會從 Kinesis 訂閱並接收到這個事件,然後執行各自的業務邏輯。這個過程是異步且解耦的。

讀取/重建流程 (右側):

當需要查詢某個投資組合的「當前狀態」時,應用程式會呼叫 rebuild_portfolio_state
系統會去查詢 DynamoDB,拉取該投資組合的所有歷史事件紀錄。
在應用程式的記憶體中,從第一個事件開始,一步步地「重播」這些事件,就像快進電影一樣。
所有事件重播完畢後,就得到了該投資組合在特定時間點的精確狀態,並返回給請求方。

AWS 服務架構組合

graph TD
    subgraph "事件溯源核心"
        A[客戶端請求] --> B[應用服務層]
        B --> C{建立訂單事件}
        C --> D["DynamoDB 事件表<br/>(不可變事件日誌)"]
        D --> E[DynamoDB Streams<br/>自動捕獲變更]
    end

    subgraph "事件分發層"
        E --> F["Lambda 事件分發器<br/>(Event Dispatcher)"]
        F --> G["EventBridge 事件匯流排<br/>(智能路由中心)"]
        G --> H{事件規則引擎<br/>路由決策}
    end

    subgraph "CQRS 查詢端更新"
        H -->|OrderCreated 事件| I["SQS FIFO 隊列<br/>(可靠順序處理)"]
        I --> J["Lambda 查詢端更新器"]
        J --> K["RDS 查詢資料庫<br/>(反正規化視圖)"]
    end

    subgraph "即時分析管道"
        H -->|事件流| L["Kinesis Data Firehose<br/>(批量處理)"]
        L --> M["S3 Data Lake<br/>(事件歷史存檔)"]
        M --> N["Athena / QuickSight<br/>(大數據分析)"]
    end

    subgraph "複雜工作流編排"
        H -->|業務事件| O["Step Functions<br/>(狀態機編排)"]
        O --> P1["扣款服務<br/>(Payment)"]
        O --> P2["倉庫通知<br/>(Warehouse)"]
        O --> P3["物流安排<br/>(Logistics)"]
        P1 --> P2
        P2 --> P3
    end

    subgraph "即時通知系統"
        H -->|用戶相關事件| Q["SNS 通知服務"]
        Q --> R1["Email 通知"]
        Q --> R2["SMS 通知"]
        Q --> R3["Push 通知"]
        Q --> R4["Webhook 通知"]
    end

    subgraph "監控與可觀測性"
        F --> S[CloudWatch Logs]
        G --> S
        I --> S
        L --> S
        O --> S
        Q --> S

        S --> T[CloudWatch 儀表板]
        S --> U[X-Ray 分散式追蹤]
        S --> V[CloudWatch 告警]
    end

    %% 樣式定義
    classDef eventStore fill:#e1f5fe
    classDef eventBridge fill:#f3e5f5
    classDef consumer fill:#e8f5e8
    classDef monitoring fill:#fff3e0

    class D,E eventStore
    class F,G,H eventBridge
    class I,J,K,L,M,N,O,P1,P2,P3,Q,R1,R2,R3,R4 consumer
    class S,T,U,V monitoring
class PortfolioEventStore:
    """投資組合的事件溯源實現"""

    def __init__(self):
        self.event_store = boto3.client('dynamodb')
        self.event_stream = boto3.client('kinesis')

    async def store_trade_event(self, portfolio_id, event):
        """儲存交易事件到 DynamoDB"""

        event_record = {
            'portfolio_id': {'S': portfolio_id},
            'event_id': {'S': str(uuid.uuid4())},
            'event_type': {'S': event.type},
            'event_data': {'S': json.dumps(event.data)},
            'timestamp': {'N': str(int(time.time() * 1000))},
            'sequence_number': {'N': str(await self.get_next_sequence(portfolio_id))}
        }

        # 強一致性寫入
        await self.event_store.put_item(
            TableName='PortfolioEvents',
            Item=event_record,
            ConditionExpression='attribute_not_exists(event_id)'
        )

        # 異步發送到 Kinesis 供其他服務消費
        await self.event_stream.put_record(
            StreamName='portfolio-events',
            Data=json.dumps(event_record),
            PartitionKey=portfolio_id
        )

    async def rebuild_portfolio_state(self, portfolio_id, as_of_time=None):
        """從事件重建組合狀態"""

        query_params = {
            'TableName': 'PortfolioEvents',
            'KeyConditionExpression': 'portfolio_id = :pid',
            'ExpressionAttributeValues': {':pid': {'S': portfolio_id}},
            'ScanIndexForward': True  # 按時間順序
        }

        if as_of_time:
            query_params['FilterExpression'] = 'timestamp <= :time'
            query_params['ExpressionAttributeValues'][':time'] = {'N': str(as_of_time)}

        events = await self.event_store.query(**query_params)

        # 重播事件重建狀態
        portfolio_state = PortfolioState()
        for event_item in events['Items']:
            event = self.deserialize_event(event_item)
            portfolio_state.apply_event(event)

        return portfolio_state

我們來看看一個情境題

以股票代碼的EDA實現為例,我需要針對某一個用戶建立他的某一支股票交易表嗎?

答案是:絕對不要。

為每個用戶的每支股票都建立一個資料表,是一種常見的設計誤區,會導致「資料表爆炸」的災難。一個活躍的用戶可能交易上百支股票,一萬個用戶就可能產生數十萬甚至上百萬個資料表,這在管理、維護和查詢上都是不可行的。

正確的建表思路:以「事件流」為核心

在 EDA 中,我們不應該思考「該建什麼表來存狀態」,而應該思考 「該為哪個實體建立事件流」

在股票交易的場景中,最核心的業務實體(也就是 DDD 中的「聚合根」)是 「用戶的投資組合」或「交易帳戶」 ,而不是單一的股票。實際上,一個用戶的所有交易行為(conduct),都是圍繞著他的某個投資組合(requirement)發生的。

因此,我們只需要一張事件表,我稱之為 TradingEvents。這張表記錄了所有用戶、所有投資組合的所有交易事件。

理想的 TradingEvents 資料表結構 (以 DynamoDB 為例)

欄位名稱 (Attribute) 作用 範例 備註
portfolio_id 分區鍵 (Partition Key) user123-portfolio-A 核心設計:所有屬於同一個投資組合的事件都會被物理地存放在一起,極大化查詢效率。
sequence_number 排序鍵 (Sort Key) 1, 2, 3, ... 核心設計:保證一個投資組合內的所有事件都有嚴格的先後順序,這是重播歷史、重建狀態的基礎。
event_id 事件唯一標識 uuid-v4-string 用於確保事件的唯一性,防止重複處理。
event_type 事件類型 STOCK_BOUGHT, STOCK_SOLD 描述「發生了什麼」,決定了業務邏輯如何處理這個事件。
event_payload 事件內容 (JSON) {"symbol": "AAPL", "quantity": 10, "price": 175.50} 記錄事件的具體細節。
timestamp 事件發生時間 2025-09-12T10:00:00Z 記錄真實世界的業務時間,用於審計和分析。
sequenceDiagram
    participant User
    participant Application
    participant EventStore as TradingEvents (DynamoDB)
    participant InMemoryState as In-Memory State

    Note over User,InMemoryState: Event Sourcing 實際運作流程


    Note over User,EventStore: 階段一:事件記錄
    User->>+Application: 1. 存入資金 $10,000
    Application->>+EventStore: 儲存事件 1: CASH_DEPOSITED
    EventStore-->>-Application: 成功
    Application-->>-User: 完成

    User->>+Application: 2. 購買 10 股 AAPL
    Application->>+EventStore: 儲存事件 2: STOCK_BOUGHT (AAPL)
    EventStore-->>-Application: 成功
    Application-->>-User: 完成

    User->>+Application: 3. 購買 5 股 NVDA
    Application->>+EventStore: 儲存事件 3: STOCK_BOUGHT (NVDA)
    EventStore-->>-Application: 成功
    Application-->>-User: 完成

    User->>+Application: 4. 賣出 3 股 AAPL
    Application->>+EventStore: 儲存事件 4: STOCK_SOLD (AAPL)
    EventStore-->>-Application: 成功
    Application-->>-User: 完成



    Note over User,InMemoryState: 階段二:狀態重建 (回答「用戶現在持有多少 AAPL?」)
    User->>+Application: 請求查詢投資組合狀態
    Application->>+EventStore: 查詢 portfolio_id='user123-portfolio-A' 的所有事件
    EventStore-->>-Application: 返回事件 1, 2, 3, 4

    Application->>+InMemoryState: 初始化 state = { cash: 0, holdings: {} }
    InMemoryState-->>-Application:

    Application->>+InMemoryState: 重播事件 1 (存入 $10000)
    Note right of InMemoryState: state.cash = 10000
    InMemoryState-->>-Application:

    Application->>+InMemoryState: 重播事件 2 (買入 AAPL)
    Note right of InMemoryState: state.cash = 8250<br/>state.holdings.AAPL = 10
    InMemoryState-->>-Application:

    Application->>+InMemoryState: 重播事件 3 (買入 NVDA)
    Note right of InMemoryState: state.cash = 3500<br/>state.holdings.NVDA = 5
    InMemoryState-->>-Application:

    Application->>+InMemoryState: 重播事件 4 (賣出 AAPL)
    Note right of InMemoryState: state.cash = 4040<br/>state.holdings.AAPL = 7
    InMemoryState-->>-Application:

    Application-->>-User: 返回最終狀態:<br/>{ cash: 4040, holdings: { 'AAPL': 7, 'NVDA': 5 } }


上一篇
Day 11-1 | 資料庫設計哲學:需求解析、技術選型與 Schema 設計策略(一) - 核心設計策略AWS實戰解析:主檔管理策略
下一篇
Day 11-3 | 資料庫設計哲學:需求解析、技術選型與 Schema 設計策略(三) - 核心設計策略AWS實戰解析:CQRS命令查詢責任分離 - 以世界盃足球賽為例
系列文
AWS架構師的自我修養:30天雲端系統思維實戰指南28
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言