本篇延續 Temporal 第一篇,聚焦驗證四個情境:Workflow Versioning、Workflow.await、長任務的 heartbeat 與取消。
以 Workflow.getVersion
安全演進流程行為。新執行走新版本;舊歷史重播走舊版本。
@WorkflowInterface
interface VerWorkflow {
@WorkflowMethod
String run(String name);
}
class VerWorkflowImpl implements VerWorkflow {
@Override
public String run(String name) {
int v = Workflow.getVersion(
"greet-change", Workflow.DEFAULT_VERSION, 2);
if (v == Workflow.DEFAULT_VERSION) {
return "Hello " + name; // legacy
} else if (v == 1) {
return "Hi " + name; // v1 分支(舊執行重播)
} else {
return "Hey " + name; // v2 新分支(新執行)
}
}
}
public class VersioningTest {
// 確認新版本有走到 v2 分支
@Test
void new_runs_take_v2_branch() {
TestWorkflowEnvironment env = TestWorkflowEnvironment.newInstance();
try {
Worker w = env.newWorker("ver-q");
// 註冊 v2 workflow 實作(含 DEFAULT/v1/v2 分支)
w.registerWorkflowImplementationTypes(VerWorkflowV2Impl.class);
env.start(); // 啟動內嵌服務與 worker
WorkflowClient c = env.getWorkflowClient(); // 取得測試用 client
// 建立 workflow stub,指向相同 task queue
VerWorkflow wf = c.newWorkflowStub(
VerWorkflow.class,
WorkflowOptions.newBuilder().setTaskQueue("ver-q").build());
// 執行新 run,預期走 v2 分支
String out = wf.run("neo");
// 斷言結果為 v2 輸出
assertEquals("Hey neo", out);
} finally {
env.close();
}
}
// 以 v2 程式碼重播 v1 時期的歷史,驗證不會出現非決定性錯誤
@Test
void replay_v1_history_with_v2_impl() throws Exception {
WorkflowReplayer.replayWorkflowExecution(
"src/test/resources/history/greet_v1.json",
() -> new VerWorkflowV2Impl());
}
}
WorkflowReplayer
重播驗證可重播性。用 Workflow.await
等待條件(如 signal),或設定上限時間做超時處理。
interface AwaitWorkflow {
@WorkflowMethod
String run();
@SignalMethod
void markReady();
}
class AwaitWorkflowImpl implements AwaitWorkflow {
// 由 Signal 控制的旗標;條件成立時解除 await
private boolean ready;
@Override
public String run() {
// 等待最長 1 小時直到條件成立(ready==true)
// 測試中用的是虛擬時間,會快轉而不阻塞真實時間
boolean signaled = Workflow.await(
Duration.ofHours(1), () -> ready);
// 若逾時(條件未滿足),回傳 TIMEOUT;否則回傳 READY
if (!signaled) return "TIMEOUT";
return "READY";
}
@Override
public void markReady() { this.ready = true; } // Signal handler:將條件設為 true
}
public class AwaitTest {
private static final String TASK_QUEUE = "await-q"; // 測試用 task queue
private TestWorkflowEnvironment env; // in-memory Temporal 服務
private WorkflowClient client; // 測試用 client
@BeforeEach
void setUp() {
env = TestWorkflowEnvironment.newInstance(); // 建立 in-memory 服務
Worker w = env.newWorker(TASK_QUEUE); // 建立 worker 綁定 task queue
w.registerWorkflowImplementationTypes(AwaitWorkflowImpl.class); // 註冊 workflow 實作
env.start(); // 啟動服務與 worker
client = env.getWorkflowClient(); // 取得測試用 client
}
@AfterEach
void tearDown() {
env.close();
}
@Test
void signal_before_timeout_returns_ready() {
AwaitWorkflow wf = client.newWorkflowStub(
AwaitWorkflow.class,
WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build());
// 非同步啟動 workflow
WorkflowClient.start(wf::run);
// 送出 Signal,讓條件成立
wf.markReady();
String res = WorkflowStub.fromTyped(wf).getResult(String.class);
// 驗證結果條件成立應回傳 READY
assertEquals("READY", res);
}
@Test
void timeout_without_signal_returns_timeout() {
AwaitWorkflow wf = client.newWorkflowStub(
AwaitWorkflow.class,
WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build());
// 非同步啟動 workflow
WorkflowClient.start(wf::run);
// 未送 Signal,讓條件維持未成立
// 用時間快轉到 await 超時
env.sleep(Duration.ofHours(1));
String res = WorkflowStub.fromTyped(wf).getResult(String.class);
// 時間已快轉導致超時應回傳 TIMEOUT
assertEquals("TIMEOUT", res);
}
}
Workflow.await(Duration, condition)
可在虛擬時間中測試超時不需真實等待;也可用 signal 立即喚醒。@ActivityInterface
interface LongRunningAct {
@ActivityMethod
String run();
}
class LongRunningActImpl implements LongRunningAct {
@Override
public String run() {
for (int i = 0; i < 1000; i++) {
Activity.getExecutionContext().heartbeat(i); // 回報進度,支援超時恢復
// 模擬長時間工作,加入短暫 sleep(真實系統可為 I/O 或計算)
try { Thread.sleep(10); } catch (InterruptedException ignored) {}
}
return "DONE";
}
}
// 測試可取消 workflow/activity,應拋出取消例外或提前結束
@WorkflowInterface
interface LongActWorkflow {
@WorkflowMethod
void run();
}
class LongActWorkflowImpl implements LongActWorkflow {
@Override
public void run() {
// 建立 Activity stub,設定單次嘗試的最長執行時間(StartToClose)
LongRunningAct act = Workflow.newActivityStub(
LongRunningAct.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(30))
.build());
// 讓取消(或其他 runtime 例外)向上冒泡,最終使 workflow 被標記為取消
act.run();
}
}
public class CancellationTest {
@Test
void cancel_workflow_while_activity_running() {
TestWorkflowEnvironment env = TestWorkflowEnvironment.newInstance();
try {
Worker w = env.newWorker("cancel-q");
w.registerWorkflowImplementationTypes(LongActWorkflowImpl.class);
// 使用匿名類別的長時間 Activity,內部會 heartbeat
w.registerActivitiesImplementations(new LongRunningActImpl());
env.start();
WorkflowClient c = env.getWorkflowClient();
LongActWorkflow wf = c.newWorkflowStub(LongActWorkflow.class,
WorkflowOptions.newBuilder().setTaskQueue("cancel-q").build());
// 非同步啟動,讓 Activity 進入執行迴圈
WorkflowClient.start(wf::run);
// 立刻取消該 workflow
WorkflowStub.fromTyped(wf).cancel();
try {
WorkflowStub.fromTyped(wf).getResult(Void.class);
} catch (WorkflowFailedException e) {
// 驗證為取消所致(CanceledFailure 表示取消而非一般失敗)
assertTrue(e.getCause() instanceof CanceledFailure);
}
} finally {
env.close();
}
}
}
heartbeat()
定期回報進度,模擬長時間任務的可回復性。cancel()
,確認取消向下傳遞到執行中的 Activity。WorkflowFailedException(CanceledFailure)
,代表流程因取消而結束。getVersion
標記變更,舊歷史重播走舊分支,新 run 走新分支。heartbeat()
回報進度;取消時應拋 CanceledFailure
。