再次回到我們的圖書館比喻。
傳統的資料庫設計,就像一個只關心「現在」的圖書館員。
我們問他:「《夜間飛行》在哪裡?」他會直接告訴我們:「在 A-7 書架上。」但如果我們追問:「那昨天下午三點,這本書在哪裡?」
他會一臉茫然,因為他只記錄書的最終狀態,過程中的變化都遺失了。
//TODO:《夜間飛行》是一本我非常喜歡的書,非常推薦大家看看:))
概念核心:
事件驅動的哲學:記錄「發生了什麼」,而非「變成了什麼」,事件驅動架構,尤其是事件溯源,採用了一種截然不同的哲學。它像一位一絲不苟的歷史學家,他維護的不是一份書單,而是一本永不修改的流水帳 (Immutable Ledger)。
程式語言有一個概念來管理這樣子的抽象具體 Link (Node-List)。所有的事件都必然有其連接的前後事件,透過重播歷程,來重建任何一個時間點的狀態
所以,回到我們的圖書館吧檯,當我們檢視書籍的歷程記錄事件列時,我們會看到:
09:15
:會員 Alice 在 圖書館 C 歸還了《夜間飛行》。11:30
:管理員將《夜間飛行》從 圖書館 C 移至 圖書館 B。15:10
:工讀生將《夜間飛行》從還書車上架到 A-7。這樣子的事件流。
看到了嗎?我們不儲存書的「位置」(狀態),我們只儲存書的每一次「移動」(事件)。
傳統 CRUD 是「破壞性」的,UPDATE users SET status = 'inactive'
時,我們就永遠失去了這個用戶之前的狀態 - 資訊在更新中被破壞了。
事件溯源則是「建構性」記錄下 UserDeactivated
事件。所有的歷史資訊節點都被保留下來
快速摘要
應用場景:
OrderCreated
事件後,它可以完全不知道下游有哪些服務需要這個資訊。 「庫存服務」 可以訂閱此事件來扣減庫存,「物流服務」 可以訂閱它來準備出貨,「通知服務」 可以訂閱它來發送確認郵件。服務之間完全解耦,可以獨立開發、部署和擴展。優勢:
然而,這種「把任務寫在白板上就走人」的溝通方式,也帶來了它獨有的複雜性與挑戰。
總結來說,EDA 提供了極致的靈活性和擴展性,但代價是增加了系統的整體複雜度和對「最終一致性」的容忍。選擇它,意味著我們必須準備好用管理一個複雜的「異步協作系統」來換取單個服務的「簡單和獨立」。
graph TD
subgraph "寫入流程 store_trade_event"
A[應用程式/服務] --> B{呼叫 store_trade_event}
B --> C[1. 建立交易事件紀錄]
C --> D["2. 寫入 DynamoDB<br/>(PortfolioEvents 表)"]
D --> E["3. 發布事件到 Kinesis<br/>(portfolio-events 流)"]
E --> F["下游消費者<br/>(如 Lambda, 其他微服務)"]
F --> G["處理事件<br/>(如更新儀表板, 風險計算)"]
D -- 強一致性寫入 --> C
end
subgraph "讀取/重建流程 rebuild_portfolio_state"
H[應用程式/服務] --> I{呼叫 rebuild_portfolio_state}
I --> J["1. 查詢 DynamoDB<br/>(PortfolioEvents 表)"]
J --> K["2. 獲取特定 portfolio_id 的<br/>所有歷史事件"]
K --> L["3. 在記憶體中<br/>按順序重播事件"]
L --> M["4. 得到重建後的<br/>投資組合狀態"]
M --> N[返回狀態給應用程式]
end
寫入流程 (左側):
當一個交易行為發生時,應用程式會呼叫 store_trade_event
。
該函數首先將這個交易事件寫入 DynamoDB 的 PortfolioEvents 表中。這是我們的「事件日誌」,是不可變的、永久的紀錄。
寫入成功後,同一個事件會被發布到 Kinesis 數據流中。
其他關心此事件的下游服務(例如:更新報表的服務、計算風險的服務)會從 Kinesis 訂閱並接收到這個事件,然後執行各自的業務邏輯。這個過程是異步且解耦的。
讀取/重建流程 (右側):
當需要查詢某個投資組合的「當前狀態」時,應用程式會呼叫 rebuild_portfolio_state
。
系統會去查詢 DynamoDB,拉取該投資組合的所有歷史事件紀錄。
在應用程式的記憶體中,從第一個事件開始,一步步地「重播」這些事件,就像快進電影一樣。
所有事件重播完畢後,就得到了該投資組合在特定時間點的精確狀態,並返回給請求方。
AWS 服務架構組合
graph TD
subgraph "事件溯源核心"
A[客戶端請求] --> B[應用服務層]
B --> C{建立訂單事件}
C --> D["DynamoDB 事件表<br/>(不可變事件日誌)"]
D --> E[DynamoDB Streams<br/>自動捕獲變更]
end
subgraph "事件分發層"
E --> F["Lambda 事件分發器<br/>(Event Dispatcher)"]
F --> G["EventBridge 事件匯流排<br/>(智能路由中心)"]
G --> H{事件規則引擎<br/>路由決策}
end
subgraph "CQRS 查詢端更新"
H -->|OrderCreated 事件| I["SQS FIFO 隊列<br/>(可靠順序處理)"]
I --> J["Lambda 查詢端更新器"]
J --> K["RDS 查詢資料庫<br/>(反正規化視圖)"]
end
subgraph "即時分析管道"
H -->|事件流| L["Kinesis Data Firehose<br/>(批量處理)"]
L --> M["S3 Data Lake<br/>(事件歷史存檔)"]
M --> N["Athena / QuickSight<br/>(大數據分析)"]
end
subgraph "複雜工作流編排"
H -->|業務事件| O["Step Functions<br/>(狀態機編排)"]
O --> P1["扣款服務<br/>(Payment)"]
O --> P2["倉庫通知<br/>(Warehouse)"]
O --> P3["物流安排<br/>(Logistics)"]
P1 --> P2
P2 --> P3
end
subgraph "即時通知系統"
H -->|用戶相關事件| Q["SNS 通知服務"]
Q --> R1["Email 通知"]
Q --> R2["SMS 通知"]
Q --> R3["Push 通知"]
Q --> R4["Webhook 通知"]
end
subgraph "監控與可觀測性"
F --> S[CloudWatch Logs]
G --> S
I --> S
L --> S
O --> S
Q --> S
S --> T[CloudWatch 儀表板]
S --> U[X-Ray 分散式追蹤]
S --> V[CloudWatch 告警]
end
%% 樣式定義
classDef eventStore fill:#e1f5fe
classDef eventBridge fill:#f3e5f5
classDef consumer fill:#e8f5e8
classDef monitoring fill:#fff3e0
class D,E eventStore
class F,G,H eventBridge
class I,J,K,L,M,N,O,P1,P2,P3,Q,R1,R2,R3,R4 consumer
class S,T,U,V monitoring
class PortfolioEventStore:
"""投資組合的事件溯源實現"""
def __init__(self):
self.event_store = boto3.client('dynamodb')
self.event_stream = boto3.client('kinesis')
async def store_trade_event(self, portfolio_id, event):
"""儲存交易事件到 DynamoDB"""
event_record = {
'portfolio_id': {'S': portfolio_id},
'event_id': {'S': str(uuid.uuid4())},
'event_type': {'S': event.type},
'event_data': {'S': json.dumps(event.data)},
'timestamp': {'N': str(int(time.time() * 1000))},
'sequence_number': {'N': str(await self.get_next_sequence(portfolio_id))}
}
# 強一致性寫入
await self.event_store.put_item(
TableName='PortfolioEvents',
Item=event_record,
ConditionExpression='attribute_not_exists(event_id)'
)
# 異步發送到 Kinesis 供其他服務消費
await self.event_stream.put_record(
StreamName='portfolio-events',
Data=json.dumps(event_record),
PartitionKey=portfolio_id
)
async def rebuild_portfolio_state(self, portfolio_id, as_of_time=None):
"""從事件重建組合狀態"""
query_params = {
'TableName': 'PortfolioEvents',
'KeyConditionExpression': 'portfolio_id = :pid',
'ExpressionAttributeValues': {':pid': {'S': portfolio_id}},
'ScanIndexForward': True # 按時間順序
}
if as_of_time:
query_params['FilterExpression'] = 'timestamp <= :time'
query_params['ExpressionAttributeValues'][':time'] = {'N': str(as_of_time)}
events = await self.event_store.query(**query_params)
# 重播事件重建狀態
portfolio_state = PortfolioState()
for event_item in events['Items']:
event = self.deserialize_event(event_item)
portfolio_state.apply_event(event)
return portfolio_state
我們來看看一個情境題
以股票代碼的EDA實現為例,我需要針對某一個用戶建立他的某一支股票交易表嗎?
答案是:絕對不要。
為每個用戶的每支股票都建立一個資料表,是一種常見的設計誤區,會導致「資料表爆炸」的災難。一個活躍的用戶可能交易上百支股票,一萬個用戶就可能產生數十萬甚至上百萬個資料表,這在管理、維護和查詢上都是不可行的。
正確的建表思路:以「事件流」為核心
在 EDA 中,我們不應該思考「該建什麼表來存狀態」,而應該思考 「該為哪個實體建立事件流」 。
在股票交易的場景中,最核心的業務實體(也就是 DDD 中的「聚合根」)是 「用戶的投資組合」或「交易帳戶」 ,而不是單一的股票。實際上,一個用戶的所有交易行為(conduct),都是圍繞著他的某個投資組合(requirement)發生的。
因此,我們只需要一張事件表,我稱之為 TradingEvents。這張表記錄了所有用戶、所有投資組合的所有交易事件。
理想的 TradingEvents
資料表結構 (以 DynamoDB 為例)
欄位名稱 (Attribute) | 作用 | 範例 | 備註 |
---|---|---|---|
portfolio_id | 分區鍵 (Partition Key) | user123-portfolio-A | 核心設計:所有屬於同一個投資組合的事件都會被物理地存放在一起,極大化查詢效率。 |
sequence_number | 排序鍵 (Sort Key) | 1, 2, 3, ... | 核心設計:保證一個投資組合內的所有事件都有嚴格的先後順序,這是重播歷史、重建狀態的基礎。 |
event_id | 事件唯一標識 | uuid-v4-string | 用於確保事件的唯一性,防止重複處理。 |
event_type | 事件類型 | STOCK_BOUGHT, STOCK_SOLD | 描述「發生了什麼」,決定了業務邏輯如何處理這個事件。 |
event_payload | 事件內容 (JSON) | {"symbol": "AAPL", "quantity": 10, "price": 175.50} | 記錄事件的具體細節。 |
timestamp | 事件發生時間 | 2025-09-12T10:00:00Z | 記錄真實世界的業務時間,用於審計和分析。 |
sequenceDiagram
participant User
participant Application
participant EventStore as TradingEvents (DynamoDB)
participant InMemoryState as In-Memory State
Note over User,InMemoryState: Event Sourcing 實際運作流程
Note over User,EventStore: 階段一:事件記錄
User->>+Application: 1. 存入資金 $10,000
Application->>+EventStore: 儲存事件 1: CASH_DEPOSITED
EventStore-->>-Application: 成功
Application-->>-User: 完成
User->>+Application: 2. 購買 10 股 AAPL
Application->>+EventStore: 儲存事件 2: STOCK_BOUGHT (AAPL)
EventStore-->>-Application: 成功
Application-->>-User: 完成
User->>+Application: 3. 購買 5 股 NVDA
Application->>+EventStore: 儲存事件 3: STOCK_BOUGHT (NVDA)
EventStore-->>-Application: 成功
Application-->>-User: 完成
User->>+Application: 4. 賣出 3 股 AAPL
Application->>+EventStore: 儲存事件 4: STOCK_SOLD (AAPL)
EventStore-->>-Application: 成功
Application-->>-User: 完成
Note over User,InMemoryState: 階段二:狀態重建 (回答「用戶現在持有多少 AAPL?」)
User->>+Application: 請求查詢投資組合狀態
Application->>+EventStore: 查詢 portfolio_id='user123-portfolio-A' 的所有事件
EventStore-->>-Application: 返回事件 1, 2, 3, 4
Application->>+InMemoryState: 初始化 state = { cash: 0, holdings: {} }
InMemoryState-->>-Application:
Application->>+InMemoryState: 重播事件 1 (存入 $10000)
Note right of InMemoryState: state.cash = 10000
InMemoryState-->>-Application:
Application->>+InMemoryState: 重播事件 2 (買入 AAPL)
Note right of InMemoryState: state.cash = 8250<br/>state.holdings.AAPL = 10
InMemoryState-->>-Application:
Application->>+InMemoryState: 重播事件 3 (買入 NVDA)
Note right of InMemoryState: state.cash = 3500<br/>state.holdings.NVDA = 5
InMemoryState-->>-Application:
Application->>+InMemoryState: 重播事件 4 (賣出 AAPL)
Note right of InMemoryState: state.cash = 4040<br/>state.holdings.AAPL = 7
InMemoryState-->>-Application:
Application-->>-User: 返回最終狀態:<br/>{ cash: 4040, holdings: { 'AAPL': 7, 'NVDA': 5 } }