iT邦幫忙

2025 iThome 鐵人賽

DAY 27
0
Build on AWS

從一個網站的誕生,看懂 AWS 架構與自動化的全流程!系列 第 27

Day 27 資料庫遷移:DynamoDB Streams 串接 S3 x Athena 匯出與查詢

  • 分享至 

  • xImage
  •  

https://ithelp.ithome.com.tw/upload/images/20251011/20172743fcmfFgHlWK.png

一、前言

在會員系統或應用服務中,DynamoDB 通常扮演交易與即時讀寫的角色,但若需要進行歷史數據查詢、商業分析或批次報表,DynamoDB 並不適合直接使用。透過 DynamoDB Streams,我們可以即時捕捉資料變更,將其匯出到 S3 Data Lake,再利用 Athena 查詢,讓即時資料與分析需求兩者兼顧。

這個 Lab 解決了 DynamoDB 查詢受限的痛點。DynamoDB 適合 OLTP(即時交易處理),但在分析場景(OLAP)中,Scan 成本高且效能差。透過 Streams → Lambda → S3,資料會自動備份並轉換為查詢友好的格式(例如 Parquet/JSON/CSV),再透過 Athena SQL 查詢,建立一個低成本的分析環境,補足 DynamoDB 的不足。

二、需要使用到的服務

  • Amazon DynamoDB Streams:捕捉資料表新增、修改、刪除的事件。
  • AWS Lambda:將 Streams 資料轉換並寫入 S3。
  • Amazon S3:存放遷移後的原始資料與轉換格式。
  • Amazon Athena:以 SQL 方式直接查詢 S3 中的資料。
  • AWS Glue Data Catalog(選用):管理 Schema,讓 Athena 能自動辨識欄位。

三、架構/概念圖

https://ithelp.ithome.com.tw/upload/images/20251010/20172743uUcPDDxahK.png

四、技術重點

  1. 使用 Parquet 格式 儲存資料,提高 Athena 查詢效率並降低成本。
  2. 配合 S3 Lifecycle Policy,自動將舊檔案轉移至 Glacier,降低儲存成本。(本篇未使用)
  3. 搭配 Glue Data Catalog 自動化 Schema 管理,避免人工維護。
  4. 建立 分區設計(例如依照日期),提升 Athena 查詢效率。
  • DynamoDB的Stream功能:
    https://ithelp.ithome.com.tw/upload/images/20251010/20172743OmiwIOsult.png

五、Lab流程

1️⃣ 前置作業

1. 建立 S3 Bucket。

2️⃣ 主要配置

1. 在已創建的DynamoDB,啟用 DynamoDB Streams(選擇 NEW_AND_OLD_IMAGES)。

  1. 進入「DynamoDB」頁面。
    https://ithelp.ithome.com.tw/upload/images/20251010/20172743UCeqRpLXER.png

  2. 進入最初創建的資料表「userId」內。
    https://ithelp.ithome.com.tw/upload/images/20251010/20172743TCm5enjoUf.png

  3. 進入 匯出和串流 (Exports and streams) 頁籤,並在「DynamoDB串流」選擇「編輯」。
    https://ithelp.ithome.com.tw/upload/images/20251010/20172743wLVRXzAyPB.png

  4. 串流檢視類型 (Stream view type):選擇 NEW_AND_OLD_IMAGES (包含變更前後的完整資料),並點選「開啟串流」。
    https://ithelp.ithome.com.tw/upload/images/20251010/20172743CMTQSLJZkl.png

  5. 完成畫面。
    https://ithelp.ithome.com.tw/upload/images/20251010/201727437YimdhPBMn.png

2. 建立Lambda函數

  1. 進入「Lambda」頁面。
    https://ithelp.ithome.com.tw/upload/images/20251010/2017274394DOeqZ2xo.png

  2. 創建一個新的函數。
    https://ithelp.ithome.com.tw/upload/images/20251010/20172743M6Q0pyugGl.png

  3. 輸入函數名稱,並選擇編撰語言。
    https://ithelp.ithome.com.tw/upload/images/20251010/2017274373YDBgmsg8.png

  4. 跳過建議畫面。
    https://ithelp.ithome.com.tw/upload/images/20251010/20172743JRnihhQfRa.png

  5. 寫入程式碼,並部署。

    • 範例程式碼

      // index.mjs
      import { S3Client, PutObjectCommand } from "@aws-sdk/client-s3";
      import zlib from "zlib";
      
      const s3 = new S3Client({ region: process.env.AWS_REGION });
      const BUCKET_NAME = process.env.EXPORT_BUCKET;
      
      function unmarshallDDB(ddbRecord) {
        const json = {};
        for (const key in ddbRecord) {
          const val = ddbRecord[key];
          if (val.S !== undefined) json[key] = val.S;
          else if (val.N !== undefined) json[key] = Number(val.N);
          else if (val.BOOL !== undefined) json[key] = val.BOOL;
          else if (val.NULL !== undefined) json[key] = null;
          else if (val.M !== undefined) json[key] = unmarshallDDB(val.M);
          else if (val.L !== undefined) json[key] = val.L.map(unmarshallDDB);
        }
        return json;
      }
      
      export const handler = async (event) => {
        const now = new Date();
        const timestamp = now.toISOString().replace(/[:.]/g, "-");
        const records = [];
      
        for (const record of event.Records) {
          if (["INSERT", "MODIFY", "REMOVE"].includes(record.eventName)) {
            const dataImage = record.dynamodb.NewImage || record.dynamodb.OldImage || {};
            const parsed = unmarshallDDB(dataImage);
      
            records.push({
              event_name: record.eventName,
              timestamp: record.dynamodb.ApproximateCreationDateTime,
              user_id: parsed.userId,
              email: parsed.email,
            });
          }
        }
      
        if (records.length === 0) {
          console.log("No records to process.");
          return { status: "no_data" };
        }
      
        // ✅ 每行一筆 JSON,而非整個 array
        const ndjson = records.map((r) => JSON.stringify(r)).join("\n");
      
        const compressed = zlib.gzipSync(ndjson);
      
        const partitionPath = `year=${now.getUTCFullYear()}/month=${String(
          now.getUTCMonth() + 1
        ).padStart(2, "0")}/`;
        const fileName = `exports/${partitionPath}${timestamp}.json.gz`;
      
        await s3.send(
          new PutObjectCommand({
            Bucket: BUCKET_NAME,
            Key: fileName,
            Body: compressed,
          })
        );
      
        console.log(`✅ Uploaded NDJSON to s3://${BUCKET_NAME}/${fileName}`);
      
        return { status: "success", file: fileName };
      };
      
      

    https://ithelp.ithome.com.tw/upload/images/20251010/20172743U7DJWIskNJ.png

  6. 進入「組態」分頁,設定環境變數,EXPORT_BUCKET:bucket name。
    https://ithelp.ithome.com.tw/upload/images/20251010/201727431Ky7phCVJe.png

3. 調整Lambda的IAM Role權限

  1. 進入「IAM 」頁面。
    https://ithelp.ithome.com.tw/upload/images/20251010/20172743xbH8d0kbbX.png

  2. 進入IAM role的頁面,點選該Lambda自動創建的IAM role。
    https://ithelp.ithome.com.tw/upload/images/20251010/20172743VXpmpBLsp5.png

  3. 新增「許可政策」。
    https://ithelp.ithome.com.tw/upload/images/20251010/20172743Wxc9MeUWiV.png

  4. 增加以下權限,授權資源為「所有資源」。
    logs:CreateLogStream
    logs:CreateLogGroup
    logs:PutLogEvents
    https://ithelp.ithome.com.tw/upload/images/20251010/20172743QfHl2xyqrm.png

  5. 增加以下權限,授權資源為「指定資源(DynamoDB、S3 bucket)」。
    s3:PutObject
    s3:PutObjectAcl
    dynamodb:GetShardIterator
    dynamodb:DescribeStream
    dynamodb:GetRecords
    dynamodb:ListStreams
    https://ithelp.ithome.com.tw/upload/images/20251010/20172743JF86lGzJiT.png

  6. 授權資源為「指定資源(DynamoDB」。
    https://ithelp.ithome.com.tw/upload/images/20251010/20172743Py3EMTPvrj.png

    • DynamoDB Stream的ARN在哪裡?
      https://ithelp.ithome.com.tw/upload/images/20251010/20172743taHs7ijzRO.png
  7. 設定「許可政策」名稱。
    https://ithelp.ithome.com.tw/upload/images/20251010/20172743AjzAbMA0eC.png

  8. 完成畫面。
    https://ithelp.ithome.com.tw/upload/images/20251010/20172743UNOPLTDCka.png

5. 設定Streams觸發

  1. 進入「DynamoDB」後,進入「匯出與串流」頁面,新增一個「觸發器」。
    https://ithelp.ithome.com.tw/upload/images/20251010/20172743HCl9X4GT9e.png

  2. 串接觸發器為前面建立的Lambda函數。
    https://ithelp.ithome.com.tw/upload/images/20251010/20172743DsSiY8wzKx.png

  3. 串接完成畫面。
    https://ithelp.ithome.com.tw/upload/images/20251010/20172743oirYvjOdYZ.png

6. 在Athena建立分析環境

  1. 進入「Amazon Athena」頁面。
    https://ithelp.ithome.com.tw/upload/images/20251010/20172743tNvDd1YLZ2.png

  2. 進入「設定」頁面。
    https://ithelp.ithome.com.tw/upload/images/20251010/2017274383r844Urd5.png

  3. 貼上要查詢的S3檔案路徑。
    https://ithelp.ithome.com.tw/upload/images/20251010/20172743Vvn0DOS1OU.png

  4. 用SQL語法建立資料庫。

    • 程式碼範例

      CREATE DATABASE IF NOT EXISTS analytics_db;
      

    https://ithelp.ithome.com.tw/upload/images/20251010/20172743Nt5H4s7dbe.png

  5. 完成畫面。
    https://ithelp.ithome.com.tw/upload/images/20251010/20172743TEgliT9I9V.png

  6. 建立外部資料表 (External Table):SQL

    • 執行以下 SQL 語法,定義資料表結構並指向您的 S3 路徑。
    • 程式碼範例
      CREATE EXTERNAL TABLE IF NOT EXISTS analytics_db.dynamodb_users (
        `event_name` string,
        `timestamp` bigint,
        `userId` string, 
        `email` string
      )
      PARTITIONED BY (
        `year` string,
        `month` string
      )
      ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
      WITH SERDEPROPERTIES (
        'serialization.null.format' = 'null'
      )
      LOCATION 's3://<your_bucket_name>/<path>/'
      TBLPROPERTIES (
        'compressionType' = 'gzip',
        'classification' = 'json'
      );
      

    https://ithelp.ithome.com.tw/upload/images/20251010/201727438pJHk2VaWw.png

  7. 完成創建。
    https://ithelp.ithome.com.tw/upload/images/20251010/20172743vpGlVwtuih.png

3️⃣ 測試驗證

1. 插入測試資料:

💡在 DynamoDB Console 中,手動對 Users 表格插入、修改或刪除一筆資料。

  1. 進入DynamoDB的「探索項目」頁面,透過之前的頁面複製一筆新的資料。
    https://ithelp.ithome.com.tw/upload/images/20251010/20172743A0gKXr8Sd8.png

  2. 輸入要新增的測user資料。
    https://ithelp.ithome.com.tw/upload/images/20251010/20172743j3VJLmnO99.png

2. 驗證匯出:

  1. 幾秒鐘後,檢查 S3 儲存桶 dynamodb-export-data-2025-lab,確認 exports/year=YYYY/month=MM/ 路徑下是否有 .json.gz 檔案出現。
    https://ithelp.ithome.com.tw/upload/images/20251010/20172743FXW7u82aWw.png
    https://ithelp.ithome.com.tw/upload/images/20251010/20172743BHIAyC5hTA.png

3. Athena 查詢:

  1. 在 Athena 中,執行以下查詢來載入分區:SQL

    MSCK REPAIR TABLE analytics_db.dynamodb_users;
    

    https://ithelp.ithome.com.tw/upload/images/20251010/20172743msuINzWpzL.png

  2. 執行查詢:SQL,驗證查詢結果中是否包含您剛才在 DynamoDB 中新增或修改的數據。

    SELECT event_name, user_id, email, timestamp
    FROM analytics_db.dynamodb_users
    WHERE year='2025' AND month='10'
    LIMIT 10;
    

    https://ithelp.ithome.com.tw/upload/images/20251010/201727434OOkcMbLX4.png

【補充】刪除資料表的方式:

  1. 刪除資料表:

    DROP TABLE analytics_db.dynamodb_users;
    
  2. 刪除資料庫:

    DROP DATABASE IF EXISTS analytics_db;
    

六、結語

今天的 Lab 展示了如何透過 DynamoDB Streams → Lambda → S3 → Athena,將 DynamoDB 中的交易資料遷移到 Data Lake,並利用 SQL 進行分析。這樣的架構解決了 DynamoDB 在分析上的限制,同時維持即時資料的一致性,讓應用不僅能支撐高流量,也能提供完整的數據洞察。


上一篇
Day 26 服務健康監測:CloudWatch x Alarms 掌握關鍵指標
下一篇
Day 28 商業智慧儀表板:QuickSight x Athena 報表與視覺化
系列文
從一個網站的誕生,看懂 AWS 架構與自動化的全流程!28
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言