iT邦幫忙

2025 iThome 鐵人賽

DAY 24
0
Software Development

Temporal 開發指南:掌握 Workflow as Code 打造穩定可靠的分散式流程系列 第 24

Day24 - Temporal Testing(二):Workflow Versioning、Workflow.await、Heartbeat/Cancel

  • 分享至 

  • xImage
  •  

本篇延續 Temporal 第一篇,聚焦驗證四個情境:Workflow Versioning、Workflow.await、長任務的 heartbeat 與取消。

1. 測 Workflow Versioning

Workflow.getVersion 安全演進流程行為。新執行走新版本;舊歷史重播走舊版本。

1.1 Workflow 的介面及實作

@WorkflowInterface
interface VerWorkflow {
  @WorkflowMethod
  String run(String name);
}
class VerWorkflowImpl implements VerWorkflow {
  @Override
  public String run(String name) {
    int v = Workflow.getVersion(
        "greet-change", Workflow.DEFAULT_VERSION, 2);
    if (v == Workflow.DEFAULT_VERSION) {
      return "Hello " + name; // legacy
    } else if (v == 1) {
      return "Hi " + name;   // v1 分支(舊執行重播)
    } else {
      return "Hey " + name;  // v2 新分支(新執行)
    }
  }
}

1.2 測試案例

public class VersioningTest {
  // 確認新版本有走到 v2 分支
  @Test
  void new_runs_take_v2_branch() {
    TestWorkflowEnvironment env = TestWorkflowEnvironment.newInstance(); 
    try {
      Worker w = env.newWorker("ver-q");                          
      // 註冊 v2 workflow 實作(含 DEFAULT/v1/v2 分支)     
      w.registerWorkflowImplementationTypes(VerWorkflowV2Impl.class);     
      env.start(); // 啟動內嵌服務與 worker
      WorkflowClient c = env.getWorkflowClient(); // 取得測試用 client
      // 建立 workflow stub,指向相同 task queue
      VerWorkflow wf = c.newWorkflowStub(
        VerWorkflow.class,
        WorkflowOptions.newBuilder().setTaskQueue("ver-q").build());

      // 執行新 run,預期走 v2 分支
      String out = wf.run("neo");
      // 斷言結果為 v2 輸出
      assertEquals("Hey neo", out);
    } finally {
      env.close();
    }
  }

  // 以 v2 程式碼重播 v1 時期的歷史,驗證不會出現非決定性錯誤
  @Test
  void replay_v1_history_with_v2_impl() throws Exception {
    WorkflowReplayer.replayWorkflowExecution(
        "src/test/resources/history/greet_v1.json",
        () -> new VerWorkflowV2Impl());
  }
}

1.3 測試重點

  • VerWorkflowImpl 類別,內含 DEFAULT_VERSION 與 v1 分支。
  • 新 Run 走 v2。
  • 舊歷史用 WorkflowReplayer 重播驗證可重播性。

2. 測 Workflow.await 條件等待

Workflow.await 等待條件(如 signal),或設定上限時間做超時處理。

2.1 Workflow 介面與實作

interface AwaitWorkflow {
  @WorkflowMethod
  String run();

  @SignalMethod
  void markReady();
}
class AwaitWorkflowImpl implements AwaitWorkflow {
  // 由 Signal 控制的旗標;條件成立時解除 await
  private boolean ready;

  @Override
  public String run() {
    // 等待最長 1 小時直到條件成立(ready==true)
    // 測試中用的是虛擬時間,會快轉而不阻塞真實時間
    boolean signaled = Workflow.await(
        Duration.ofHours(1), () -> ready);
    // 若逾時(條件未滿足),回傳 TIMEOUT;否則回傳 READY
    if (!signaled) return "TIMEOUT";
    return "READY";
  }

  @Override
  public void markReady() { this.ready = true; } // Signal handler:將條件設為 true
}

2.2 測試案例

public class AwaitTest {
  private static final String TASK_QUEUE = "await-q"; // 測試用 task queue
  private TestWorkflowEnvironment env;                 // in-memory Temporal 服務
  private WorkflowClient client;                       // 測試用 client

  @BeforeEach
  void setUp() {
    env = TestWorkflowEnvironment.newInstance();              // 建立 in-memory 服務
    Worker w = env.newWorker(TASK_QUEUE);                     // 建立 worker 綁定 task queue
    w.registerWorkflowImplementationTypes(AwaitWorkflowImpl.class); // 註冊 workflow 實作
    env.start();                                              // 啟動服務與 worker
    client = env.getWorkflowClient();                         // 取得測試用 client
  }

  @AfterEach
  void tearDown() {
    env.close();
  }

  @Test
  void signal_before_timeout_returns_ready() {
    AwaitWorkflow wf = client.newWorkflowStub(
        AwaitWorkflow.class,
        WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build());

    // 非同步啟動 workflow
    WorkflowClient.start(wf::run);
    // 送出 Signal,讓條件成立
    wf.markReady();

    String res = WorkflowStub.fromTyped(wf).getResult(String.class);
    // 驗證結果條件成立應回傳 READY
    assertEquals("READY", res);                                      
  }

  @Test
  void timeout_without_signal_returns_timeout() {
    
    AwaitWorkflow wf = client.newWorkflowStub(
        AwaitWorkflow.class,
        WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build());

    // 非同步啟動 workflow
    WorkflowClient.start(wf::run);
    // 未送 Signal,讓條件維持未成立
    // 用時間快轉到 await 超時
    env.sleep(Duration.ofHours(1));

    String res = WorkflowStub.fromTyped(wf).getResult(String.class);
    // 時間已快轉導致超時應回傳 TIMEOUT
    assertEquals("TIMEOUT", res);                                  
  }
}

2.3 測試重點

  • Workflow.await(Duration, condition) 可在虛擬時間中測試超時不需真實等待;也可用 signal 立即喚醒。

3. 長時間 Activity 的 Heartbeat 與取消

3.1 模擬一個需要多次心跳的 Activity,並在測試中觸發取消,驗證行為

@ActivityInterface
interface LongRunningAct {
  @ActivityMethod
  String run();
}
class LongRunningActImpl implements LongRunningAct {
  @Override
  public String run() {
    for (int i = 0; i < 1000; i++) {
      Activity.getExecutionContext().heartbeat(i); // 回報進度,支援超時恢復
      // 模擬長時間工作,加入短暫 sleep(真實系統可為 I/O 或計算)
      try { Thread.sleep(10); } catch (InterruptedException ignored) {}
    }
    return "DONE";
  }
}
// 測試可取消 workflow/activity,應拋出取消例外或提前結束
@WorkflowInterface
interface LongActWorkflow {
  @WorkflowMethod
  void run();
}
class LongActWorkflowImpl implements LongActWorkflow {
  @Override
  public void run() {
    // 建立 Activity stub,設定單次嘗試的最長執行時間(StartToClose)
    LongRunningAct act = Workflow.newActivityStub(
        LongRunningAct.class,
        ActivityOptions.newBuilder()
            .setStartToCloseTimeout(Duration.ofSeconds(30))
            .build());
    // 讓取消(或其他 runtime 例外)向上冒泡,最終使 workflow 被標記為取消
    act.run();
  }
}

3.2 取消正在執行長時間 Activity 的 Workflow

public class CancellationTest {
  @Test
  void cancel_workflow_while_activity_running() {
    TestWorkflowEnvironment env = TestWorkflowEnvironment.newInstance();
    try {
      Worker w = env.newWorker("cancel-q");
      w.registerWorkflowImplementationTypes(LongActWorkflowImpl.class);
      // 使用匿名類別的長時間 Activity,內部會 heartbeat
      w.registerActivitiesImplementations(new LongRunningActImpl());
      env.start();

      WorkflowClient c = env.getWorkflowClient();
      LongActWorkflow wf = c.newWorkflowStub(LongActWorkflow.class,
          WorkflowOptions.newBuilder().setTaskQueue("cancel-q").build());
      
      // 非同步啟動,讓 Activity 進入執行迴圈
      WorkflowClient.start(wf::run);

      // 立刻取消該 workflow
      WorkflowStub.fromTyped(wf).cancel();
      try {
        WorkflowStub.fromTyped(wf).getResult(Void.class);
      } catch (WorkflowFailedException e) {
        // 驗證為取消所致(CanceledFailure 表示取消而非一般失敗)
        assertTrue(e.getCause() instanceof CanceledFailure);
      }
    } finally {
      env.close();
    }
  }
}

3.3 測試重點

  • Activity 透過 heartbeat() 定期回報進度,模擬長時間任務的可回復性。
  • 測試時立即呼叫 cancel(),確認取消向下傳遞到執行中的 Activity。
  • 取得結果時應拋出 WorkflowFailedException(CanceledFailure),代表流程因取消而結束。

結語

  • Versioning:以 getVersion 標記變更,舊歷史重播走舊分支,新 run 走新分支。
  • await:用虛擬時間驗證條件與超時,不卡真實時間。
  • 長任務:以 heartbeat() 回報進度;取消時應拋 CanceledFailure

上一篇
Day23 - Temporal Testing (一):Workflow.sleep(), Activity.getExecutionContext()
系列文
Temporal 開發指南:掌握 Workflow as Code 打造穩定可靠的分散式流程24
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言