首先安裝 Temporal Server,等一下建立的 Worker 及 Start Trigger 將藉由此核心流程引擎進行運作。(附帶會用到指令)
將 Workflow Event History 保存七天: temporal operator namespace update --namespace default --retention 7d
確認 retention 是否調整成功 Config.WorkflowExecutionRetentionTtl(168h): temporal operator namespace describe --namespace default
啟動 Temporal Server (關閉後 Event History 消失): temporal server start-dev
啟動 Temporal Server (關閉後存檔在指定路徑上,下次重啟 Server 則 Event History 仍存在): temporal server start-dev --db-filename <path>/temporal.db
Web Console UI: http://localhost:8233
執行 main 方法啟動 Worker 拉取任務:
可同時跑多執行緒處理(Worker factory 內建併發池)。可以多台 Worker 連到同一 Task Queue,也就是說 Worker 可以水平擴充。
package com.flowzati.process1;
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
public class Process1Worker {
public static final String TASK_QUEUE = "registration-tq";
// 1. 啟動程式
public static void main(String[] args) {
// 2. 連線 Temporal Server
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
WorkerFactory factory = WorkerFactory.newInstance(WorkflowClient.newInstance(service));
// 3. 建立 Worker,指定監聽的 Task Queue
Worker worker = factory.newWorker(TASK_QUEUE);
// 4. 登記可處理的 Workflow / Activity
worker.registerWorkflowImplementationTypes(RegistrationWorkflowImpl.class);
worker.registerActivitiesImplementations(new RegistrationActivitiesImpl());
// 5. 啟動 Worker:開啟執行緒持續從 Task Queue poll Task
factory.start();
System.out.println("Worker started on " + TASK_QUEUE);
}
}
執行 main 方法觸發啟動流程:
要提一下這個 workflow id 很重要,之後所有 workflow 互動更新都必須靠這個 Id,會用來確保流程有序且冪等。
package com.flowzati.process1;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.util.UUID;
import static com.flowzati.process1.Process1Worker.TASK_QUEUE;
public class Process1StartTrigger {
public static void main(String[] args) {
// 1. 建立與 Temporal Server 的連線
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
WorkflowClient client = WorkflowClient.newInstance(service);
String userId = UUID.randomUUID().toString();
String email = "flow@gmail.com";
// 2. 指定要啟動的 Workflow Interface
RegistrationWorkflow workflow = client.newWorkflowStub(
RegistrationWorkflow.class,
WorkflowOptions.newBuilder()
.setTaskQueue(TASK_QUEUE) // 3. 設定目標 Task Queue Name
.setWorkflowId("registration-" + userId) // 4. Workflow 唯一 ID
.build());
// 5. 輸入參數,呼叫 Workflow start 啟動
RegisterRequest request = new RegisterRequest(userId, email);
WorkflowExecution execution = WorkflowClient.start(workflow::register, request);
System.out.println("WorkflowId: " + execution.getWorkflowId());
System.out.println("RunId: " + execution.getRunId());
}
}
找到第一筆 Running 的 Workflow,Workflow ID 為 registration-開頭
點擊 Workflow ID,確認流程一開始啟動的狀態,目前只走了兩個步驟 Workflow Execution Started, Workflow Task Scheduled
找到上一篇的 Process1Worker,執行 main 方法,啟動 Worker,確認 Workflow 及 Activity 的執行過程。
這是 Event History compact 的顯示方式,只列出了執行的 Activity
這是 Event History All 的顯示方式,包含流程起訖、流程推進以及 Activity
在這裡可以看到先前提過的,Temporal Service 不只是分派任務,它還會把每個事件記錄在 Event History 裡,讓流程能完整追蹤與還原。
今天完成了第一個簡單的三步驟直通式流程,也透過 Web Console 看到 Workflow Event Hsitory,還有他能夠動態地顯示流程的執行。這對於開發或除錯都是有跡可循的,明天藉由這個簡單三個步驟的流程案例,我們來討論處理分散式流程的基本觀念。