iT邦幫忙

2025 iThome 鐵人賽

DAY 27
0

洛基一進茶室,就看見白板上的流程圖:

https://ithelp.ithome.com.tw/upload/images/20251013/20178813s44bLs4ZpV.jpg

「前面二十六天,」大師說,「我們都是主動操作資料庫——寫入、查詢、更新、刪除。」

他指著流程圖:「但真實系統中,很多事情應該『自動發生』:」

新活動建立 → 自動發送通知給訂閱者
活動狀態變更 → 自動同步外部日曆
參與者註冊 → 自動更新統計數據
TTL 刪除資料 → 自動清理相關資源

洛基思考這些場景:「以前我都是寫入後,再手動呼叫發送通知的函數。現在是說...不需要手動呼叫?」

「對,」大師說,「資料一旦寫入,後續動作自動觸發。寫入程式不需要知道『接下來會發生什麼』。」

洛基眼睛一亮:「解耦!寫入活動的程式,不用管通知、不用管同步、不用管統計。」

「這就是事件驅動架構,」大師說,「DynamoDB Streams 是實現它的工具。」


Streams 的運作原理

「具體是怎麼運作的?」洛基問。

大師在白板上展開流程:

資料變更流程:

1. 應用寫入 DynamoDB
   docClient.put({ ... })

2. DynamoDB Streams 捕捉變更
   ├─ 記錄事件類型:INSERT / MODIFY / REMOVE
   ├─ 保存變更資料:舊值 / 新值
   └─ 順序保證:同一 PK 的變更順序一致

3. Lambda 自動觸發
   ├─ 批次接收 Stream Records
   ├─ 處理業務邏輯
   └─ 失敗自動重試

4. 資料保留 24 小時
   ├─ Lambda 有 24 小時處理時間
   └─ 超過 24 小時的資料會被丟棄

洛基盯著第 4 點:「24 小時?如果 Lambda 處理很慢,累積太多事件怎麼辦?」

「Streams 會累積,但有上限,」大師說,「超過 24 小時的事件會被丟棄。所以 Lambda 必須跟得上寫入速度。」

「那如果某個事件處理失敗呢?」

「AWS 會自動重試,」大師說,「但這也會帶來問題——我們稍後會談冪等性設計。先看 Streams 能捕捉什麼資料。」


Streams 能捕捉什麼?

「Streams 會記錄什麼資料?」洛基問,「整個項目嗎?」

「這取決於設定,」大師說,「啟用 Streams 時,有四種 StreamViewType 可以選。」

他在白板上列出:

StreamViewType 捕捉內容 適用場景 優缺點
KEYS_ONLY 只有 PK, SK 需要回查主表拿完整資料 資料量最小,但需額外查詢
NEW_IMAGE 變更後的完整項目 同步到搜尋引擎(只需最新狀態) 知道新值,不知舊值
OLD_IMAGE 變更前的完整項目 審計日誌、TTL 刪除記錄 知道舊值,不知新值
NEW_AND_OLD_IMAGES 變更前後完整項目 比對變更(DRAFT → ACTIVE) 最完整,但資料量最大 ✅ 最常用

洛基思考:「如果只記錄 KEYS_ONLY,Lambda 還要再查一次主表才能拿到完整資料?」

「對,」大師說,「所以通常選 NEW_AND_OLD_IMAGES——知道『從什麼變成什麼』最有價值。例如活動狀態從 DRAFT 變成 ACTIVE,你需要知道『之前是 DRAFT』才能判斷是否要執行發布流程。」

洛基點頭,理解了為什麼需要新舊值。


Lambda 收到什麼?

「Lambda 函數會收到什麼格式的資料?」洛基問。

「Stream Record,」大師說,「讓我展示實際結構。」

// Lambda 函數收到的 event 結構

{
  "Records": [
    {
      "eventID": "1",
      "eventName": "INSERT",  // INSERT, MODIFY, REMOVE
      "eventVersion": "1.1",
      "eventSource": "aws:dynamodb",
      "awsRegion": "us-east-1",
      "dynamodb": {
        "Keys": {
          "PK": { "S": "EVENT#SY210-04-20-001" },
          "SK": { "S": "METADATA" }
        },
        "NewImage": {
          "PK": { "S": "EVENT#SY210-04-20-001" },
          "SK": { "S": "METADATA" },
          "eventName": { "S": "火星科學大會" },
          "status": { "S": "ACTIVE" },
          "registrations": { "N": "150" }
        },
        "OldImage": {
          // 變更前的值(MODIFY 事件才有)
          "status": { "S": "DRAFT" },
          "registrations": { "N": "100" }
        },
        "SequenceNumber": "111",
        "SizeBytes": 260
      },
      "userIdentity": {
        "type": "Service",  // TTL 刪除時會是 Service
        "principalId": "dynamodb.amazonaws.com"
      }
    }
  ]
}

洛基看著這個結構,發現熟悉的格式:「NewImageOldImage 裡面,{ "S": "ACTIVE" } 這種格式...」

「對,第 11 天學過的 DynamoDB 原生格式,」大師說,「需要用 unmarshall 轉換。」

「還有 userIdentity,」洛基指著最後一段,「type: "Service" 是什麼意思?」

「這能區分是使用者操作還是系統操作,」大師解釋,「TTL 自動刪除時,userIdentity.type 會是 Service。這樣你就知道是 TTL 刪的,而不是使用者手動刪的。」

洛基理解了:「所以可以根據刪除原因做不同處理。」


寫第一個 Stream 處理函數

「現在寫一個最基本的 Lambda Handler,」大師說。

const { unmarshall } = require("@aws-sdk/util-dynamodb");

exports.handler = async (event) => {
  console.log(`收到 ${event.Records.length} 筆 Stream 事件`);

  const errors = [];

  for (const record of event.Records) {
    try {
      const { eventName, dynamodb } = record;

      // 解析 DynamoDB 格式
      const newItem = dynamodb.NewImage ? unmarshall(dynamodb.NewImage) : null;
      const oldItem = dynamodb.OldImage ? unmarshall(dynamodb.OldImage) : null;

      // 根據事件類型處理
      switch (eventName) {
        case "INSERT":
          await handleInsert(newItem);
          break;

        case "MODIFY":
          await handleModify(oldItem, newItem);
          break;

        case "REMOVE":
          await handleRemove(oldItem, record);
          break;
      }
    } catch (error) {
      console.error("處理記錄時發生錯誤:", error);
      errors.push({ record, error: error.message });
    }
  }

  if (errors.length > 0) {
    console.error(`${errors.length} 筆記錄處理失敗`);
    // 部分失敗不拋錯,避免整批重試
    // 失敗項目應該送到 DLQ 或記錄到監控系統
  }

  return {
    statusCode: 200,
    processed: event.Records.length - errors.length,
    failed: errors.length,
  };
};

「這就是最基本的結構,」大師說,「根據 eventName 判斷是新增、修改還是刪除,然後分別處理。」

洛基點頭:「INSERT、MODIFY、REMOVE 三種事件類型。」

「對,」大師說,「現在看實際案例——如何用 Streams 實現自動通知。」


實戰案例 1:新活動自動通知

大師擦掉白板,重新畫出第一個場景:

「星際活動系統中,使用者可以訂閱特定地點。當該地點有新活動時,自動通知所有訂閱者。」

洛基馬上想到傳統做法:「以前是寫 createEvent() 函數,裡面先 put() 寫入活動,再查詢訂閱者,然後呼叫發送通知。」

「對,但這有什麼問題?」大師問。

洛基思考:「createEvent() 函數要處理太多事情——寫入、查詢訂閱者、發送通知。而且如果通知失敗,整個建立活動的操作也會失敗。」

「正確,」大師說,「用 Streams 可以解耦。」

// 場景:新活動 INSERT 時自動發送通知

async function handleInsert(event) {
  // 判斷是否為活動建立
  if (!event.PK.startsWith("EVENT#")) return;

  console.log("新活動建立:", event.eventName);

  // 1. 查詢訂閱者
  const location = event.location; // 例如 'MARS'
  const subscribers = await getLocationSubscribers(location);

  console.log(`找到 ${subscribers.length} 位訂閱者`);

  // 2. 批次建立通知項目
  const notifications = subscribers.map((user) => ({
    PK: `USER#${user.id}`,
    SK: `NOTIF#${Date.now()}#${generateId()}`,
    type: "NEW_EVENT",
    message: `新活動:${event.eventName} 於 ${event.date}`,
    eventId: event.PK,
    createdAt: Math.floor(Date.now() / 1000),
    read: false,
  }));

  // 3. 批次寫入通知
  await batchWriteNotifications(notifications);

  console.log(`已發送 ${notifications.length} 則通知`);
}

洛基看著程式碼,眼睛一亮:「所以現在 createEvent() 只需要呼叫 put() 寫入資料,完全不管通知的事?」

「對,」大師說,「Streams 自動捕捉 INSERT 事件,Lambda 處理通知邏輯。寫入和通知完全解耦。」

「而且,」洛基補充,「如果通知失敗,也不會影響活動建立。Lambda 會自動重試。」

大師點頭:「這就是事件驅動的優勢。」


實戰案例 2:狀態變更的條件處理

洛基翻到筆記本的下一頁:「那 MODIFY 事件呢?什麼時候會用到?」

「很多場景,」大師說,「例如活動狀態變更。從 DRAFT 變成 ACTIVE 時,要執行一系列發布流程。」

洛基想了想:「如果不用 Streams,我需要在 updateStatus() 函數裡判斷『如果狀態變成 ACTIVE,就執行 A、B、C...』對吧?」

「對,而且會越來越複雜,」大師說,「Streams 可以把這些邏輯集中管理。」

// 場景:活動狀態從 DRAFT → ACTIVE 時執行發布流程

async function handleModify(oldItem, newItem) {
  // 狀態變更:DRAFT → ACTIVE
  if (oldItem.status === "DRAFT" && newItem.status === "ACTIVE") {
    await Promise.all([
      syncToExternalCalendar(newItem),
      incrementCounter("ACTIVE_EVENTS"),
      triggerRecommendation(newItem),
    ]);
  }

  // 狀態變更:任何狀態 → CANCELLED
  if (newItem.status === "CANCELLED") {
    await processRefunds(newItem);
    await notifyParticipants(newItem, "CANCELLED");
  }

  // 報名人數變更
  if (oldItem.registrations !== newItem.registrations) {
    await updateDashboard({
      eventId: newItem.PK,
      registrations: newItem.registrations,
    });
  }
}

洛基看著程式碼:「所以一次 update 操作,可能同時觸發狀態發布、退款、報名統計...」

「對,」大師說,「而且全部集中在 Lambda 裡管理。updateStatus() 函數只需要寫入資料,不用知道後續會觸發什麼。」

洛基點頭:「解耦做得很徹底。」


實戰案例 3:TTL 刪除的處理

「還記得昨天學的 TTL 嗎?」大師問。

洛基翻回筆記本:「資料過期後自動刪除,用來清理 session、通知、過期活動。」

「TTL 刪除也會觸發 REMOVE 事件,」大師說,「而且能用 userIdentity 判斷是 TTL 刪的還是使用者手動刪的。」

洛基突然想到:「所以可以在資料被 TTL 刪除前,先歸檔到 S3?」

「正是,」大師說,「這就是 Streams 配合 TTL 的常見模式。」

// 場景:TTL 刪除時清理相關資源

async function handleRemove(oldItem, record) {
  // 檢查是否為 TTL 刪除
  const isTTL =
    record.userIdentity?.type === "Service" &&
    record.userIdentity?.principalId === "dynamodb.amazonaws.com";

  if (isTTL) {
    console.log("TTL 刪除:", oldItem.PK);

    // Session 過期
    if (oldItem.PK.startsWith("SESSION#")) {
      await clearUserCache(oldItem.userId);
      console.log("已清理使用者快取");
    }

    // 活動過期(30 天後)
    if (oldItem.PK.startsWith("EVENT#")) {
      // 歸檔到 S3
      await archiveToS3({
        bucket: "archived-events",
        key: `${oldItem.PK.replace("#", "/")}.json`,
        data: oldItem,
      });

      console.log("已歸檔到 S3:", oldItem.eventName);
    }

    // 記錄審計日誌
    await logAudit({
      action: "TTL_DELETE",
      item: oldItem.PK,
      timestamp: Math.floor(Date.now() / 1000),
    });
  } else {
    // 手動刪除
    console.log("使用者刪除:", oldItem.PK);
    await logAudit({
      action: "MANUAL_DELETE",
      item: oldItem.PK,
      timestamp: Math.floor(Date.now() / 1000),
    });
  }
}

洛基興奮:「所以昨天設定 TTL 時,就已經為今天的 Streams 處理做準備了?」

「這就是系統設計的連貫性,」大師說。


實戰案例 4:資料同步到 Elasticsearch

大師給出最後一個常見場景:

「DynamoDB 擅長 Key-Value 查詢,但不擅長全文搜尋。可以用 Streams 同步到 Elasticsearch。」

// 場景:將活動資料同步到 Elasticsearch 供全文搜尋

async function syncToElasticsearch(event, newItem) {
  if (!newItem.PK.startsWith("EVENT#")) return;

  const esDoc = {
    id: newItem.PK,
    eventName: newItem.eventName,
    searchText: `${newItem.eventName} ${newItem.topic} ${newItem.description}`,
  };

  await esClient.index({
    index: "intergalactic-events",
    id: esDoc.id,
    body: esDoc,
  });
}

「這樣,」大師說,「使用者可以在 Elasticsearch 做全文搜尋,資料由 Streams 自動保持同步。」


錯誤處理與重試

大師的表情嚴肅起來:「Streams 很強大,但也有陷阱——錯誤處理。」

他在白板上寫下場景:

Lambda 處理失敗會怎樣?

1. AWS 自動重試
2. 最多重試次數可設定(例如 3 次)
3. 重試失敗後,可送到 Dead Letter Queue
4. 同一個 Shard 的事件會阻塞(順序保證)

問題:如果同一個事件被處理兩次呢?

洛基想到:「可能會重複發送通知...」

「這就是為什麼需要冪等性設計,」大師說。

// 冪等性設計:確保重複處理不會造成問題

async function handleInsert(event) {
  const notificationId = `${event.PK}#${event.SK}#NEW_EVENT`;

  // 檢查是否已處理
  const existing = await docClient.get({
    TableName: "ProcessedEvents",
    Key: { id: notificationId },
  });

  if (existing.Item) {
    console.log("已處理過,跳過");
    return; // 冪等:重複執行無副作用
  }

  // 發送通知
  await sendNotification({
    id: notificationId,
    message: `新活動:${event.eventName}`,
  });

  // 標記已處理(設定 7 天 TTL)
  await docClient.put({
    TableName: "ProcessedEvents",
    Item: {
      id: notificationId,
      processedAt: Date.now(),
      ttl: Math.floor(Date.now() / 1000) + 7 * 24 * 60 * 60,
    },
  });
}

洛基理解了:「所以即使 Lambda 重試,也不會重複發送通知。」

「這就是生產環境必須的設計,」大師說。


批次處理設定

大師展示 Lambda 觸發器的進階設定:

Lambda Event Source Mapping 關鍵設定

{
  batchSize: 100,                 // 一次最多 100 筆
  batchWindow: 5,                 // 或等待 5 秒
  parallelizationFactor: 10,      // 每個 Shard 並行 10 個批次
  retryAttempts: 3,               // 失敗重試 3 次
  onFailure: { destination: 'arn:aws:sqs:...:dlq' }  // DLQ
}

「這些設定,」大師說,「直接影響處理速度和可靠性。」

設定 影響
batchSize 吞吐量高,但失敗影響範圍大
batchSize 失敗影響小,但呼叫次數多(成本高)
parallelizationFactor 處理快,但可能亂序
retryAttempts 可靠性高,但失敗時阻塞久

Lambda 批次處理最佳實踐

大師展示批次處理的建議寫法:

exports.handler = async (event) => {
  console.log(`收到批次: ${event.Records.length} 筆`);

  // 並行處理所有記錄
  const results = await Promise.allSettled(
    event.Records.map((record) => processRecord(record))
  );

  // 分析結果
  const succeeded = results.filter((r) => r.status === "fulfilled");
  const failed = results.filter((r) => r.status === "rejected");

  console.log(`成功: ${succeeded.length}, 失敗: ${failed.length}`);

  // 記錄失敗項目
  if (failed.length > 0) {
    failed.forEach((result, index) => {
      console.error(`記錄 ${index} 失敗:`, result.reason);
    });
  }

  // 策略:部分失敗不拋錯,避免整批重試
  // 將失敗項目送到 DLQ 供後續處理
  if (failed.length > 0) {
    await sendToDLQ(failed.map((r, i) => event.Records[i]));
  }

  return {
    statusCode: 200,
    processed: succeeded.length,
    failed: failed.length,
  };
};

async function processRecord(record) {
  const { eventName, dynamodb } = record;
  const item = unmarshall(dynamodb.NewImage || dynamodb.OldImage);

  // 處理邏輯
  switch (eventName) {
    case "INSERT":
      return await handleInsert(item);
    case "MODIFY":
      return await handleModify(
        unmarshall(dynamodb.OldImage),
        unmarshall(dynamodb.NewImage)
      );
    case "REMOVE":
      return await handleRemove(item, record);
  }
}

Streams 的成本與限制

大師在白板上列出重要限制:

DynamoDB Streams 的限制:

1. 資料保留:24 小時
   - Lambda 必須在 24 小時內處理完
   - 超過 24 小時的事件會被丟棄

2. 順序保證:同一 PK 的事件順序一致
   - 不同 PK 的事件可能亂序
   - 設計時要考慮順序依賴

3. 讀取容量:
   - Streams 的讀取不消耗主表 RCU
   - 但有每秒讀取次數限制

4. Lambda 呼叫成本:
   - 每次觸發都計費
   - 批次大小影響成本

5. 重複事件:
   - 極少數情況下可能重複
   - 必須設計冪等性

洛基問:「什麼時候不該用 Streams?」

大師回答:

不適合 Streams 的場景:

1. 即時性要求極高(< 100ms)
   - Streams 有延遲(通常 < 1 秒,但不保證)

2. 需要嚴格的事務一致性
   - Streams 是最終一致性

3. 事件量極大(> 10,000 TPS)
   - Lambda 並行度有限制
   - 考慮 Kinesis Data Streams

4. 複雜的事件編排
   - 多步驟工作流用 Step Functions

洛基看著白板上密密麻麻的程式碼和架構圖。

從手動操作資料庫,到資料變更自動觸發業務邏輯——這是一個思維上的躍升。

他突然想到一個問題:「如果要做複雜的資料遷移,例如把 100 萬筆舊資料轉換格式,Streams 可以做嗎?」

大師笑了:「好問題。Streams 適合處理增量變更,但不適合大規模歷史資料處理。那種場景需要用 Scan + 批次寫入,或是 DynamoDB 的備份與還原、Export to S3 等工具。」

洛基點點頭。事件驅動架構讓系統更靈活,但也需要更多工具來應對不同場景。

系統設計,永遠是在不同工具中選擇最適合的組合。


時間設定說明:故事中使用星際曆(SY210 = 西元 2210 年),程式碼範例為確保正確執行,使用對應的西元年份。


上一篇
Day 26:TTL 與資料生命週期管理
下一篇
Day 28:生產環境監控與診斷
系列文
DynamoDB銀河傳說首部曲-打造宇宙都打不倒的高效服務30
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言