iT邦幫忙

2025 iThome 鐵人賽

DAY 15
0
Software Development

Temporal 開發指南:掌握 Workflow as Code 打造穩定可靠的分散式流程系列 第 15

Day 15 - Temporal 消除排程與狀態管理,流程寫得更直覺

  • 分享至 

  • xImage
  •  

前一篇的流程搭配 Temporal 的 Saga Pattern 輕鬆解決分散式交易流程,而本篇要展開的是互動式流程。

1. 流程介紹

https://ithelp.ithome.com.tw/upload/images/20250930/20141146m5P6y02uWJ.png

情境:
商品改價格需要人員審核,審核者必須在 48 小時內回覆;
若超時未回覆,改價申請自動被 Reject。

試想這樣的情境是不是要額外再建立 Cron Job 呢?

那 Temporal 要如何實作這種「需要外部互動」的流程?

2. 實作範例

2.1 定義 Workflow 介面(含 Update 方法)

如果要讓外部把審核結果回傳給 Workflow,我們可以在 Workflow Interface 上宣告一個 @UpdateMethod,讓用戶端在稍後呼叫。

@WorkflowInterface // 宣告為 Temporal 的 Workflow 介面
public interface ProductChangeWorkflow {

  // 主流程入口:啟動改價需求,進入審核流程
  @WorkflowMethod
  void run(ProductChangeSpec spec);

  // 外部回覆審核結果:由後台或 API 在審核完成時呼叫
  @UpdateMethod
  void update(ReviewDecision decision);
}

2.2 Workflow 實作:等待 48 小時或更新到來

  1. 使用 CompletablePromise 作為「等待外部回覆」的容器,配合 Workflow.await() 設定 48 小時超時。
  2. Workflow.await() 並不會占著執行緒,Worker 會把等待條件交給 Temporal Server,一旦任一條件滿足,流程就會從 Workflow.await() 往下繼續執行。
    1. Server 會建立 48 小時的 Timer
    2. 同時也會等著接收 Update 事件。
package com.flowzati.process3;

import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import io.temporal.workflow.CompletablePromise;
import io.temporal.workflow.Workflow;

import java.time.Duration;
import java.util.function.Supplier;

public class ProductChangeWorkflowImpl implements ProductChangeWorkflow {
  private final ProductChangeActivities acts; // Activities stub,用於呼叫外部動作(通知、記錄、改價)
  private final CompletablePromise<ReviewDecision> decisionPromise = Workflow.newPromise(); // 等待外部審核回覆

  public ProductChangeWorkflowImpl() {
    // 設定 Activities 的重試策略(指數退避)
    RetryOptions retryOptions = RetryOptions.newBuilder()
        .setBackoffCoefficient(2.0)
        .setMaximumAttempts(5)
        .build();
    // 設定 Activities 的超時與重試
    ActivityOptions activityOptions = ActivityOptions.newBuilder()
        .setStartToCloseTimeout(Duration.ofSeconds(30))
        .setRetryOptions(retryOptions)
        .build();
    this.acts = Workflow.newActivityStub(ProductChangeActivities.class, activityOptions); // 產生活動代理物件
  }

  @Override
  public void run(ProductChangeSpec spec) {
    // Step 1: 通知審核
    acts.notifyReviewer(spec);

    // Step 2: 等待審核;48 小時未回覆視為 REJECT
    Duration timeout = Duration.ofHours(48); // 超時窗口
    Supplier<Boolean> isCompleted = () -> decisionPromise.isCompleted(); // 條件:是否已收到回覆
    boolean received = Workflow.await(timeout, isCompleted); // 非阻塞等待(Timer 或 Update 任一達成則往下繼續執行)

    // 有回覆就取用審核結果;超時則自動拒絕
    ReviewDecision reviewDecision = received
        ? decisionPromise.get()
        : new ReviewDecision(ReviewOutcome.REJECT, "system", "timeout after 48 hours");

    // Step 3: 儲存審核結果
    acts.recordDecision(spec, reviewDecision);

    if (reviewDecision.getOutcome() == ReviewOutcome.REJECT) {
      // 審核拒絕則結束流程(不進行改價)
      return;
    }

    // Step 4: 審核通過 → 改價
    acts.applyNewPrice(spec);
  }

  @Override
  public void update(ReviewDecision decision) {
    // 去重複與狀態守護:只接收第一個有效決策,避免重複點擊或重試干擾
    if (!this.decisionPromise.isCompleted()) {
      this.decisionPromise.complete(decision);
    }
  }
}

2.3 用戶端範例:啟動流程與送出 Update

用戶端一開始啟動 Workflow,等審核完成或超時。

審核人員完成後,由後台或 API 觸發 Update 傳回決策。

// 建立用戶端與 Workflow stub(示意)
WorkflowClient client = WorkflowClient.newInstance(service); // 連線到 Temporal 服務
WorkflowOptions options = WorkflowOptions.newBuilder()
    .setTaskQueue("product-change-task-queue") // 指定 Worker 所監聽的 Task Queue
    .setWorkflowId("product-change-" + spec.getProductId()) // 設定可追蹤的 Workflow Id
    .build();
ProductChangeWorkflow stub = client.newWorkflowStub(ProductChangeWorkflow.class, options); // 產生 Workflow 代理

// 非同步啟動流程(不阻塞目前執行緒)
WorkflowClient.start(stub::run, spec);

// … 之後在審核通過時(例如後台審核頁面按下同意)
ReviewDecision decision = new ReviewDecision(ReviewOutcome.APPROVE, "alice", "OK");
stub.update(decision); // 將審核結果回傳給 Workflow(讓 await 後續繼續)

3. 為什麼 await 是「非阻塞」?

Workflow 是「虛擬執行」:當程式碼執行到 Workflow.await(),Worker 會把條件(Timer + 等待 Update)轉換成事件與命令並持久化到 Server,Worker 執行緒就釋放了。等條件達成,Server 會把事件推回 Worker 重放 Workflow 歷史,從 await 之後繼續,既可靠又省資源。

https://ithelp.ithome.com.tw/upload/images/20250930/20141146uYJNZIwifY.png

上圖,可以看到事件轉交到 Server 等待 2d(48h)。

https://ithelp.ithome.com.tw/upload/images/20250930/20141146g4SK41vXfk.png

上圖,可以看到接收了 update 事件後,繼續推進流程。

4. 最佳實務

  • 明確超時策略:用 Duration.ofHours(48) 或依環境參數設定。
  • 去重與狀態守護:像範例只接受第一個決策,避免重複點擊或重試造成誤判。
  • 可觀測性:在 notifyReviewerrecordDecision 中打審核追蹤記錄,便於排查。
  • 失敗重試:Activities 設定合理的 RetryOptions,避免短暫失敗造成流程終止。
  • 權限與驗證:在 Update 端檢查操作者身分與狀態(例如已超時就拒收 Update)。
  • 補充: Workflow 是「有狀態」的物件(例如 decisionPromise 會在歷史中被重放還原),Activities 則是無狀態的外部呼叫。

5. 結語

本篇示範了如何以 Temporal 實作「互動式流程」:利用 @UpdateMethod 接收外部回覆,以 CompletablePromiseWorkflow.await() 實現 48 小時超時的審核等待,並在回覆與超時兩種情境下維持流程一致性。

想像一般使用排程或事件驅動的作法,業務程式會四散多處,且要額外建立定時器導致維護點發散,而使用 Temporal 能以集中可讀的程式碼,直接地表達整個互動式流程,不需要額外排程及手刻狀態轉換,更好維護管理。


上一篇
Day14 - Temporal 實作 Saga Pattern,打造可靠的分散式交易流程
下一篇
Day16 - Temporal 正確啟動流程的方式
系列文
Temporal 開發指南:掌握 Workflow as Code 打造穩定可靠的分散式流程21
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言