在傳統系統裡,商品庫存通常就是一張資料表 + 一堆邏輯、鎖與交易機制。
但當系統拆成微服務、要支援高併發時,庫存鎖衝突、分散式一致性、重試與補償,往往變成工程師的惡夢。
Temporal 提供了一個不同的思維:Entity Pattern。
這種模式不是把庫存存在 DB 然後不停加鎖,而是讓 每個商品庫存都由一條長生命週期(Long Living)的 Workflow 守護。
好處是什麼?
在這篇文章裡,我會用「商品庫存」這個例子,帶你一步步看 Temporal 的 Entity Pattern 是怎麼把複雜問題變得直覺簡單。
在傳統微服務中,我們常以資料庫紀錄代表「實體」,由多個服務存取或事件來讀寫資料。這種方式在高併發與跨服務協作下,容易遇到:
Temporal 提供 Workflow 作為「可重播的狀態機」:
本篇介紹 Temporal 的 Entity Pattern(又稱 Workflow-as-Entity、Workflow-per-Entity)
另外,如果你熟悉 Actor Pattern 的話,這個 Temporal Entity Pattern 相當於升級版:
@ActivityInterface
public interface InventoryActivities {
// 重點:Activity 專責對外 I/O,必須設計成冪等。opKey 用來避免重試造成重複扣/加庫存
/** 補貨:回傳最新庫存;以 opKey 為冪等鍵 */
int restock(String productId, int qty, String opKey);
/** 購買扣庫存:回傳最新庫存(不足時丟業務錯誤);以 opKey 為冪等鍵 */
int purchase(String productId, int qty, String opKey);
}
// 實體對外 API 定義:Workflow(主流程)、Query(唯讀)、Signal(非同步命令)、Update(同步命令)
@WorkflowInterface
public interface InventoryEntityWorkflow {
// Workflow 主方法:啟動單一實體的狀態機;長生命週期、可重播
@WorkflowMethod
void run(String productId, int initialStock);
// Query:不可改變狀態;讀取目前狀態快照
@QueryMethod
int getStock();
// Signal:非同步命令(fire-and-forget);僅入隊,不保證立即回應
@SignalMethod
void submit(Command command);
// Update:同步命令(Request-Reply);會等待執行結果
@UpdateMethod
CommandResult invoke(Command command);
}
本範例使用 Inbox command pattern:外部指令統一入列、工作流程以單執行緒依序消費執行。
核心資料結構
inbox: Queue<Command>
:指令信箱,所有外部命令統一入列。replies: Map<opKey, CompletablePromise<CommandResult>>
:同步命令(Update)的回應暫存,透過 opKey
對應等待中的 Promise。產生者(Producers)
submit(Command)
:非同步命令(Signal)。僅 inbox.offer(command)
,不等待結果。invoke(Command)
:同步命令(Update)。先建立 Promise 並放入 replies
,再入列命令,最後 promise.get()
等待結果。消費者(Consumer)
run(...)
的主迴圈中以 Workflow.await(...)
等待 inbox
有資料或建議切段(continue-as-new
)。poll
取出命令,呼叫 getCommandResult(command)
更新 cachedStock
。replies
取出對應 Promise 並 complete
回應。容量與熱點治理
continue-as-new
週期切段,避免歷史過長影響重播與查詢延遲。public class InventoryEntityWorkflowImpl implements InventoryEntityWorkflow {
// Entity Workflow:單一實體的狀態機,序列化處理命令
private String productId; // 實體 ID(例如 sku-123)
private int cachedStock; // 從 Activity 回應同步更新的快取值(Workflow 內的權威快照)
private final Queue<Command> inbox = new ArrayDeque<>(); // 命令信箱(Signal/Command)
private final Map<String, CompletablePromise<CommandResult>> replies = new HashMap<>(); // opKey -> Promise(同步回應)
// Activity stub:所有外部 I/O 走 Activity;設定 Timeout/Retry/DoNotRetry
// DoNotRetry:把業務錯誤(如 InvalidInput/StockError)交由 Workflow 分支處理
private final InventoryActivities acts = Workflow.newActivityStub(
InventoryActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(10))
.setRetryOptions(RetryOptions.newBuilder()
.setMaximumAttempts(5)
.setInitialInterval(Duration.ofSeconds(1))
.setBackoffCoefficient(2.0)
.setDoNotRetry("InvalidInput", "StockError")
.build())
.build());
@Override
public void run(String productId, int initialStock) {
this.productId = productId;
// 實體初始化:以冪等 opKey 補貨建立初始庫存
this.cachedStock = acts.restock(productId, initialStock, "INIT-" + productId);
while (true) {
// 等待:收到命令或建議切段(歷史過長)
Workflow.await(() -> !inbox.isEmpty() || Workflow.getInfo().isContinueAsNewSuggested());
if (Workflow.getInfo().isContinueAsNewSuggested()) {
// 切段:使用 ContinueAsNew 壓縮歷史,保持良好重播時間
Workflow.continueAsNew(this.productId, 0);
}
while (!inbox.isEmpty()) {
Command command = inbox.poll(); // 以單執行緒順序處理命令
CompletablePromise<CommandResult> reply = replies.remove(command.getOpKey()); // 取出等待回應的 Promise(若有)
CommandResult commandResult = getCommandResult(command);
if (reply != null) {
reply.complete(commandResult); // 完成同步回應
}
}
}
}
private CommandResult getCommandResult(Command command) {
try {
String opKey = command.getOpKey();
if (command.getQty() <= 0) {
// 驗證輸入:不合法視為業務錯誤(DoNotRetry),交由 Workflow 處理
throw ApplicationFailure.newFailure("qty must be > 0", "InvalidInput");
}
int apply = switch (command.getType()) {
case RESTOCK -> cachedStock = acts.restock(productId, command.getQty(), opKey); // 呼叫 Activity,更新權威快照
case PURCHASE -> cachedStock = acts.purchase(productId, command.getQty(), opKey); // 庫存不足由 Activity 拋業務錯誤
};
return CommandResult.ok(apply); // 正常:回傳最新庫存
} catch (ApplicationFailure af) {
// 業務失敗:包裝成 CommandResult,讓呼叫端可判斷
return CommandResult.fail(command.getType().toString(), "fail");
}
}
@Override
public void submit(Command command) {
inbox.offer(command); // 非同步命令:僅送入信箱(fire-and-forget)
}
@Override
public CommandResult invoke(Command command) {
// 同步命令:先註冊 Promise,再送命令,最後等待結果
CompletablePromise<CommandResult> promise = Workflow.newPromise();
replies.put(command.getOpKey(), promise);
inbox.offer(command);
return promise.get();
}
@Override
public int getStock() {
return cachedStock; // Query:讀取目前快照,不改變狀態
}
}
public class Process5Worker {
public static final String TASK_QUEUE = "inventory-tq"; // 與 WorkflowOptions 對齊的任務佇列
public static void main(String[] args) {
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); // 本機/遠端服務連線
WorkflowClient client = WorkflowClient.newInstance(service); // 建立用戶端
WorkerFactory factory = WorkerFactory.newInstance(client);
Worker worker = factory.newWorker(TASK_QUEUE); // 綁定 Task Queue 的 Worker
worker.registerWorkflowImplementationTypes(InventoryEntityWorkflowImpl.class); // 註冊 Workflow 實作
worker.registerActivitiesImplementations(new InventoryActivitiesImpl()); // 註冊 Activity 實作
factory.start(); // 啟動接單
System.out.println("Worker started on " + TASK_QUEUE);
}
}
public class Process5StartTrigger {
public static void main(String[] args) throws InterruptedException {
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
WorkflowClient client = WorkflowClient.newInstance(service);
String productId = "sku-123";
WorkflowOptions workflowOptions = WorkflowOptions.newBuilder()
.setWorkflowId("inventory:" + productId) // Entity 命名:一個實體一個 workflowId
.setTaskQueue("inventory-tq") // 路由到對應的 Task Queue
.build();
InventoryEntityWorkflow workflow = client.newWorkflowStub(InventoryEntityWorkflow.class, workflowOptions);
WorkflowClient.signalWithStart(workflow::run, productId, 10000); // 啟動Long Living Entity Workflow
System.out.println(workflow.getStock()); // Query:讀取目前庫存
workflow.submit(new Command(CommandType.RESTOCK, 20, "op-restock-1")); // 非同步命令(不等待回應)
workflow.submit(new Command(CommandType.RESTOCK, 20, "op-restock-2"));
workflow.submit(new Command(CommandType.RESTOCK, 20, "op-restock-3"));
Thread.sleep(5000); // 等待 Workflow 消化命令
System.out.println(workflow.getStock()); // 再次查詢快照
String opKey = UUID.randomUUID().toString();
CommandResult result = workflow.invoke(new Command(CommandType.PURCHASE, 80, "invoke-"+opKey)); // 同步命令(等待回應)
System.out.println("Success, stock=" + result.getStock());
}
}
workflowId = "inventory:<sku>"
,例如 inventory:sku-123
)RESTOCK
(補貨,透過 submit(Command)
非同步指令)PURCHASE
(購買,透過 invoke(Command)
同步)cachedStock
opKey
作為冪等鍵,重試/重播不會重複生效getStock()
回應目前庫存快照(唯讀、即時回應)workflowId = inventory:<sku>
signalWithStart
確保實體不存在時自動啟動(冷啟動)inventory-tq
,由對應 Worker 處理workflowId
命名規範,體現業務主鍵與分片策略idempotencyKey
;Workflow 記錄去重集合ContinueAsNew
維持良好重播效率Workflow.getVersion
適應版本變更workflowId
水平擴展至多個 Worker本篇 Entity Pattern Workflow 初看程式碼會比較難以理解,但搞清楚之後,能簡單地透過 Single Writer 維持實體操作的順序性。
並且隨規模成長,能以 workflowId 水平擴展,或依倉別、地區、分片進行拆分,做到平穩擴展、操作不卡卡。
不過,當實際業務邏輯需要把流程再細分,例如一個大的主流程要拆成多個子流程並行、或是某些子任務需要單獨追蹤與容錯,這時候就會用到下一篇的 Child Workflow。