前兩篇文章中,我們討論了 Workflow 與 AI Agent 的本質差異及優勢(上),以及本專案的架構全景(中)。
本篇要深入程式碼的實作細節了,逐行理解 Temporal 如何在實際專案中與 AI Agent 協作,以及那些讓系統變得可靠、可擴展的工程重點。
這個 Workflow 採用了 Entity Pattern,讓我們從 workflows.ts
的主入口開始:
// /backend/src/temporal/workflows.ts
export async function chatSessionWorkflow(startSessionParams: StartSessionParams): Promise<void> {
// -------------------- 狀態初始化 --------------------
const pendingQueue: QueueItem[] = [];
const idempotency = createIdempotencyManager(startSessionParams.processedRequestIds);
let currentScope: CancellationScope | null = null;
let sessionInitialized = false;
// -------------------- Update Handler: 接收訊息 --------------------
setHandler(sendMessageUpdate, async (params: SendMessageParams): Promise<string> => {
// ...
const completion = new Trigger<string>();
// 新訊息可以繼續進 Queue(不會因為處理中而暫停接收)
pendingQueue.push({ ...params, completion });
// 直到主事件迴圈處理完成
return await completion;
});
// -------------------- Signal Handler: 取消 --------------------
setHandler(cancelSignal, () => {
currentScope?.cancel();
});
// -------------------- 主事件迴圈 : Entity Pattern --------------------
while (true) {
await condition(() => pendingQueue.length > 0 || workflowInfo().continueAsNewSuggested);
while (pendingQueue.length > 0) {
// 一次只處理一條訊息
const item = pendingQueue.shift()!;
try {
currentScope = new CancellationScope();
await currentScope.run(() => processMessage(item));
} catch (err: any) {
await handleError(err, item);
}
}
if (workflowInfo().continueAsNewSuggested) {
const idsToKeep = idempotency.getStateForContinueAsNew();
performContinueAsNew(idsToKeep, startSessionParams.sessionId, startSessionParams.startedAtMs);
}
}
}
利用事件迴圈來做成的 Entity Pattern
為什麼不直接在 Update Handler 裡處理訊息?
// ❌ 錯誤示範:併發處理
setHandler(sendMessageUpdate, async (params) => {
// 問題:多個訊息同時呼叫,會併發執行
const reply = await activities.getAIReply(params.text);
return reply;
// 結果:訊息亂序、競態條件、AI 上下文混亂
});
正確做法:佇列 + 主迴圈保證序列
// ✅ 正確做法:順序處理
setHandler(sendMessageUpdate, async (params) => {
const completion = new Trigger<string>();
// 新訊息可以繼續進 Queue(不會因為處理中而阻塞接收)
pendingQueue.push({ ...params, completion });
// 直到主迴圈處理完成
return await completion;
});
// 主迴圈:一次只處理一條訊息
while (pendingQueue.length > 0) {
const item = pendingQueue.shift()!;
const reply = await processMessage(item); // 序列執行
item.completion.resolve(reply); // 解鎖 Update Handler
}
優點:
Temporal 提供兩種與 Workflow 通訊的方式:
// Workflow 端
const sendMessageUpdate = defineUpdate<string, [SendMessageParams]>('sendMessage');
setHandler(sendMessageUpdate, async (params) => {
// 處理邏輯
return reply; // 回傳結果
});
// 使用者端
const reply = await handle.executeUpdate('sendMessage', {
args: [{ text: 'Hello', requestId: 'req-123' }],
updateId: 'req-123', // 冪等性關鍵
});
// reply = "Hi there!" (同步獲得結果)
特點:
// Workflow 端
const cancelSignal = defineSignal('cancel');
setHandler(cancelSignal, () => {
currentScope?.cancel(); // 取消當前操作
});
// 使用者端
await handle.signal('cancel');
// 立即回傳,不等待處理結果
特點:
本專案的使用:
如何實作「使用者點擊取消,立即停止 AI 呼叫」?
// 主迴圈
while (pendingQueue.length > 0) {
const item = pendingQueue.shift()!;
try {
// 建立可取消的作用域
currentScope = new CancellationScope();
await currentScope.run(() => processMessage(item));
} catch (err: any) {
if (isCancellation(err)) {
// 這是取消操作,不是真的錯誤
item.completion.resolve('已取消');
await activities.saveMessage({
sessionId: item.sessionId,
role: 'system',
content: '已取消',
timestamp: item.startedAtMs,
});
} else {
// 真的錯誤
await handleError(err, item);
}
} finally {
currentScope = null;
}
}
// Signal Handler 觸發取消
setHandler(cancelSignal, () => {
currentScope?.cancel();
});
工作原理:
CancellationScope
cancel
SignalcurrentScope.cancel()
isCancellation(err)
判斷是取消操作,不是錯誤注意:
在分散式系統中,重複請求無處不在:
目標:同一個請求(requestId)只處理一次,回傳相同的結果
// 使用者端:使用 updateId
await handle.executeUpdate('sendMessage', {
args: [{ text: 'Hello', requestId: 'req-123' }],
updateId: 'req-123', // 關鍵!
});
// 如果再次呼叫相同的 updateId
await handle.executeUpdate('sendMessage', {
args: [{ text: 'Hello', requestId: 'req-123' }],
updateId: 'req-123',
});
// Temporal 會直接回傳第一次的結果,不會重新執行
Temporal 如何實作?
updateId
到事件歷史updateId
→ 查找事件歷史 → 回傳之前的結果局限性:
function createIdempotencyManager(initialIds: string[] = []) {
const processedIds = new Set<string>(initialIds);
const resultCache = new Map<string, string>();
return {
getCached(requestId?: string): string | null {
if (!requestId || !processedIds.has(requestId)) return null;
return resultCache.get(requestId) || '該訊息已處理';
},
putCached(requestId: string | undefined, result: string): void {
if (requestId) {
processedIds.add(requestId);
resultCache.set(requestId, result);
}
},
getStateForContinueAsNew() {
const allIds = Array.from(processedIds);
const idsToKeep = allIds.slice(-1000); // 只保留最近 1000 個
return idsToKeep;
}
};
}
使用方式:
// 初始化(接收 ContinueAsNew 傳遞的狀態)
const idempotency = createIdempotencyManager(startSessionParams.processedRequestIds);
// Update Handler 中檢查
setHandler(sendMessageUpdate, async (params) => {
const cached = idempotency.getCached(params.requestId);
if (cached) return cached; // 重放時立即回傳
// ... 處理邏輯 ...
const result = await processMessage(params);
idempotency.putCached(params.requestId, result);
return result;
});
為什麼需要這層?
// backend/src/services/wsRouter.ts
const idempotencyCache = new Map<string, { result: string; timestamp: number }>();
const IDEMPOTENCY_CACHE_TTL = 5 * 60 * 1000; // 5分鐘
function checkIdempotency(requestId?: string): string | null {
if (!requestId) return null;
const cached = idempotencyCache.get(requestId);
if (cached) {
console.log(`[Idempotency] Cache hit for requestId: ${requestId}`);
return cached.result;
}
return null;
}
function cacheIdempotencyResult(requestId: string | undefined, result: string): void {
if (!requestId) return;
idempotencyCache.set(requestId, {
result,
timestamp: Date.now(),
});
}
使用場景:
async function handleUserMessage(data, context) {
// 1. 先檢查快取
const cached = checkIdempotency(data.requestId);
if (cached) {
return context.sendResponse({ message: cached }); // 直接回傳,不呼叫 Workflow
}
// 2. 呼叫 Workflow
const reply = await handle.executeUpdate('sendMessage', {
args: [{ text: data.message, requestId: data.requestId }],
updateId: data.requestId,
});
// 3. 快取結果
cacheIdempotencyResult(data.requestId, reply);
return context.sendResponse({ message: reply });
}
為什麼需要這層?
與 Layer 2 的區別:
// backend/src/temporal/activities.ts
export async function saveLedger(proposal: SaveLedgerInput): Promise<string> {
db.insertLedgerEntry({
userId: proposal.userId,
sessionId: proposal.sessionId ?? null,
title: proposal.title,
amountCents: proposal.amountCents,
occurredAtMs: proposal.occurredAtMs,
createdAtMs: Date.now(),
ledgerId: `ledger-${proposal.requestId}`, // 唯一鍵
});
return '已存入記帳';
}
資料庫 Schema:
CREATE TABLE ledger_entries (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ledger_id TEXT UNIQUE NOT NULL, -- 防止重複插入
user_id TEXT NOT NULL,
session_id TEXT,
title TEXT NOT NULL,
amount_cents INTEGER NOT NULL,
occurred_at_ms INTEGER NOT NULL,
created_at_ms INTEGER NOT NULL
);
工作原理:
ledgerId
:資料庫拒絕(UNIQUE 約束)為什麼需要這層?
層級 | 位置 | 有效期 | 作用 |
---|---|---|---|
Layer 1 | Temporal Server | Workflow 執行期 | Update 層級的精確一次語意 |
Layer 2 | Workflow 記憶體 | ContinueAsNew 間隔 | 避免重放時重複呼叫 Activity |
Layer 3 | Backend 記憶體 | 5分鐘 | 減少 Workflow 呼叫壓力 |
Layer 4 | 資料庫 | 永久 | 最終的資料完整性保證 |
設計哲學:
每一層都假設前一層可能失效,逐層防護,確保系統可靠性
在 Temporal 中,所有不確定性操作都必須封裝為 Activity:
// /backend/src/temporal/activities.ts
export async function decideCapability(text: string): Promise<Capability> {
try {
return await ai.decideCapability(text); // 委派給 AI 模組
} catch (err: any) {
throw new Error(`decideCapability failed: ${err?.message ?? 'unknown error'}`);
}
}
為什麼要封裝?
核心任務:判斷使用者訊息屬於哪種意圖
// /backend/src/utils/ai.ts
export async function decideCapability(userMessage: string): Promise<Capability> {
const schema = z.object({
type: z.enum(['chat', 'weather', 'ledger_proposal', 'ledger_query', 'ledger_undo'])
});
const agent = new Agent({
name: 'Capability Router',
instructions:
'請判斷使用者訊息應該走哪個功能,僅輸出 JSON:{"type":"chat|weather|ledger_proposal|ledger_query|ledger_undo"}。\n' +
'- 一般對話 → chat\n' +
'- 問天氣、氣溫、下雨、晴、°C → weather\n' +
'- 記帳新增/扣除 → ledger_proposal\n' +
'- 查詢當日/昨日/特定日期/當月花費 → ledger_query\n' +
'- 撤銷/刪除最近一筆記帳(例如:「撤銷」、「刪除上一筆」、「取消記帳」、「記錯了」) → ledger_undo',
});
const out = await run(agent, userMessage);
const parsed = schema.parse(
typeof out.finalOutput === 'string' ? JSON.parse(out.finalOutput) : out.finalOutput
);
return parsed.type as Capability;
}
設計亮點:
// AI 輸出(JSON 字串或對象)
const out = await run(agent, "台北天氣如何?");
// out.finalOutput = '{"type":"weather"}' 或 { type: 'weather' }
// 使用 Zod 驗證和解析
const parsed = schema.parse(
typeof out.finalOutput === 'string' ? JSON.parse(out.finalOutput) : out.finalOutput
);
// 如果格式錯誤,Zod 會拋出例外(被 Activity 捕捉並重試)
優點:
instructions:
'請判斷使用者訊息應該走哪個功能,僅輸出 JSON:...\n' +
'- 一般對話 → chat\n' +
'- 問天氣、氣溫、下雨、晴、°C → weather\n' +
// ...
為什麼這樣寫?
export async function decideCapability(text: string): Promise<Capability> {
try {
return await ai.decideCapability(text);
} catch (err: any) {
throw new Error(`decideCapability failed: ${err?.message ?? 'unknown error'}`);
}
}
Activity 的重試配置:
const activities = proxyActivities<ChatActivities>({
startToCloseTimeout: '1 minute',
retry: {
maximumAttempts: 5, // 最多重試 5 次
backoffCoefficient: 2, // 指數退避(1s, 2s, 4s, 8s, 16s)
}
});
如果 AI 呼叫失敗:
export async function weatherReply(userMessage: string): Promise<string> {
const CitySchema = z.object({ city: z.string().min(1) });
// 第一步:提取城市名稱
const extractor = new Agent({
name: 'City Extractor',
instructions: '從使用者訊息中抽取欲查詢天氣的城市,只輸出 JSON:{"city":"Taipei"};若無則 {"city":""}。',
});
const raw = String((await run(extractor, userMessage)).finalOutput || '').trim();
let city = '';
try {
const parsed = CitySchema.safeParse(JSON.parse(raw));
city = parsed.success ? parsed.data.city : '';
} catch {
city = '';
}
if (!city) return '請提供要查詢天氣的城市名稱,例如:台北天氣如何?';
// 第二步:呼叫天氣 API
const res = await fetch(`https://wttr.in/${encodeURIComponent(city)}?format=j1`).then((r) => r.json());
const weather = res?.current_condition?.[0];
if (!weather) throw new Error(`無法取得 ${city} 的天氣資訊`);
const desc = weather.weatherDesc?.[0]?.value ?? '';
const temp = weather.temp_C ?? '?';
return `${city} 現在約 ${temp}°C,天氣狀況:${desc}`;
}
設計思維:
為什麼不讓 AI 直接呼叫 API?
讓我們追蹤這條訊息的完整旅程:
使用者輸入:「昨天中午買了午餐花了120元」
// 1. 使用者點擊發送
sendMessage(); // hooks/useChat.ts
// 2. 產生 requestId
const requestId = `${Date.now()}-${Math.random().toString(36).substring(2, 15)}`;
// → "1697123456789-abc123def"
// 3. 發送 WebSocket 訊息
sendWebSocketMessage(wsRef.current, {
type: 'user_message',
sessionId: 'session-456',
userId: 'user-789',
message: '昨天中午買了午餐花了120元',
requestId: requestId,
});
// 4. 樂觀更新 UI(立即顯示使用者訊息)
setMessages((prev) => [
...prev,
{ role: 'user', content: '昨天中午買了午餐花了120元' },
]);
setWaitingReply(true); // 顯示載入動畫
// services/wsRouter.ts
// 1. 接收 WebSocket 訊息
async function handleUserMessage(data, context) {
console.log('[handleUserMessage]', {
userId: 'user-789',
sessionId: 'session-456',
requestId: '1697123456789-abc123def'
});
// 2. 冪等性檢查(記憶體快取)
const cached = checkIdempotency('1697123456789-abc123def');
if (cached) {
return context.sendResponse({ message: cached }); // 快取命中,直接回傳
}
// 3. 獲取 Workflow Handle
const handle = getWorkflowHandle('session-456');
// → workflowId = "chat-session-session-456"
// 4. 呼叫 Workflow Update
try {
const reply = await handle.executeUpdate('sendMessage', {
args: [{
userId: 'user-789',
sessionId: 'session-456',
text: '昨天中午買了午餐花了120元',
startedAtMs: Date.now(),
requestId: '1697123456789-abc123def',
}],
updateId: '1697123456789-abc123def', // Temporal 冪等性
});
// 5. 快取結果
cacheIdempotencyResult('1697123456789-abc123def', reply);
// 6. 回傳給前端
context.sendResponse({
type: 'assistant_message',
sessionId: 'session-456',
userId: 'user-789',
message: reply, // "已記帳:午餐 -120.00,時間 2024-10-11 12:00"
});
} catch (error) {
context.sendError(`處理訊息失敗:${error.message}`);
}
}
// temporal/workflows.ts
// 1. Update Handler 被呼叫
setHandler(sendMessageUpdate, async (params) => {
console.log('[sendMessageUpdate]', params.requestId);
// 2. 冪等性檢查(Workflow 記憶體)
const cached = idempotency.getCached(params.requestId);
if (cached) {
console.log('[sendMessageUpdate] Cache hit:', cached);
return cached; // 重放時會走到這裡
}
// 3. 加入佇列
const completion = new Trigger<string>();
pendingQueue.push({
userId: params.userId,
sessionId: params.sessionId,
text: params.text,
startedAtMs: params.startedAtMs,
requestId: params.requestId,
completion: completion,
});
console.log('[sendMessageUpdate] Enqueued, queue length:', pendingQueue.length);
// 4. 等待主迴圈處理
const result = await completion;
// 5. 快取結果
idempotency.putCached(params.requestId, result);
return result;
});
// 主迴圈檢測到佇列有內容
while (true) {
await condition(() => pendingQueue.length > 0);
// 1. 取出佇列頭部
const item = pendingQueue.shift()!;
console.log('[mainLoop] Processing:', item.text);
try {
currentScope = new CancellationScope();
await currentScope.run(() => processMessage(item));
} catch (err) {
await handleError(err, item);
}
}
// processMessage 函式
async function processMessage(item: QueueItem): Promise<void> {
const timestamp = item.startedAtMs;
// 1. 初始化 session(僅首次)
if (!sessionInitialized) {
console.log('[processMessage] Initializing session');
await activities.initializeSession({
sessionId: item.sessionId,
title: item.text, // 用第一條訊息作為標題
timestamp,
});
sessionInitialized = true;
}
// 2. 儲存使用者訊息
console.log('[processMessage] Saving user message');
await activities.saveMessage({
sessionId: item.sessionId,
role: 'user',
content: item.text,
timestamp,
messageId: `user-${item.requestId}`, // 冪等性關鍵
});
// 3. 判斷能力
console.log('[processMessage] Deciding capability');
const capability = await activities.decideCapability(item.text);
console.log('[processMessage] Capability:', capability);
// → 'ledger_proposal'
// 4. 產生回覆
console.log('[processMessage] Generating reply');
const reply = await generateReply(capability, item, timestamp);
console.log('[processMessage] Reply:', reply);
// → "已記帳:午餐 -120.00,時間 2024-10-11 12:00"
// 5. 儲存 AI 回覆
await activities.saveMessage({
sessionId: item.sessionId,
role: 'assistant',
content: reply,
timestamp,
messageId: `assistant-${item.requestId}`,
});
// 6. 解鎖 Update Handler
item.completion.resolve(reply);
}
// generateReply 函式(針對 ledger_proposal)
async function generateReply(capability: 'ledger_proposal', item, timestamp) {
console.log('[generateReply] ledger_proposal');
// 1. 解析記帳提議(AI 呼叫)
const ledger = await activities.parseLedgerProposal({
userId: item.userId,
sessionId: item.sessionId,
text: item.text, // "昨天中午買了午餐花了120元"
startedAtMs: timestamp,
requestId: item.requestId,
});
console.log('[generateReply] Parsed ledger:', ledger);
// → {
// title: '午餐',
// amountCents: -12000,
// occurredAtMs: 1697011200000, // 昨天中午 12:00
// explain: '午餐 -120.00,時間 2024-10-11 12:00'
// }
// 2. 儲存記帳(DB 操作)
await activities.saveLedger({
userId: ledger.userId,
sessionId: ledger.sessionId,
title: ledger.title,
amountCents: ledger.amountCents,
occurredAtMs: ledger.occurredAtMs,
requestId: item.requestId,
});
console.log('[generateReply] Ledger saved');
// 3. 回傳確認訊息
return `已記帳:${ledger.explain}`;
}
// temporal/activities.ts → utils/ai.ts
export async function parseLedgerProposal(params) {
const now = new Date(); // 當前時間
const agent = new Agent({
name: 'Ledger Parser',
instructions: `
解析記帳資訊...
當前時間:${now.toISOString()}
昨天中午12點:${new Date(now.getTime() - 24*60*60*1000 + 12*60*60*1000).getTime()}
`,
});
// AI 呼叫
const result = await run(agent, '昨天中午買了午餐花了120元');
// AI 輸出:
// {
// kind: 'sub',
// item: '午餐',
// amount: 120,
// occurredAtMs: 1697011200000
// }
const parsed = JSON.parse(result.finalOutput);
// 驗證和建構
const built = ledger.buildLedgerProposalFields({
parsed,
userId: params.userId,
sessionId: params.sessionId,
now,
});
return {
userId: built.userId,
sessionId: built.sessionId,
title: built.title, // '午餐'
amountCents: built.amountCents, // -12000
occurredAtMs: built.occurredAtMs, // 1697011200000
explain: built.explain, // '午餐 -120.00,時間 2024-10-11 12:00'
};
}
// temporal/activities.ts → utils/db.ts
export async function saveLedger(proposal) {
db.insertLedgerEntry({
userId: proposal.userId, // 'user-789'
sessionId: proposal.sessionId, // 'session-456'
title: proposal.title, // '午餐'
amountCents: proposal.amountCents, // -12000
occurredAtMs: proposal.occurredAtMs, // 1697011200000
createdAtMs: Date.now(),
ledgerId: `ledger-${proposal.requestId}`, // 'ledger-1697123456789-abc123def'
});
// SQL:
// INSERT INTO ledger_entries (ledger_id, user_id, session_id, title, amount_cents, occurred_at_ms, created_at_ms)
// VALUES ('ledger-1697123456789-abc123def', 'user-789', 'session-456', '午餐', -12000, 1697011200000, 1697123456789)
return '已存入記帳';
}
// hooks/useChat.ts
// WebSocket 收到訊息
ws.addEventListener('message', (evt) => {
const data = JSON.parse(evt.data);
if (data.type === 'assistant_message') {
console.log('[useChat] Received reply:', data.message);
// 1. 停止載入動畫
setWaitingReply(false);
// 2. 顯示 AI 回覆
setMessages((prev) => [
...prev,
{
role: 'assistant',
content: data.message // "已記帳:午餐 -120.00,時間 2024-10-11 12:00"
},
]);
}
});
T+0ms 前端:使用者點擊發送,產生 requestId
T+5ms 前端:發送 WebSocket 訊息,樂觀更新 UI
T+10ms 後端:接收訊息,冪等性檢查(未命中)
T+15ms 後端:呼叫 Workflow.executeUpdate
T+20ms Workflow:Update Handler 加入佇列
T+25ms Workflow:主迴圈取出佇列
T+30ms Activity:初始化 session(首次)
T+50ms Activity:儲存使用者訊息到 DB
T+100ms Activity:呼叫 AI 判斷能力(OpenAI API)
T+150ms Workflow:收到 capability = 'ledger_proposal'
T+200ms Activity:呼叫 AI 解析記帳(OpenAI API)
T+250ms Workflow:收到解析結果
T+300ms Activity:儲存記帳到 DB
T+350ms Activity:儲存 AI 回覆到 DB
T+400ms Workflow:解鎖 Update Handler,回傳結果
T+450ms 後端:收到 Workflow 回傳,快取結果
T+500ms 後端:發送 WebSocket 訊息給前端
T+550ms 前端:收到回覆,更新 UI
總耗時:約 550ms(其中 AI 呼叫佔 200ms)
const activities = proxyActivities<ChatActivities>({
startToCloseTimeout: '1 minute',
retry: {
maximumAttempts: 5,
backoffCoefficient: 2,
initialInterval: '1s',
maximumInterval: '60s',
}
});
重試時間線:
哪些錯誤會重試?
如何讓業務錯誤不重試?
// Activity 中拋出 ApplicationFailure
import { ApplicationFailure } from '@temporalio/common';
export async function parseLedgerProposal(params) {
// ...
if (!parsed || (parsed.kind !== 'add' && parsed.kind !== 'sub')) {
throw ApplicationFailure.nonRetryable('Not a ledger proposal');
}
// ...
}
訪問 http://localhost:8233
(或 Temporal Cloud UI):
查看 Workflow 列表:
查看單個 Workflow:
重放 Workflow:
Temporal 和 AI Agents 的結合,代表了一種新的應用程式開發典範:
用程式碼表達業務邏輯,用 AI 增強智慧,用 Workflow 保證可靠性
這不僅僅是技術上的創新,更是思維方式的轉變:
希望這三篇文章能幫助你理解這個強大的架構模式,並在自己的專案中應用。