洛基一進茶室,就看見白板上的流程圖:
「前面二十六天,」大師說,「我們都是主動操作資料庫——寫入、查詢、更新、刪除。」
他指著流程圖:「但真實系統中,很多事情應該『自動發生』:」
新活動建立 → 自動發送通知給訂閱者
活動狀態變更 → 自動同步外部日曆
參與者註冊 → 自動更新統計數據
TTL 刪除資料 → 自動清理相關資源
洛基思考這些場景:「以前我都是寫入後,再手動呼叫發送通知的函數。現在是說...不需要手動呼叫?」
「對,」大師說,「資料一旦寫入,後續動作自動觸發。寫入程式不需要知道『接下來會發生什麼』。」
洛基眼睛一亮:「解耦!寫入活動的程式,不用管通知、不用管同步、不用管統計。」
「這就是事件驅動架構,」大師說,「DynamoDB Streams 是實現它的工具。」
「具體是怎麼運作的?」洛基問。
大師在白板上展開流程:
資料變更流程:
1. 應用寫入 DynamoDB
docClient.put({ ... })
2. DynamoDB Streams 捕捉變更
├─ 記錄事件類型:INSERT / MODIFY / REMOVE
├─ 保存變更資料:舊值 / 新值
└─ 順序保證:同一 PK 的變更順序一致
3. Lambda 自動觸發
├─ 批次接收 Stream Records
├─ 處理業務邏輯
└─ 失敗自動重試
4. 資料保留 24 小時
├─ Lambda 有 24 小時處理時間
└─ 超過 24 小時的資料會被丟棄
洛基盯著第 4 點:「24 小時?如果 Lambda 處理很慢,累積太多事件怎麼辦?」
「Streams 會累積,但有上限,」大師說,「超過 24 小時的事件會被丟棄。所以 Lambda 必須跟得上寫入速度。」
「那如果某個事件處理失敗呢?」
「AWS 會自動重試,」大師說,「但這也會帶來問題——我們稍後會談冪等性設計。先看 Streams 能捕捉什麼資料。」
「Streams 會記錄什麼資料?」洛基問,「整個項目嗎?」
「這取決於設定,」大師說,「啟用 Streams 時,有四種 StreamViewType 可以選。」
他在白板上列出:
StreamViewType | 捕捉內容 | 適用場景 | 優缺點 |
---|---|---|---|
KEYS_ONLY | 只有 PK, SK | 需要回查主表拿完整資料 | 資料量最小,但需額外查詢 |
NEW_IMAGE | 變更後的完整項目 | 同步到搜尋引擎(只需最新狀態) | 知道新值,不知舊值 |
OLD_IMAGE | 變更前的完整項目 | 審計日誌、TTL 刪除記錄 | 知道舊值,不知新值 |
NEW_AND_OLD_IMAGES | 變更前後完整項目 | 比對變更(DRAFT → ACTIVE) | 最完整,但資料量最大 ✅ 最常用 |
洛基思考:「如果只記錄 KEYS_ONLY
,Lambda 還要再查一次主表才能拿到完整資料?」
「對,」大師說,「所以通常選 NEW_AND_OLD_IMAGES
——知道『從什麼變成什麼』最有價值。例如活動狀態從 DRAFT 變成 ACTIVE,你需要知道『之前是 DRAFT』才能判斷是否要執行發布流程。」
洛基點頭,理解了為什麼需要新舊值。
「Lambda 函數會收到什麼格式的資料?」洛基問。
「Stream Record,」大師說,「讓我展示實際結構。」
// Lambda 函數收到的 event 結構
{
"Records": [
{
"eventID": "1",
"eventName": "INSERT", // INSERT, MODIFY, REMOVE
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": "us-east-1",
"dynamodb": {
"Keys": {
"PK": { "S": "EVENT#SY210-04-20-001" },
"SK": { "S": "METADATA" }
},
"NewImage": {
"PK": { "S": "EVENT#SY210-04-20-001" },
"SK": { "S": "METADATA" },
"eventName": { "S": "火星科學大會" },
"status": { "S": "ACTIVE" },
"registrations": { "N": "150" }
},
"OldImage": {
// 變更前的值(MODIFY 事件才有)
"status": { "S": "DRAFT" },
"registrations": { "N": "100" }
},
"SequenceNumber": "111",
"SizeBytes": 260
},
"userIdentity": {
"type": "Service", // TTL 刪除時會是 Service
"principalId": "dynamodb.amazonaws.com"
}
}
]
}
洛基看著這個結構,發現熟悉的格式:「NewImage
和 OldImage
裡面,{ "S": "ACTIVE" }
這種格式...」
「對,第 11 天學過的 DynamoDB 原生格式,」大師說,「需要用 unmarshall
轉換。」
「還有 userIdentity
,」洛基指著最後一段,「type: "Service"
是什麼意思?」
「這能區分是使用者操作還是系統操作,」大師解釋,「TTL 自動刪除時,userIdentity.type
會是 Service
。這樣你就知道是 TTL 刪的,而不是使用者手動刪的。」
洛基理解了:「所以可以根據刪除原因做不同處理。」
「現在寫一個最基本的 Lambda Handler,」大師說。
const { unmarshall } = require("@aws-sdk/util-dynamodb");
exports.handler = async (event) => {
console.log(`收到 ${event.Records.length} 筆 Stream 事件`);
const errors = [];
for (const record of event.Records) {
try {
const { eventName, dynamodb } = record;
// 解析 DynamoDB 格式
const newItem = dynamodb.NewImage ? unmarshall(dynamodb.NewImage) : null;
const oldItem = dynamodb.OldImage ? unmarshall(dynamodb.OldImage) : null;
// 根據事件類型處理
switch (eventName) {
case "INSERT":
await handleInsert(newItem);
break;
case "MODIFY":
await handleModify(oldItem, newItem);
break;
case "REMOVE":
await handleRemove(oldItem, record);
break;
}
} catch (error) {
console.error("處理記錄時發生錯誤:", error);
errors.push({ record, error: error.message });
}
}
if (errors.length > 0) {
console.error(`${errors.length} 筆記錄處理失敗`);
// 部分失敗不拋錯,避免整批重試
// 失敗項目應該送到 DLQ 或記錄到監控系統
}
return {
statusCode: 200,
processed: event.Records.length - errors.length,
failed: errors.length,
};
};
「這就是最基本的結構,」大師說,「根據 eventName
判斷是新增、修改還是刪除,然後分別處理。」
洛基點頭:「INSERT、MODIFY、REMOVE 三種事件類型。」
「對,」大師說,「現在看實際案例——如何用 Streams 實現自動通知。」
大師擦掉白板,重新畫出第一個場景:
「星際活動系統中,使用者可以訂閱特定地點。當該地點有新活動時,自動通知所有訂閱者。」
洛基馬上想到傳統做法:「以前是寫 createEvent()
函數,裡面先 put()
寫入活動,再查詢訂閱者,然後呼叫發送通知。」
「對,但這有什麼問題?」大師問。
洛基思考:「createEvent()
函數要處理太多事情——寫入、查詢訂閱者、發送通知。而且如果通知失敗,整個建立活動的操作也會失敗。」
「正確,」大師說,「用 Streams 可以解耦。」
// 場景:新活動 INSERT 時自動發送通知
async function handleInsert(event) {
// 判斷是否為活動建立
if (!event.PK.startsWith("EVENT#")) return;
console.log("新活動建立:", event.eventName);
// 1. 查詢訂閱者
const location = event.location; // 例如 'MARS'
const subscribers = await getLocationSubscribers(location);
console.log(`找到 ${subscribers.length} 位訂閱者`);
// 2. 批次建立通知項目
const notifications = subscribers.map((user) => ({
PK: `USER#${user.id}`,
SK: `NOTIF#${Date.now()}#${generateId()}`,
type: "NEW_EVENT",
message: `新活動:${event.eventName} 於 ${event.date}`,
eventId: event.PK,
createdAt: Math.floor(Date.now() / 1000),
read: false,
}));
// 3. 批次寫入通知
await batchWriteNotifications(notifications);
console.log(`已發送 ${notifications.length} 則通知`);
}
洛基看著程式碼,眼睛一亮:「所以現在 createEvent()
只需要呼叫 put()
寫入資料,完全不管通知的事?」
「對,」大師說,「Streams 自動捕捉 INSERT 事件,Lambda 處理通知邏輯。寫入和通知完全解耦。」
「而且,」洛基補充,「如果通知失敗,也不會影響活動建立。Lambda 會自動重試。」
大師點頭:「這就是事件驅動的優勢。」
洛基翻到筆記本的下一頁:「那 MODIFY 事件呢?什麼時候會用到?」
「很多場景,」大師說,「例如活動狀態變更。從 DRAFT 變成 ACTIVE 時,要執行一系列發布流程。」
洛基想了想:「如果不用 Streams,我需要在 updateStatus()
函數裡判斷『如果狀態變成 ACTIVE,就執行 A、B、C...』對吧?」
「對,而且會越來越複雜,」大師說,「Streams 可以把這些邏輯集中管理。」
// 場景:活動狀態從 DRAFT → ACTIVE 時執行發布流程
async function handleModify(oldItem, newItem) {
// 狀態變更:DRAFT → ACTIVE
if (oldItem.status === "DRAFT" && newItem.status === "ACTIVE") {
await Promise.all([
syncToExternalCalendar(newItem),
incrementCounter("ACTIVE_EVENTS"),
triggerRecommendation(newItem),
]);
}
// 狀態變更:任何狀態 → CANCELLED
if (newItem.status === "CANCELLED") {
await processRefunds(newItem);
await notifyParticipants(newItem, "CANCELLED");
}
// 報名人數變更
if (oldItem.registrations !== newItem.registrations) {
await updateDashboard({
eventId: newItem.PK,
registrations: newItem.registrations,
});
}
}
洛基看著程式碼:「所以一次 update
操作,可能同時觸發狀態發布、退款、報名統計...」
「對,」大師說,「而且全部集中在 Lambda 裡管理。updateStatus()
函數只需要寫入資料,不用知道後續會觸發什麼。」
洛基點頭:「解耦做得很徹底。」
「還記得昨天學的 TTL 嗎?」大師問。
洛基翻回筆記本:「資料過期後自動刪除,用來清理 session、通知、過期活動。」
「TTL 刪除也會觸發 REMOVE 事件,」大師說,「而且能用 userIdentity
判斷是 TTL 刪的還是使用者手動刪的。」
洛基突然想到:「所以可以在資料被 TTL 刪除前,先歸檔到 S3?」
「正是,」大師說,「這就是 Streams 配合 TTL 的常見模式。」
// 場景:TTL 刪除時清理相關資源
async function handleRemove(oldItem, record) {
// 檢查是否為 TTL 刪除
const isTTL =
record.userIdentity?.type === "Service" &&
record.userIdentity?.principalId === "dynamodb.amazonaws.com";
if (isTTL) {
console.log("TTL 刪除:", oldItem.PK);
// Session 過期
if (oldItem.PK.startsWith("SESSION#")) {
await clearUserCache(oldItem.userId);
console.log("已清理使用者快取");
}
// 活動過期(30 天後)
if (oldItem.PK.startsWith("EVENT#")) {
// 歸檔到 S3
await archiveToS3({
bucket: "archived-events",
key: `${oldItem.PK.replace("#", "/")}.json`,
data: oldItem,
});
console.log("已歸檔到 S3:", oldItem.eventName);
}
// 記錄審計日誌
await logAudit({
action: "TTL_DELETE",
item: oldItem.PK,
timestamp: Math.floor(Date.now() / 1000),
});
} else {
// 手動刪除
console.log("使用者刪除:", oldItem.PK);
await logAudit({
action: "MANUAL_DELETE",
item: oldItem.PK,
timestamp: Math.floor(Date.now() / 1000),
});
}
}
洛基興奮:「所以昨天設定 TTL 時,就已經為今天的 Streams 處理做準備了?」
「這就是系統設計的連貫性,」大師說。
大師給出最後一個常見場景:
「DynamoDB 擅長 Key-Value 查詢,但不擅長全文搜尋。可以用 Streams 同步到 Elasticsearch。」
// 場景:將活動資料同步到 Elasticsearch 供全文搜尋
async function syncToElasticsearch(event, newItem) {
if (!newItem.PK.startsWith("EVENT#")) return;
const esDoc = {
id: newItem.PK,
eventName: newItem.eventName,
searchText: `${newItem.eventName} ${newItem.topic} ${newItem.description}`,
};
await esClient.index({
index: "intergalactic-events",
id: esDoc.id,
body: esDoc,
});
}
「這樣,」大師說,「使用者可以在 Elasticsearch 做全文搜尋,資料由 Streams 自動保持同步。」
大師的表情嚴肅起來:「Streams 很強大,但也有陷阱——錯誤處理。」
他在白板上寫下場景:
Lambda 處理失敗會怎樣?
1. AWS 自動重試
2. 最多重試次數可設定(例如 3 次)
3. 重試失敗後,可送到 Dead Letter Queue
4. 同一個 Shard 的事件會阻塞(順序保證)
問題:如果同一個事件被處理兩次呢?
洛基想到:「可能會重複發送通知...」
「這就是為什麼需要冪等性設計,」大師說。
// 冪等性設計:確保重複處理不會造成問題
async function handleInsert(event) {
const notificationId = `${event.PK}#${event.SK}#NEW_EVENT`;
// 檢查是否已處理
const existing = await docClient.get({
TableName: "ProcessedEvents",
Key: { id: notificationId },
});
if (existing.Item) {
console.log("已處理過,跳過");
return; // 冪等:重複執行無副作用
}
// 發送通知
await sendNotification({
id: notificationId,
message: `新活動:${event.eventName}`,
});
// 標記已處理(設定 7 天 TTL)
await docClient.put({
TableName: "ProcessedEvents",
Item: {
id: notificationId,
processedAt: Date.now(),
ttl: Math.floor(Date.now() / 1000) + 7 * 24 * 60 * 60,
},
});
}
洛基理解了:「所以即使 Lambda 重試,也不會重複發送通知。」
「這就是生產環境必須的設計,」大師說。
大師展示 Lambda 觸發器的進階設定:
Lambda Event Source Mapping 關鍵設定:
{
batchSize: 100, // 一次最多 100 筆
batchWindow: 5, // 或等待 5 秒
parallelizationFactor: 10, // 每個 Shard 並行 10 個批次
retryAttempts: 3, // 失敗重試 3 次
onFailure: { destination: 'arn:aws:sqs:...:dlq' } // DLQ
}
「這些設定,」大師說,「直接影響處理速度和可靠性。」
設定 | 影響 |
---|---|
batchSize 大 |
吞吐量高,但失敗影響範圍大 |
batchSize 小 |
失敗影響小,但呼叫次數多(成本高) |
parallelizationFactor 高 |
處理快,但可能亂序 |
retryAttempts 高 |
可靠性高,但失敗時阻塞久 |
大師展示批次處理的建議寫法:
exports.handler = async (event) => {
console.log(`收到批次: ${event.Records.length} 筆`);
// 並行處理所有記錄
const results = await Promise.allSettled(
event.Records.map((record) => processRecord(record))
);
// 分析結果
const succeeded = results.filter((r) => r.status === "fulfilled");
const failed = results.filter((r) => r.status === "rejected");
console.log(`成功: ${succeeded.length}, 失敗: ${failed.length}`);
// 記錄失敗項目
if (failed.length > 0) {
failed.forEach((result, index) => {
console.error(`記錄 ${index} 失敗:`, result.reason);
});
}
// 策略:部分失敗不拋錯,避免整批重試
// 將失敗項目送到 DLQ 供後續處理
if (failed.length > 0) {
await sendToDLQ(failed.map((r, i) => event.Records[i]));
}
return {
statusCode: 200,
processed: succeeded.length,
failed: failed.length,
};
};
async function processRecord(record) {
const { eventName, dynamodb } = record;
const item = unmarshall(dynamodb.NewImage || dynamodb.OldImage);
// 處理邏輯
switch (eventName) {
case "INSERT":
return await handleInsert(item);
case "MODIFY":
return await handleModify(
unmarshall(dynamodb.OldImage),
unmarshall(dynamodb.NewImage)
);
case "REMOVE":
return await handleRemove(item, record);
}
}
大師在白板上列出重要限制:
DynamoDB Streams 的限制:
1. 資料保留:24 小時
- Lambda 必須在 24 小時內處理完
- 超過 24 小時的事件會被丟棄
2. 順序保證:同一 PK 的事件順序一致
- 不同 PK 的事件可能亂序
- 設計時要考慮順序依賴
3. 讀取容量:
- Streams 的讀取不消耗主表 RCU
- 但有每秒讀取次數限制
4. Lambda 呼叫成本:
- 每次觸發都計費
- 批次大小影響成本
5. 重複事件:
- 極少數情況下可能重複
- 必須設計冪等性
洛基問:「什麼時候不該用 Streams?」
大師回答:
不適合 Streams 的場景:
1. 即時性要求極高(< 100ms)
- Streams 有延遲(通常 < 1 秒,但不保證)
2. 需要嚴格的事務一致性
- Streams 是最終一致性
3. 事件量極大(> 10,000 TPS)
- Lambda 並行度有限制
- 考慮 Kinesis Data Streams
4. 複雜的事件編排
- 多步驟工作流用 Step Functions
洛基看著白板上密密麻麻的程式碼和架構圖。
從手動操作資料庫,到資料變更自動觸發業務邏輯——這是一個思維上的躍升。
他突然想到一個問題:「如果要做複雜的資料遷移,例如把 100 萬筆舊資料轉換格式,Streams 可以做嗎?」
大師笑了:「好問題。Streams 適合處理增量變更,但不適合大規模歷史資料處理。那種場景需要用 Scan + 批次寫入,或是 DynamoDB 的備份與還原、Export to S3 等工具。」
洛基點點頭。事件驅動架構讓系統更靈活,但也需要更多工具來應對不同場景。
系統設計,永遠是在不同工具中選擇最適合的組合。
時間設定說明:故事中使用星際曆(SY210 = 西元 2210 年),程式碼範例為確保正確執行,使用對應的西元年份。