前一篇的流程搭配 Temporal 的 Saga Pattern 輕鬆解決分散式交易流程,而本篇要展開的是互動式流程。
情境:
商品改價格需要人員審核,審核者必須在 48 小時內回覆;
若超時未回覆,改價申請自動被 Reject。
試想這樣的情境是不是要額外再建立 Cron Job 呢?
那 Temporal 要如何實作這種「需要外部互動」的流程?
如果要讓外部把審核結果回傳給 Workflow,我們可以在 Workflow Interface 上宣告一個 @UpdateMethod
,讓用戶端在稍後呼叫。
@WorkflowInterface // 宣告為 Temporal 的 Workflow 介面
public interface ProductChangeWorkflow {
// 主流程入口:啟動改價需求,進入審核流程
@WorkflowMethod
void run(ProductChangeSpec spec);
// 外部回覆審核結果:由後台或 API 在審核完成時呼叫
@UpdateMethod
void update(ReviewDecision decision);
}
CompletablePromise
作為「等待外部回覆」的容器,配合 Workflow.await()
設定 48 小時超時。Workflow.await()
並不會占著執行緒,Worker 會把等待條件交給 Temporal Server,一旦任一條件滿足,流程就會從 Workflow.await()
往下繼續執行。
package com.flowzati.process3;
import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import io.temporal.workflow.CompletablePromise;
import io.temporal.workflow.Workflow;
import java.time.Duration;
import java.util.function.Supplier;
public class ProductChangeWorkflowImpl implements ProductChangeWorkflow {
private final ProductChangeActivities acts; // Activities stub,用於呼叫外部動作(通知、記錄、改價)
private final CompletablePromise<ReviewDecision> decisionPromise = Workflow.newPromise(); // 等待外部審核回覆
public ProductChangeWorkflowImpl() {
// 設定 Activities 的重試策略(指數退避)
RetryOptions retryOptions = RetryOptions.newBuilder()
.setBackoffCoefficient(2.0)
.setMaximumAttempts(5)
.build();
// 設定 Activities 的超時與重試
ActivityOptions activityOptions = ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(30))
.setRetryOptions(retryOptions)
.build();
this.acts = Workflow.newActivityStub(ProductChangeActivities.class, activityOptions); // 產生活動代理物件
}
@Override
public void run(ProductChangeSpec spec) {
// Step 1: 通知審核
acts.notifyReviewer(spec);
// Step 2: 等待審核;48 小時未回覆視為 REJECT
Duration timeout = Duration.ofHours(48); // 超時窗口
Supplier<Boolean> isCompleted = () -> decisionPromise.isCompleted(); // 條件:是否已收到回覆
boolean received = Workflow.await(timeout, isCompleted); // 非阻塞等待(Timer 或 Update 任一達成則往下繼續執行)
// 有回覆就取用審核結果;超時則自動拒絕
ReviewDecision reviewDecision = received
? decisionPromise.get()
: new ReviewDecision(ReviewOutcome.REJECT, "system", "timeout after 48 hours");
// Step 3: 儲存審核結果
acts.recordDecision(spec, reviewDecision);
if (reviewDecision.getOutcome() == ReviewOutcome.REJECT) {
// 審核拒絕則結束流程(不進行改價)
return;
}
// Step 4: 審核通過 → 改價
acts.applyNewPrice(spec);
}
@Override
public void update(ReviewDecision decision) {
// 去重複與狀態守護:只接收第一個有效決策,避免重複點擊或重試干擾
if (!this.decisionPromise.isCompleted()) {
this.decisionPromise.complete(decision);
}
}
}
用戶端一開始啟動 Workflow,等審核完成或超時。
審核人員完成後,由後台或 API 觸發 Update 傳回決策。
// 建立用戶端與 Workflow stub(示意)
WorkflowClient client = WorkflowClient.newInstance(service); // 連線到 Temporal 服務
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue("product-change-task-queue") // 指定 Worker 所監聽的 Task Queue
.setWorkflowId("product-change-" + spec.getProductId()) // 設定可追蹤的 Workflow Id
.build();
ProductChangeWorkflow stub = client.newWorkflowStub(ProductChangeWorkflow.class, options); // 產生 Workflow 代理
// 非同步啟動流程(不阻塞目前執行緒)
WorkflowClient.start(stub::run, spec);
// … 之後在審核通過時(例如後台審核頁面按下同意)
ReviewDecision decision = new ReviewDecision(ReviewOutcome.APPROVE, "alice", "OK");
stub.update(decision); // 將審核結果回傳給 Workflow(讓 await 後續繼續)
Workflow 是「虛擬執行」:當程式碼執行到 Workflow.await()
,Worker 會把條件(Timer + 等待 Update)轉換成事件與命令並持久化到 Server,Worker 執行緒就釋放了。等條件達成,Server 會把事件推回 Worker 重放 Workflow 歷史,從 await
之後繼續,既可靠又省資源。
上圖,可以看到事件轉交到 Server 等待 2d(48h)。
上圖,可以看到接收了 update 事件後,繼續推進流程。
Duration.ofHours(48)
或依環境參數設定。notifyReviewer
與 recordDecision
中打審核追蹤記錄,便於排查。decisionPromise
會在歷史中被重放還原),Activities 則是無狀態的外部呼叫。本篇示範了如何以 Temporal 實作「互動式流程」:利用 @UpdateMethod
接收外部回覆,以 CompletablePromise
與 Workflow.await()
實現 48 小時超時的審核等待,並在回覆與超時兩種情境下維持流程一致性。
想像一般使用排程或事件驅動的作法,業務程式會四散多處,且要額外建立定時器導致維護點發散,而使用 Temporal 能以集中可讀的程式碼,直接地表達整個互動式流程,不需要額外排程及手刻狀態轉換,更好維護管理。