洛基興奮地走進茶室,手中拿著一張滿滿的計算紙:「大師!我想到一個完美的測試場景!」
他攤開紙張:「小行星帶 5020 號前哨站昨天通訊中斷了三天,現在需要補傳所有的觀測資料。我算了一下,總共 30 萬筆紀錄!」
諾斯克大師放下茶杯:「聽起來是個不錯的挑戰。你打算怎麼處理?」
「簡單!」洛基自信滿滿,「用 PutItem 寫個迴圈就解決了。」
大師微笑不語,只是說:「那就試試看吧。」
洛基快速寫下程式碼:
async function uploadAsteroidData(dataList) {
console.log(`開始上傳 ${dataList.length} 筆觀測資料...`);
const startTime = Date.now();
for (let i = 0; i < dataList.length; i++) {
await docClient
.put({
TableName: "AsteroidObservations",
Item: dataList[i],
})
.promise();
if (i % 1000 === 0) {
console.log(`已處理 ${i} 筆...`);
}
}
const endTime = Date.now();
console.log(`完成!總耗時:${(endTime - startTime) / 1000} 秒`);
}
洛基說:「讓我用 1000 筆來測試一下。」
他執行程式,螢幕開始滾動:
開始上傳 1000 筆觀測資料...
已處理 0 筆...
已處理 1000 筆...
完成!總耗時:12.3 秒
洛基皺起眉頭:「12 秒處理 1000 筆?」他拿起計算機:「這樣的話,30 萬筆需要...」
Hippo 回答:「12.3 × 300 = 3690 秒 = 61.5 分鐘!」
洛基驚呼,「這樣也太慢了。」
大師問:「你覺得為什麼會這麼慢?」
洛基思考了一下:「每次 PutItem 都要發送一個 HTTP 請求到 DynamoDB...」
「對,網路延遲。」
「而且每次都要等回應才能送下一筆...」
「序列處理。」
洛基恍然大悟:「所以大部分時間都浪費在等待網路回應上!」
大師在白板上畫了一個時間軸:
單筆操作的時間分解:
|--請求準備--|-網路傳輸-|--DDB處理--|-網路傳輸-|--回應處理--|
1ms 8ms 1ms 8ms 1ms
總計:19ms(其中 16ms 都在等網路)
「問題在於,」大師指著圖,「真正的處理時間只有 2ms,但網路延遲占了 85% 的時間!」
洛基突然想到:「就像搬箱子一樣?如果我一次只搬一個,大部分時間都在爬樓梯!」
「完全正確!」大師讚許,「那你覺得該怎麼優化?」
「一次搬多個箱子!」洛基眼睛一亮,「但是 DynamoDB 有辦法一次處理多筆資料嗎?」
「有的,」大師說,「這就是批次操作的由來。」
大師翻開參考文件:「DynamoDB 提供了 BatchWriteItem 操作,可以一次處理多筆寫入。」
洛基迫不及待:「太好了!那我可以一次送 1000 筆嗎?」
「等等,」大師笑道,「看看 Hippo 怎麼說。」
Hippo 回答:
BatchWriteItem 限制:
- 最多 25 個項目
- 所有項目總大小不超過 16MB
- 單一項目不超過 400KB
洛基失望地說:「只有 25 個?那不是幫助不大?」
大師引導他思考:「讓我們算算看。如果用批次,1000 筆資料需要幾次請求?」
「1000 ÷ 25 = 40 次請求。」
「對比原本的 1000 次請求如何?」
洛基眼睛亮了:「減少了 96%!如果網路延遲從 1000 × 18ms 變成 40 × 20ms...」
Hippo 搶著回答:「18 秒變成 0.8 秒!快了 22 倍!」
洛基於是開始改寫程式:
async function uploadAsteroidDataBatch(dataList) {
console.log(`開始批次上傳 ${dataList.length} 筆觀測資料...`);
const startTime = Date.now();
// 分批,每批 25 筆
for (let i = 0; i < dataList.length; i += 25) {
const batch = dataList.slice(i, i + 25);
const params = {
RequestItems: {
AsteroidObservations: batch.map((item) => ({
PutRequest: {
Item: item,
},
})),
},
};
await docClient.batchWrite(params).promise();
console.log(`已處理 ${Math.min(i + 25, dataList.length)} 筆...`);
}
const endTime = Date.now();
console.log(`完成!總耗時:${(endTime - startTime) / 1000} 秒`);
}
「看起來不錯,」洛基說,「讓我測試一下。」
執行結果:
開始批次上傳 1000 筆觀測資料...
已處理 25 筆...
已處理 50 筆...
已處理 75 筆...
...
已處理 1000 筆...
完成!總耗時:1.2 秒
洛基開心地說:「1.2 秒!比原來快了 10 倍!」
但他突然想到一個問題:「大師,如果某一批失敗了怎麼辦?」
大師神秘地笑了:「好問題。讓我們故意製造一個失敗的情況。」
大師請 Hippo 調整表格的寫入容量到很低的值:「現在再試一次,看會發生什麼。」
洛基執行程式,這次螢幕顯示:
開始批次上傳 1000 筆觀測資料...
已處理 25 筆...
已處理 50 筆...
已處理 75 筆...
已處理 100 筆...
完成!總耗時:1.8 秒
「看起來還是成功了?」洛基困惑地說。
大師搖頭:「讓我們檢查一下實際寫入的數量。」
洛基查詢表格,發現只有 847 筆資料!
「少了 153 筆!」洛基驚呼,「但程式沒有報錯啊!」
「問題就在這裡,」大師說,「BatchWriteItem 不會因為部分失敗而拋出錯誤。它會告訴你哪些項目沒有處理成功。」
他指著文件:「看這裡,回應中有個 UnprocessedItems 欄位。」
洛基仔細閱讀:「所以當 DynamoDB 無法處理某些項目時,會把它們放在 UnprocessedItems 中回傳?」
「正是如此,」大師接著說,「這不是錯誤,而是 DynamoDB 在說『這些項目我現在處理不了,請稍後重試』。」
洛基好奇地問:「為什麼會有無法處理的項目?」
大師拿出四個不同顏色的瓶子:
「紅瓶代表容量不足,」他舉起第一個瓶子,「當表格的寫入容量單位(WCU)被用完時,新的寫入請求就會被延後。」
「黃瓶代表保護機制,」他舉起第二個,「DynamoDB 會主動限制過於頻繁的請求,避免系統過載。」
「藍瓶代表項目太大,」第三個瓶子,「如果某個項目超過 400KB,就無法處理。」
「白瓶代表暫時性問題,」最後一個瓶子,「網路不穩或其他暫時性錯誤。」
洛基恍然大悟:「所以 UnprocessedItems 大多數情況下都是暫時性的?只要重試就可以了?」
「沒錯!」大師說,「這就是為什麼批次操作的完整實作需要重試機制。」
洛基迫切地問:「那我該怎麼處理這些 UnprocessedItems?」
大師引導他:「你覺得最簡單的方法是什麼?」
「檢查回應,如果有 UnprocessedItems 就再送一次?」
「對,但要注意什麼?」
洛基思考:「如果一直失敗,不能無限重試...」
「還有?」
「重試的間隔時間?如果立刻重試,可能還是會失敗。」
「很好!」大師讚許,「讓我們一步步實作:」
async function uploadAsteroidDataWithRetry(dataList) {
console.log(`開始批次上傳 ${dataList.length} 筆觀測資料...`);
const startTime = Date.now();
// 分批,每批 25 筆
for (let i = 0; i < dataList.length; i += 25) {
const batch = dataList.slice(i, i + 25);
let unprocessedItems = batch;
let retryCount = 0;
const maxRetries = 3;
while (unprocessedItems.length > 0 && retryCount < maxRetries) {
const params = {
RequestItems: {
AsteroidObservations: unprocessedItems.map((item) => ({
PutRequest: {
Item: item,
},
})),
},
};
const result = await docClient.batchWrite(params).promise();
// 檢查是否有未處理的項目
if (
result.UnprocessedItems &&
result.UnprocessedItems.AsteroidObservations
) {
unprocessedItems = result.UnprocessedItems.AsteroidObservations.map(
(req) => req.PutRequest.Item
);
retryCount++;
if (unprocessedItems.length > 0) {
console.log(
`批次 ${Math.floor(i / 25) + 1} 有 ${
unprocessedItems.length
} 項目需要重試...`
);
// 等待一下再重試
await new Promise((resolve) => setTimeout(resolve, 100 * retryCount));
}
} else {
unprocessedItems = []; // 全部成功
}
}
if (unprocessedItems.length > 0) {
console.error(
`批次 ${Math.floor(i / 25) + 1} 最終失敗項目:${
unprocessedItems.length
} 筆`
);
}
console.log(`已處理批次 ${Math.floor(i / 25) + 1}...`);
}
const endTime = Date.now();
console.log(`完成!總耗時:${(endTime - startTime) / 1000} 秒`);
}
洛基執行新版本的程式,這次螢幕顯示:
開始批次上傳 1000 筆觀測資料...
已處理批次 1...
已處理批次 2...
批次 3 有 12 項目需要重試...
已處理批次 3...
批次 4 有 8 項目需要重試...
已處理批次 4...
...
完成!總耗時:3.2 秒
他檢查表格:「1000 筆全部都在!」
但洛基注意到時間增加到 3.2 秒:「重試讓時間變長了。」
大師解釋:「這是預期的。重試是為了確保資料完整性,稍微犧牲速度是值得的。」
「而且,」大師補充,「如果表格容量充足,就不會有這麼多重試。」
洛基問:「我注意到每次重試都等更久,為什麼?」
大師引導他思考:「如果所有失敗的請求都立刻重試,會發生什麼?」
「可能會讓情況更糟?」洛基猜測,「因為系統正在恢復,又來一波請求...」
「正是!」大師說,「這叫做羊群效應。想像一群羊同時衝向一個門口。」
他在白板上畫出指數退避的概念:
指數退避策略:
第1次失敗 → 等 100ms → 重試
第2次失敗 → 等 200ms → 重試
第3次失敗 → 等 400ms → 重試
第4次失敗 → 等 800ms → 重試
「每次等待時間加倍,」洛基理解了,「讓系統有時間恢復。」
「而且加入隨機性更好,」大師說,「避免多個客戶端同時重試。」
洛基改進了重試機制,加入更智慧的指數退避:
async function uploadAsteroidDataSmart(dataList) {
console.log(`開始智慧批次上傳 ${dataList.length} 筆觀測資料...`);
const startTime = Date.now();
for (let i = 0; i < dataList.length; i += 25) {
const batch = dataList.slice(i, i + 25);
let unprocessedItems = batch;
let retryCount = 0;
const maxRetries = 5;
while (unprocessedItems.length > 0 && retryCount < maxRetries) {
const params = {
RequestItems: {
AsteroidObservations: unprocessedItems.map((item) => ({
PutRequest: { Item: item },
})),
},
};
const result = await docClient.batchWrite(params).promise();
if (
result.UnprocessedItems &&
result.UnprocessedItems.AsteroidObservations
) {
unprocessedItems = result.UnprocessedItems.AsteroidObservations.map(
(req) => req.PutRequest.Item
);
retryCount++;
if (unprocessedItems.length > 0) {
// 指數退避 + 隨機抖動
const baseDelay = 100 * Math.pow(2, retryCount);
const jitter = Math.random() * baseDelay * 0.1;
const delay = Math.min(baseDelay + jitter, 5000);
console.log(
`批次 ${
Math.floor(i / 25) + 1
} 重試 ${retryCount},等待 ${Math.round(delay)}ms...`
);
await new Promise((resolve) => setTimeout(resolve, delay));
}
} else {
unprocessedItems = [];
}
}
if (unprocessedItems.length > 0) {
console.error(
`批次 ${Math.floor(i / 25) + 1} 最終失敗項目:${
unprocessedItems.length
} 筆`
);
}
}
const endTime = Date.now();
console.log(`完成!總耗時:${(endTime - startTime) / 1000} 秒`);
}
「加入隨機抖動真的有幫助嗎?」洛基問。
大師點頭:「避免多個客戶端同步重試。想像如果 1000 個客戶端都在相同時間重試...」
「又是羊群效應!」洛基領悟。
洛基好奇地問:「除了 BatchWriteItem,還有其他批次操作嗎?」
「當然,」大師說,「還有 BatchGetItem 用於批次讀取。」
// 批次讀取示例
async function batchGetObservations(observationIds) {
const params = {
RequestItems: {
AsteroidObservations: {
Keys: observationIds.map((id) => ({
PK: `OBSERVATION#${id}`,
SK: `DATA`,
})),
},
},
};
const result = await docClient.batchGet(params).promise();
return result.Responses.AsteroidObservations;
}
洛基立刻注意到差異:「BatchGetItem 可以處理更多項目?」
「對,」大師說,「讀取的限制比較寬鬆:」
BatchGetItem 限制:
- 最多 100 個鍵值
- 總大小不超過 16MB
為什麼比寫入寬鬆?
- 讀取不需要更新索引
- 沒有一致性檢查的複雜性
- 對系統負載較小
當洛基準備處理完整的 30 萬筆資料時,大師分享了一個重要概念:
「大量資料處理需要系統性思考,」大師說,「不只是技術問題,更是架構問題。」
洛基詢問:「您的意思是?」
「考慮這些問題:」大師引導,「如果處理到一半失敗了怎麼辦?如何知道哪些資料已經處理過?如何監控進度?」
洛基思考:「需要記錄處理狀態?」
「正是!」大師讚許,「這叫做可恢復性設計。」
// 可恢復的批次處理
async function recoverableBatchUpload(dataList, batchSize = 25) {
const totalBatches = Math.ceil(dataList.length / batchSize);
let processedBatches = 0;
// 讀取之前的進度(如果存在)
const checkpoint = await loadCheckpoint();
processedBatches = checkpoint?.processedBatches || 0;
console.log(`從批次 ${processedBatches + 1} 開始處理...`);
for (
let i = processedBatches * batchSize;
i < dataList.length;
i += batchSize
) {
const batch = dataList.slice(i, i + batchSize);
const batchNumber = Math.floor(i / batchSize) + 1;
try {
await processBatchWithRetry(batch);
processedBatches++;
// 儲存進度
await saveCheckpoint({ processedBatches, totalBatches });
console.log(`完成批次 ${batchNumber}/${totalBatches}`);
} catch (error) {
console.error(`批次 ${batchNumber} 處理失敗:`, error);
break;
}
}
console.log(`處理完成:${processedBatches}/${totalBatches} 批次`);
}
洛基問:「處理 30 萬筆資料前,我該怎麼規劃容量?」
大師帶他進行實際計算:
「首先,估算所需的處理速率,」大師說:
容量規劃步驟:
1. 確定目標:
- 資料量:300,000 筆
- 目標時間:5 分鐘(300 秒)
- 每筆大小:約 1KB
2. 計算所需速率:
- 每秒需處理:300,000 ÷ 300 = 1,000 筆
- 所需 WCU:1,000(每筆 1KB = 1 WCU)
3. 加入安全邊際:
- 考慮重試:1,000 × 1.3 = 1,300 WCU
- 考慮尖峰:1,300 × 1.5 = 1,950 WCU
- 設定目標:2,000 WCU
「為什麼要這麼多安全邊際?」洛基問。
「因為現實總是比理論複雜,」大師解釋,「重試會增加負載,系統可能有其他流量,而且容量不足的代價比容量過剩高得多。」
洛基在實際執行中遇到問題:「有些批次一直失敗,我該怎麼查找原因?」
大師教他加入詳細的監控:
// 加入監控的批次處理
async function monitoredBatchUpload(dataList) {
const stats = {
totalBatches: 0,
successfulBatches: 0,
failedBatches: 0,
totalRetries: 0,
avgProcessingTime: 0,
};
for (let i = 0; i < dataList.length; i += 25) {
const batchStartTime = Date.now();
const batch = dataList.slice(i, i + 25);
stats.totalBatches++;
try {
const retryCount = await processBatchWithRetry(batch);
stats.successfulBatches++;
stats.totalRetries += retryCount;
const batchTime = Date.now() - batchStartTime;
stats.avgProcessingTime =
(stats.avgProcessingTime * (stats.successfulBatches - 1) + batchTime) /
stats.successfulBatches;
} catch (error) {
stats.failedBatches++;
console.error(`批次 ${Math.floor(i / 25) + 1} 失敗:`, error);
}
// 每 100 個批次報告一次統計
if (stats.totalBatches % 100 === 0) {
console.log(`統計報告:`, stats);
}
}
return stats;
}
經過適當調整後,洛基最後執行的最終版本,螢幕顯示:
開始處理小行星帶 5020 站的 300000 筆觀測資料...
總共 12000 個批次,從批次 1 開始...
進度:0.8% (100/12000),已耗時:8.3秒
進度:1.7% (200/12000),已耗時:16.1秒
...
進度:99.2% (11900/12000),已耗時:312.4秒
進度:100.0% (12000/12000),已耗時:318.7秒
=== 處理完成 ===
總耗時:318.7 秒
成功:299847 筆
失敗:153 筆
總重試:2847 次
平均速度:941 筆/秒
「5 分鐘多一點就完成了!」洛基興奮地說,「比預計的還要快!」
大師笑道:「整體效率提升了 12 倍!這就是批次操作的威力。不過今天你體驗了成功的批次處理,但現實世界總會給你意外。」
「什麼意外?」洛基好奇。
「錯誤,」大師說,「各種各樣的錯誤。ValidationException、ConditionalCheckFailedException、NetworkTimeout...」
洛基想起今天遇到的一些問題:「確實,即使有重試機制,還是有一些失敗的項目。」
「正是,」大師說,「明天我們要學習錯誤處理與除錯。不是為了避免錯誤,而是為了與錯誤共舞。」