首先介紹我們第一個要撰寫的例子,以常見的電商註冊流程,有三個步驟:
流程非常簡單,全部走完就結束,中間沒有任何人工處理,也稱之為直通式流程(Straight-through Processing)。如果中途任何一步失敗,讓 Temporal 依設定自動重試。
設定一個流程最少要寫 6 段程式碼,會用到 Temporal SDK,本篇先介紹:
dependencies {
implementation 'io.temporal:temporal-sdk:1.31.0'
}
依照 Temporla 要求 Workflow 跟 Activity 都要先建立介面,然後標注 SDK 提供的 Annotation,之後才進行實作。
package com.flowzati.process1;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
// 1. 標記介面定義 Workflow
@WorkflowInterface
public interface RegistrationWorkflow {
// 2. 標記方法為流程的進入點。外部呼叫它來啟動流程
@WorkflowMethod
void register(RegisterRequest request);
}
package com.flowzati.process1;
import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import io.temporal.workflow.Workflow;
import java.time.Duration;
public class RegistrationWorkflowImpl implements RegistrationWorkflow {
private final RegistrationActivities acts;
public RegistrationWorkflowImpl() {
RetryOptions retryOptions = RetryOptions.newBuilder()
.setMaximumAttempts(5) // 最大重試3次
.setInitialInterval(Duration.ofSeconds(5)) // 第一次重試間隔10秒
.setBackoffCoefficient(2) // 重試間隔倍數 10, 20, 40, 80...
.setMaximumInterval(Duration.ofSeconds(100)) // 最多等 100 秒
.build();
ActivityOptions activityOptions = ActivityOptions.newBuilder()
.setRetryOptions(retryOptions)
.setScheduleToCloseTimeout(Duration.ofSeconds(120)) // 包含重試的時間上限
.setStartToCloseTimeout(Duration.ofSeconds(15)) // 單次執行時間上限
.build();
// Workflow 本身不能直接呼叫外部系統,必須透過 Activity 來執行
this.acts = Workflow.newActivityStub(RegistrationActivities.class, activityOptions);
}
// 流程進入點 register 就很簡單的跑流程,由於 retry, timeout 規則在建構式就綁定好了,出錯時自動依綁定的規則來處理。
@Override
public void register(RegisterRequest req) {
// Step 1: 建帳號
acts.createAccount(req);
// Step 2: 送 500 點(冪等:固定操作鍵)
acts.addPoints(req.getUserId(), 500, "signupBonus");
// Step 3: 寄歡迎信(冪等:messageId 與 user 綁定)
acts.sendEmail(req.getEmail(), "welcome-" + req.getUserId());
}
}
package com.flowzati.process1;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
// ActivityInterface 標記在介面上,代表內含一組活動
@ActivityInterface
public interface RegistrationActivities {
// ActivityMethod 標記在方法上,每個方法就代表可以被 Workflow 呼叫的 Activity。
@ActivityMethod
void createAccount(RegisterRequest request);
@ActivityMethod
void addPoints(String userId, int points, String opKey);
@ActivityMethod
void sendEmail(String email, String messageId);
}
package com.flowzati.process1;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
public class RegistrationActivitiesImpl implements RegistrationActivities {
// Demo 用:記憶體模擬「資料庫」
private final Set<String> users = ConcurrentHashMap.newKeySet();
private final Set<String> ops = ConcurrentHashMap.newKeySet(); // 記錄 points/Email 的 idempotency key
@Override
public void createAccount(RegisterRequest req) {
// 冪等:userId 唯一(真實情況請改為 DB 唯一鍵/UPSERT)
boolean added = users.add(req.getUserId());
if (!added) {
return; // 已存在 → 當成成功返回(冪等)
}
// 這裡可寫入 DB:INSERT users(id, email) ON CONFLICT DO NOTHING
simulateExtraInvoke(5);
System.out.println("Created account for " + req.getUserId() + " (" + req.getEmail() + ")");
}
@Override
public void addPoints(String userId, int points, String opKey) {
simulateExtraInvoke(5);
if (!ops.add("POINTS#" + userId + "#" + opKey)) {
return;
}
System.out.println("Added " + points + " points to " + userId + " [op=" + opKey + "]");
}
@Override
public void sendEmail(String email, String messageId) {
simulateExtraInvoke(5);
if (!ops.add("EMAIL#" + messageId)) {
return;
}
System.out.println("Sent welcome email to " + email + " [msgId=" + messageId + "]");
}
private static void simulateExtraInvoke(int timeout) {
try {
TimeUnit.SECONDS.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
本篇介紹了負責推進流程的 Workflow 及實際工作的 Activity 的程式撰寫,下一篇會建立 Worker 並指定其負責的範圍,是要推動流程(Workflow)或者實際工作(Activity);介紹如何觸發啟動 Workflow,並使用 Temporal UI 觀看事件歷史。