iT邦幫忙

2025 iThome 鐵人賽

DAY 4
0

流程介紹

https://ithelp.ithome.com.tw/upload/images/20250918/201411465cakv0JVki.png

首先介紹我們第一個要撰寫的例子,以常見的電商註冊流程,有三個步驟:

  1. 建立帳號
  2. 給 user 加500點
  3. 寄出歡迎信

流程非常簡單,全部走完就結束,中間沒有任何人工處理,也稱之為直通式流程(Straight-through Processing)。如果中途任何一步失敗,讓 Temporal 依設定自動重試。

程式碼

https://ithelp.ithome.com.tw/upload/images/20250918/20141146SmrWwltnnV.png

設定一個流程最少要寫 6 段程式碼,會用到 Temporal SDK,本篇先介紹:

  • 第 1, 2 段:寫代表 workflow 的介面跟實作,處理流程的推進。
  • 第 3, 4 段:寫代表 activities 的介面跟實作,代表實際的工作活動。
  • 第 5 段:(下篇分享) 寫 Worker,它要登記能做哪些 Workflow 及 Activity,連接 Temporal Service,拉取任務執行。
  • 第 6 段:(下篇分享) 寫 Start Trigger,連接 Temporal,觸發啟動指定的 Workflow。

Gradle 套件依賴

MVN REPOSITORY

dependencies {
  implementation 'io.temporal:temporal-sdk:1.31.0'
}

Workflow

依照 Temporla 要求 Workflow 跟 Activity 都要先建立介面,然後標注 SDK 提供的 Annotation,之後才進行實作。

  • Workflow interface
    1. WorkflowInterface 標注在介面上,表示 Workflow
    2. WorkflowMethod 標注在方法上,做為外部啟動流程的進入點,這裡以方法 register 做為進入點。
package com.flowzati.process1;

import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

// 1. 標記介面定義 Workflow
@WorkflowInterface
public interface RegistrationWorkflow {
  // 2. 標記方法為流程的進入點。外部呼叫它來啟動流程
  @WorkflowMethod
  void register(RegisterRequest request);
}
  • implements Worfklow:
    1. 這是 Workflow 實作類別,Workflow 本身不能直接呼叫外部系統,必須透過 Activity 來執行。
    2. 這裡在建構式用 newActivityStub 初始化。
    3. 同時綁定了 Retry 或 Timeout 等規則。像是最多重試3次, 執行時間上限60秒。
    4. 實作 register 流程進入點,流程要依序呼叫外部系統:建帳號,贈送 500 點,寄送歡迎信。
    5. register 裡面就很簡潔的跑流程,由於 retry, timeout 規則在建構式就綁定好了,出錯時自動依綁定的規則來處理。
package com.flowzati.process1;

import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import io.temporal.workflow.Workflow;

import java.time.Duration;

public class RegistrationWorkflowImpl implements RegistrationWorkflow {
  private final RegistrationActivities acts;

  public RegistrationWorkflowImpl() {
    RetryOptions retryOptions = RetryOptions.newBuilder()
        .setMaximumAttempts(5) // 最大重試3次
        .setInitialInterval(Duration.ofSeconds(5)) // 第一次重試間隔10秒
        .setBackoffCoefficient(2) // 重試間隔倍數 10, 20, 40, 80...
        .setMaximumInterval(Duration.ofSeconds(100)) // 最多等 100 秒
        .build();
    ActivityOptions activityOptions = ActivityOptions.newBuilder()
        .setRetryOptions(retryOptions)
        .setScheduleToCloseTimeout(Duration.ofSeconds(120)) // 包含重試的時間上限
        .setStartToCloseTimeout(Duration.ofSeconds(15)) // 單次執行時間上限
        .build();
    // Workflow 本身不能直接呼叫外部系統,必須透過 Activity 來執行
    this.acts = Workflow.newActivityStub(RegistrationActivities.class, activityOptions);
  }

  // 流程進入點 register 就很簡單的跑流程,由於 retry, timeout 規則在建構式就綁定好了,出錯時自動依綁定的規則來處理。
  @Override
  public void register(RegisterRequest req) {
    // Step 1: 建帳號
    acts.createAccount(req);

    // Step 2: 送 500 點(冪等:固定操作鍵)
    acts.addPoints(req.getUserId(), 500, "signupBonus");

    // Step 3: 寄歡迎信(冪等:messageId 與 user 綁定)
    acts.sendEmail(req.getEmail(), "welcome-" + req.getUserId());
  }
}

Activity

  • Activity interface
    1. ActivityInterface 標記在介面上,代表內含一組活動
    2. ActivityMethod 標記在方法上,每個方法就代表可以被 Workflow 呼叫的 Activity。
package com.flowzati.process1;

import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;

// ActivityInterface 標記在介面上,代表內含一組活動
@ActivityInterface
public interface RegistrationActivities {
  // ActivityMethod 標記在方法上,每個方法就代表可以被 Workflow 呼叫的 Activity。
  @ActivityMethod
  void createAccount(RegisterRequest request);

  @ActivityMethod
  void addPoints(String userId, int points, String opKey);

  @ActivityMethod
  void sendEmail(String email, String messageId);
}
  • implements Activity
    1. Activity 的實作,主要負責實際工作的邏輯,例如 存取資料庫、呼叫 API、寄送郵件等,這類外部系統的操作,不會用到SDK。
    2. 記得要搭配冪等鍵約束,避免 Retry 造成重覆扣庫扣款之類的問題出現。
    3. 另外這邊也很簡潔,不用寫 Retry, Timeout,因為剛剛已經在 Workflow 設定,交給 Temporal 代為處理。
    4. 範例程式就不真的呼叫外部系統了,以模擬為主。
package com.flowzati.process1;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class RegistrationActivitiesImpl implements RegistrationActivities {
  // Demo 用:記憶體模擬「資料庫」
  private final Set<String> users = ConcurrentHashMap.newKeySet();
  private final Set<String> ops = ConcurrentHashMap.newKeySet(); // 記錄 points/Email 的 idempotency key

  @Override
  public void createAccount(RegisterRequest req) {
    // 冪等:userId 唯一(真實情況請改為 DB 唯一鍵/UPSERT)
    boolean added = users.add(req.getUserId());
    if (!added) {
      return; // 已存在 → 當成成功返回(冪等)
    }
    // 這裡可寫入 DB:INSERT users(id, email) ON CONFLICT DO NOTHING
    simulateExtraInvoke(5);
    System.out.println("Created account for " + req.getUserId() + " (" + req.getEmail() + ")");
  }

  @Override
  public void addPoints(String userId, int points, String opKey) {
    simulateExtraInvoke(5);
    if (!ops.add("POINTS#" + userId + "#" + opKey)) {
      return;
    }
    System.out.println("Added " + points + " points to " + userId + " [op=" + opKey + "]");
  }

  @Override
  public void sendEmail(String email, String messageId) {
    simulateExtraInvoke(5);
    if (!ops.add("EMAIL#" + messageId)) {
      return;
    }
    System.out.println("Sent welcome email to " + email + " [msgId=" + messageId + "]");
  }

  private static void simulateExtraInvoke(int timeout) {
    try {
      TimeUnit.SECONDS.sleep(timeout);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

結語

本篇介紹了負責推進流程的 Workflow 及實際工作的 Activity 的程式撰寫,下一篇會建立 Worker 並指定其負責的範圍,是要推動流程(Workflow)或者實際工作(Activity);介紹如何觸發啟動 Workflow,並使用 Temporal UI 觀看事件歷史。


上一篇
Day03 - Temporal 基礎介紹
下一篇
Day05 - 啟動第一個流程 - Worker / Start Trigger
系列文
Temporal 開發指南:掌握 Workflow as Code 打造穩定可靠的分散式流程9
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言