iT邦幫忙

2025 iThome 鐵人賽

DAY 18
0
Software Development

Temporal 開發指南:掌握 Workflow as Code 打造穩定可靠的分散式流程系列 第 18

Day18 - Temporal Entity Pattern 的設計思維:讓每個商品庫存都有專屬 Workflow

  • 分享至 

  • xImage
  •  

在傳統系統裡,商品庫存通常就是一張資料表 + 一堆邏輯、鎖與交易機制。

但當系統拆成微服務、要支援高併發時,庫存鎖衝突、分散式一致性、重試與補償,往往變成工程師的惡夢。

Temporal 提供了一個不同的思維:Entity Pattern。

這種模式不是把庫存存在 DB 然後不停加鎖,而是讓 每個商品庫存都由一條長生命週期(Long Living)的 Workflow 守護。

好處是什麼?

  • 天然順序性:單一 Workflow 就像「Single Writer」,保證操作不會亂序。
  • Durable Execution:不怕 crash,不用靠手刻補償邏輯。
  • 高可觀測性:每一次扣庫、補貨,背後都有完整 Event History 可追溯。
  • 可水平擴展:隨商品數量成長,以 WorkflowId 分片就能自然擴張。

在這篇文章裡,我會用「商品庫存」這個例子,帶你一步步看 Temporal 的 Entity Pattern 是怎麼把複雜問題變得直覺簡單。

1. 為何需要 Entity Pattern

在傳統微服務中,我們常以資料庫紀錄代表「實體」,由多個服務存取或事件來讀寫資料。這種方式在高併發與跨服務協作下,容易遇到:

  • 資料一致性難題:多處寫入、跨交易協調、補償邏輯分散
  • 併發控制複雜:鎖、版本號、重試、順序性都需自行維護
  • 可觀測性不足:狀態轉移與邏輯散落在多處程式碼與事件流

Temporal 提供 Workflow 作為「可重播的狀態機」:

  • 單一 Workflow 實例對應單一實體(Single Writer/Single Thread illusion)
  • 以事件歷史重播確保決定性,讓狀態演進可追溯、可重播
  • 內建重試、超時、心跳與 Backoff,降低錯誤處理複雜度
  • Signals/Queries 成為實體對外 API,天然解決順序性(ordering) 問題

本篇介紹 Temporal 的 Entity Pattern(又稱 Workflow-as-Entity、Workflow-per-Entity)

  • 這是讓每個「業務實體」(例如使用者、訂單、庫存)都有一個「Long Living Workflow」的建模方式
  • 把實體狀態與操作封裝起來,讓外部以訊號(命令)、查詢來存取這個業務流程實體
  • 並將狀態、邏輯、併發控制、重試與可觀測性一併封裝在 Workflow 裡,達成高一致性、可重播與可演進的分散式狀態機

另外,如果你熟悉 Actor Pattern 的話,這個 Temporal Entity Pattern 相當於升級版:

  • 像 Actor 一樣,Entity Workflow 採取 Single Writer,確保狀態變更不會亂序。
  • 不同的是,Temporal 提供 durable execution 與 完整 event history,讓這個「Actor」不只在 runtime 時期運行,在系統崩潰或重啟而遺失狀態,也可以安全地重播與回復。更是可追溯、可維護的分散式解法。
  • Temporal 稱呼為 Entity Pattern,並且也很適合 DDD(Domain-Driven Design) 的語境,把 Workflow 當作一個有生命週期的「實體」。

2. 流程介紹

  • 首先介紹這個流程負責「商品庫存的管理」。
  • 使用 Entity Pattern 時,會讓每個「商品庫存」都建立一個 Workflow,有以下操作:
    • 查詢庫存(Query)
    • 補貨(加庫)(Signal:非同步命令)
    • 購買(扣庫)(Update:同步命令,需立即得到結果的操作)
  • 所有外部 I/O(實際補貨、購買)放在 Activity,由超時與重試機制保護

3. 程式碼範例

3.1 Activity & Workflow & Worker 實作

@ActivityInterface
public interface InventoryActivities {
  // 重點:Activity 專責對外 I/O,必須設計成冪等。opKey 用來避免重試造成重複扣/加庫存
  /** 補貨:回傳最新庫存;以 opKey 為冪等鍵 */
  int restock(String productId, int qty, String opKey);

  /** 購買扣庫存:回傳最新庫存(不足時丟業務錯誤);以 opKey 為冪等鍵 */
  int purchase(String productId, int qty, String opKey);
}
// 實體對外 API 定義:Workflow(主流程)、Query(唯讀)、Signal(非同步命令)、Update(同步命令)
@WorkflowInterface
public interface InventoryEntityWorkflow {
  // Workflow 主方法:啟動單一實體的狀態機;長生命週期、可重播
  @WorkflowMethod
  void run(String productId, int initialStock);

  // Query:不可改變狀態;讀取目前狀態快照
  @QueryMethod
  int getStock();

  // Signal:非同步命令(fire-and-forget);僅入隊,不保證立即回應
  @SignalMethod
  void submit(Command command);

  // Update:同步命令(Request-Reply);會等待執行結果
  @UpdateMethod
  CommandResult invoke(Command command);
}

3.2 Inbox Command Pattern

本範例使用 Inbox command pattern:外部指令統一入列、工作流程以單執行緒依序消費執行。

  • 核心資料結構

    • inbox: Queue<Command>:指令信箱,所有外部命令統一入列。
    • replies: Map<opKey, CompletablePromise<CommandResult>>:同步命令(Update)的回應暫存,透過 opKey 對應等待中的 Promise。
  • 產生者(Producers)

    • submit(Command):非同步命令(Signal)。僅 inbox.offer(command),不等待結果。
    • invoke(Command):同步命令(Update)。先建立 Promise 並放入 replies,再入列命令,最後 promise.get() 等待結果。
  • 消費者(Consumer)

    • run(...) 的主迴圈中以 Workflow.await(...) 等待 inbox 有資料或建議切段(continue-as-new)。
    • 以單執行緒 poll 取出命令,呼叫 getCommandResult(command) 更新 cachedStock
    • 若該命令為同步類型,從 replies 取出對應 Promise 並 complete 回應。
  • 容量與熱點治理

    • continue-as-new 週期切段,避免歷史過長影響重播與查詢延遲。
    • 熱點 SKU 可考慮拆分實體或以子工作流程分區。
public class InventoryEntityWorkflowImpl implements InventoryEntityWorkflow {
  // Entity Workflow:單一實體的狀態機,序列化處理命令
  private String productId; // 實體 ID(例如 sku-123)
  private int cachedStock; // 從 Activity 回應同步更新的快取值(Workflow 內的權威快照)

  private final Queue<Command> inbox = new ArrayDeque<>(); // 命令信箱(Signal/Command)
  private final Map<String, CompletablePromise<CommandResult>> replies = new HashMap<>(); // opKey -> Promise(同步回應)

  // Activity stub:所有外部 I/O 走 Activity;設定 Timeout/Retry/DoNotRetry
  // DoNotRetry:把業務錯誤(如 InvalidInput/StockError)交由 Workflow 分支處理
  private final InventoryActivities acts = Workflow.newActivityStub(
      InventoryActivities.class,
      ActivityOptions.newBuilder()
          .setStartToCloseTimeout(Duration.ofSeconds(10))
          .setRetryOptions(RetryOptions.newBuilder()
              .setMaximumAttempts(5)
              .setInitialInterval(Duration.ofSeconds(1))
              .setBackoffCoefficient(2.0)
              .setDoNotRetry("InvalidInput", "StockError")
              .build())
          .build());

  @Override
  public void run(String productId, int initialStock) {
    this.productId = productId;
    // 實體初始化:以冪等 opKey 補貨建立初始庫存
    this.cachedStock = acts.restock(productId, initialStock, "INIT-" + productId);

    while (true) {
      // 等待:收到命令或建議切段(歷史過長)
      Workflow.await(() -> !inbox.isEmpty() || Workflow.getInfo().isContinueAsNewSuggested());
      if (Workflow.getInfo().isContinueAsNewSuggested()) {
        // 切段:使用 ContinueAsNew 壓縮歷史,保持良好重播時間
        Workflow.continueAsNew(this.productId, 0);
      }
      while (!inbox.isEmpty()) {
        Command command = inbox.poll(); // 以單執行緒順序處理命令
        CompletablePromise<CommandResult> reply = replies.remove(command.getOpKey()); // 取出等待回應的 Promise(若有)
        CommandResult commandResult = getCommandResult(command);
        if (reply != null) {
          reply.complete(commandResult); // 完成同步回應
        }
      }
    }
  }

  private CommandResult getCommandResult(Command command) {
    try {
      String opKey = command.getOpKey();
      if (command.getQty() <= 0) {
        // 驗證輸入:不合法視為業務錯誤(DoNotRetry),交由 Workflow 處理
        throw ApplicationFailure.newFailure("qty must be > 0", "InvalidInput");
      }

      int apply = switch (command.getType()) {
        case RESTOCK -> cachedStock = acts.restock(productId, command.getQty(), opKey); // 呼叫 Activity,更新權威快照
        case PURCHASE -> cachedStock = acts.purchase(productId, command.getQty(), opKey); // 庫存不足由 Activity 拋業務錯誤
      };
      return CommandResult.ok(apply); // 正常:回傳最新庫存
    } catch (ApplicationFailure af) {
      // 業務失敗:包裝成 CommandResult,讓呼叫端可判斷
      return CommandResult.fail(command.getType().toString(), "fail");
    }
  }


  @Override
  public void submit(Command command) {
    inbox.offer(command); // 非同步命令:僅送入信箱(fire-and-forget)
  }

  @Override
  public CommandResult invoke(Command command) {
    // 同步命令:先註冊 Promise,再送命令,最後等待結果
    CompletablePromise<CommandResult> promise = Workflow.newPromise();
    replies.put(command.getOpKey(), promise);
    inbox.offer(command);
    return promise.get();
  }

  @Override
  public int getStock() {
    return cachedStock; // Query:讀取目前快照,不改變狀態
  }
}
public class Process5Worker {
  public static final String TASK_QUEUE = "inventory-tq"; // 與 WorkflowOptions 對齊的任務佇列
  public static void main(String[] args) {

    WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); // 本機/遠端服務連線
    WorkflowClient client = WorkflowClient.newInstance(service); // 建立用戶端

    WorkerFactory factory = WorkerFactory.newInstance(client);
    Worker worker = factory.newWorker(TASK_QUEUE); // 綁定 Task Queue 的 Worker

    worker.registerWorkflowImplementationTypes(InventoryEntityWorkflowImpl.class); // 註冊 Workflow 實作
    worker.registerActivitiesImplementations(new InventoryActivitiesImpl()); // 註冊 Activity 實作

    factory.start(); // 啟動接單
    System.out.println("Worker started on " + TASK_QUEUE);
  }
}

3.3 啟動流程、送出補貨指令、送出購買指令

public class Process5StartTrigger {
  public static void main(String[] args) throws InterruptedException {
    WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
    WorkflowClient client = WorkflowClient.newInstance(service);
    String productId = "sku-123";

    WorkflowOptions workflowOptions = WorkflowOptions.newBuilder()
        .setWorkflowId("inventory:" + productId) // Entity 命名:一個實體一個 workflowId
        .setTaskQueue("inventory-tq") // 路由到對應的 Task Queue
        .build();
    
    InventoryEntityWorkflow workflow = client.newWorkflowStub(InventoryEntityWorkflow.class, workflowOptions);
    
    WorkflowClient.signalWithStart(workflow::run, productId, 10000); // 啟動Long Living Entity Workflow
    
    System.out.println(workflow.getStock()); // Query:讀取目前庫存
    
    workflow.submit(new Command(CommandType.RESTOCK, 20, "op-restock-1")); // 非同步命令(不等待回應)
    workflow.submit(new Command(CommandType.RESTOCK, 20, "op-restock-2"));
    workflow.submit(new Command(CommandType.RESTOCK, 20, "op-restock-3"));
    
    Thread.sleep(5000); // 等待 Workflow 消化命令
    
    System.out.println(workflow.getStock()); // 再次查詢快照
    
    String opKey = UUID.randomUUID().toString();
    
    CommandResult result = workflow.invoke(new Command(CommandType.PURCHASE, 80, "invoke-"+opKey)); // 同步命令(等待回應)
    System.out.println("Success, stock=" + result.getStock());
  }
}

4. 核心概念與設計原則

4.1 Entity 與 Workflow 的一對一

  • 一個商品 SKU 對應一個 Workflow 實例(workflowId = "inventory:<sku>",例如 inventory:sku-123
  • Workflow 是該實體的 Single Writer (唯一寫入者),序列化處理庫存操作,避免競爭

4.2 訊號(Signal)/ 更新(Update)= 命令(Command)

  • 本範例命令:
    • RESTOCK(補貨,透過 submit(Command) 非同步指令)
    • PURCHASE(購買,透過 invoke(Command) 同步)
  • Workflow 以狀態機處理命令並更新 cachedStock
  • 每筆命令皆帶 opKey 作為冪等鍵,重試/重播不會重複生效

4.3 查詢(Query)= 讀取當前狀態

  • 使用 getStock() 回應目前庫存快照(唯讀、即時回應)
  • 避免高頻或重計算的大型查詢;必要時使用快照

4.4 StartTrigger

  • 用戶端根據 SKU 設定 workflowId = inventory:<sku>
  • 使用 signalWithStart 確保實體不存在時自動啟動(冷啟動)
  • 任務派送到 inventory-tq,由對應 Worker 處理

5. 實踐清單

5.1 基本設計

  • 明確的 workflowId 命名規範,體現業務主鍵與分片策略
  • 所有命令攜帶 idempotencyKey;Workflow 記錄去重集合
  • Commands/Queries 精簡穩定,避免臃腫非必要欄位
  • 將外部 I/O 置於 Activity,設定合適的 Retry/Timeout
  • 存活:可永久存活,以 ContinueAsNew 維持良好重播效率
  • 停用:實體封存(Archive)或終止(Cancel/Complete)
  • 版本演進:使用 SDK 的 Workflow.getVersion 適應版本變更

5.2 併發、限速與容量規劃

  • 每個 Entity Workflow 天然序列化其命令處理,避免鎖與競態
  • 跨實體可高度併發:以 workflowId 水平擴展至多個 Worker
  • 以 Task Queue 配額與 Rate Limit 控制輸入速率,必要時採用背壓(Backpressure)
  • 熱點實體(Hot Keys)可用分片實體(Partitioned Entity)或拆分子工作流程(Child Workflows)

5.3 Anti-patterns

  • 單體超級實體(Mega Entity)承載過多職責,導致熱點與低可演進性
  • 以高頻 Query 當輪詢(Polling),應改用事件推送或降低頻率

結語

本篇 Entity Pattern Workflow 初看程式碼會比較難以理解,但搞清楚之後,能簡單地透過 Single Writer 維持實體操作的順序性。
並且隨規模成長,能以 workflowId 水平擴展,或依倉別、地區、分片進行拆分,做到平穩擴展、操作不卡卡。

不過,當實際業務邏輯需要把流程再細分,例如一個大的主流程要拆成多個子流程並行、或是某些子任務需要單獨追蹤與容錯,這時候就會用到下一篇的 Child Workflow。


上一篇
Day17 - Temporal 互動 API 的設計指南(Signal / Query / Update)
下一篇
Day19 - Temporal Child Workflow:大型流程的治理模式
系列文
Temporal 開發指南:掌握 Workflow as Code 打造穩定可靠的分散式流程21
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言