iT邦幫忙

2025 iThome 鐵人賽

DAY 29
0
Modern Web

用 Effect 實現產品級軟體系列 第 29

[學習 Effect Day29] Effect 並行執行(一)

  • 分享至 

  • xImage
  •  

在 JS/TS 中,Promise 能夠處理單個或多個非同步的運算。但遇到取消、資源清理、超時、平行度控制與競速等需求時,卻缺乏一致的標準寫法,往往得自行組合 AbortController、計時器與額外邏輯來解決實務問題。

Effect 的其中一個目標,是讓併發可預期、可推理、也易於測試。做法很直接:用 Fiber 表示一個執行中的 Effect 工作(可等待、可探測、可協作取消),再由 Scope 的結構化併發負責生命週期與資源清理。這樣就能在同一套語意與 API 下,穩定處理超時、競速、平行與清理,而不用再手動組合 AbortController、計時器和各種清理細節。

Fiber 是什麼?為什麼它被稱為「虛擬執行緒」

Effect 是惰性、不可變的,它描述「可能成功和失敗」的計算邏輯包含運算相關的依賴。但它不會自動執行。不過當你主動運行時,這段計算邏輯就會在 fiber 上運算;這雖然聽起來很像作業系統的 Thread,但不是喔~它是由 Effect runtime 管理的「極輕量執行單元」。更白話的講,他是一個「一個可管理的執行中工作實例」。在運算過程中你可以等待運算結果、中途取消、觀察運算狀態等。

官網將 fiber 比喻成「虛擬執行緒」。因為 JavaScript 本質上是單執行緒,不過因為瀏覽器與 Node.js 的環境都有 event loop 的機制設計。而 Effect 套件就是透過這個機制在 runtime 包裝 Effect 為一個「邏輯執行緒」,並以極低成本地切換它們的執行權。這種在單一 OS thread 上的協作式切換,就形成了類似「多執行緒」的效果:你看起來同時在做很多事,但實際上不用像真正多執行緒付出昂貴的成本。

小結

  • Effect:延遲執行、不可變的計算描述,攜帶 R(環境)、E(錯誤)、A(成功)。
  • Fiber:Effect 在 Runtime 上的執行實體;以協作式排程在單一 OS thread 上提供高併發功能。

fiber 是透過哪些機制達成「高併發」

核心在於「event loop + 非阻塞 I/O + 協作式排程」:

  • 非阻塞 I/O 與計時器

    • 當 fiber 執行到 Effect.sleep(…) 或發動非同步 I/O(如 fetch),runtime 會註冊對應的計時器或 I/O 監聽,將該 fiber 標記為 suspended,立即把控制權交回 event loop,不阻塞 JavaScript 執行緒。
    • event loop 期間可以驅動其他 callback 與排程到期的 fibers。當計時器到期或 I/O 完成,runtime 把對應 fiber 放回就緒佇列,透過 microtask/macrotask 機制安排它繼續執行。
  • 協作式排程

    • Effect runtime 以小步解譯與 trampoline 拆成可中斷的 continuations,並在邊界(Effect.yieldNowsleep、I/O)主動讓出執行權,避免長時間佔用執行緒。
    • 因為大多數 fibers 在等待外部事件,等待期間幾乎不耗 CPU、只佔少量記憶體,所以可以同時管理數千甚至數萬個 I/O-bound fibers。但若是長時間 CPU-bound 的工作,需要顯式引入 yield(主動在程式裡插入能夠「讓出執行權」的點),或改用多執行緒/多進程以獲得更有效的平行性運算。

Fiber 的生命週期與型別直覺

每個 fiber 都有明確的生命週期:開始執行(running)、可能在等待 I/O 或計時器時暫停(suspended)、最後以成功或失敗結束(done)。Effect 用 Exit 把「結局」表述為 Success 或 Failure(失敗內含 Cause,可區分錯誤、取消與缺陷)。

在型別層面,你可以把 fiber 想成一個帶兩個型別參數的控制柄(可用來操作某個正在進行或可被控制之事物的引用):

        ┌─── Represents the success type
        │        ┌─── Represents the error type
        │        │
        ▼        ▼
Fiber<Success, Error>
  • 成功完成時會產生 Success 型別的值。
  • 失敗結束時會以 Error 型別的錯誤收尾。

與 Effect 不同,fiber 沒有「需求(依賴)型別」參數,因為一旦開始執行,代表它需要的環境/依賴都已經被提供好了。這讓 Fiber.joinFiber.await 在編譯期就能維持成功/失敗的型別安全,同時在執行期保有完整的錯誤與中斷語意。

Fiber 的程式碼實作

這篇我想用一個新的方式,就是直接給一個情境,然後再一步一步根據程式碼講解 fiber 的機制。

情境:

假設在應用程式中同時發出兩個 API 請求時,此時我們要先把 working 信號打開。父層先做完一些簡單、很快就能完成的事,然後再等這兩個請求都回來。等兩個都完成後,把 working 信號關掉,最後把兩邊的資料合併起來。
事件時間線

  1. 父流程先輸出「Parent doing other work」
  2. 任務 A、B 幾乎同時開始,背景每 200ms 輸出「working...」
  3. 父流程 500ms 後宣布「now waiting for results」
  4. 先等到 A 完成,再等到 B 完成
  5. 中止 working 信號,輸出加總結果

程式碼:

我們先建立一個 task 函式,用來模擬 API 請求。

import { Console, Effect, Fiber } from "effect"

function makeTask(name: string, delayMs: number, value: number) {
  return Effect.gen(function*() {
    yield* Console.log(`[${name}] start`)
    yield* Effect.sleep(`${delayMs} millis`)
    yield* Console.log(`[${name}] done`)
    return value
  })
}

創建主流程:

const program = Effect.gen(function*() {
  // 1) fork:啟動子任務,立即回傳 Fiber 控制柄
  const fiberA = yield* Effect.fork(makeTask("A", 800, 1))
  const fiberB = yield* Effect.fork(makeTask("B", 1200, 2))

  // 背景不斷提供 working 信號,幫助觀察
  const spinnerFiber = yield* Effect.fork(
    Effect.forever(
      Effect.sleep("200 millis").pipe(Effect.tap(() => Console.log("working...")))
    ).pipe(Effect.ensuring(Console.log("spinner cleanup")))
  )

  // 2) 在 join 之前,父流程可自由插入其他邏輯
  yield* Console.log("Parent doing other work")
  yield* Effect.sleep("500 millis")
  yield* Console.log("Parent done, now waiting for results")

  // 3) Fiber.join:阻塞等待成功值(錯誤會以失敗重新拋出)
  const a = yield* Fiber.join(fiberA)
  const exitA = yield* Fiber.await(fiberA)
  yield* Console.log(exitA)
  const b = yield* Fiber.join(fiberB)
  const exitB = yield* Fiber.await(fiberB)
  yield* Console.log(exitB)

  // 4) 取得控制柄即可取消背景 spinner
  yield* Fiber.interrupt(spinnerFiber)

  const sum = a + b
  yield* Console.log(`sum: ${sum}`)

  return sum
})

Effect.runFork(program)
// 輸出:
// Parent doing other work
// [A] start
// [B] start
// working...
// working...
// Parent done, now waiting for results
// working...
// [A] done
// { _id: 'Exit', _tag: 'Success', value: 1 }
// working...
// working...
// [B] done
// { _id: 'Exit', _tag: 'Success', value: 2 }
// spinner cleanup
// sum: 3

程式碼說明

  • 啟動子任務:Effect.fork(...) 會立刻回傳 Fiber 控制柄,代表子任務已在背景開始跑,但父流程不被卡住。
  • 背景 working 訊號:用 Effect.forever + Effect.sleep + Console.log 做出每 200ms 的提示,再用 fork 啟動,稍後能以 Fiber.interrupt 正常中止。
  • 父流程先做別的事:以 Console.logEffect.sleep("500 millis") 模擬「先處理一些很快的工作」。
  • 等待結果:Fiber.join(fiberA/B) 會等待成功值;若子任務失敗,join 會讓父流程以失敗結束(把錯誤重新拋出)。
  • 觀察結局:示範 Fiber.await(fiber) 取得 Exit(成功或失敗的結局);此處以 Console.log(exit) 簡單觀察。
  • 關掉背景訊號:取得 spinnerFiber 後用 Fiber.interrupt(spinnerFiber) 中止。這是「協作式」中止,會在下一個可中斷邊界(如 sleep、I/O)生效,並執行清理。
  • 合併輸出:拿到 ab 後相加為 sum,最後輸出結果。

為什麼要這樣設計?

  • 需要「同時」跑多件事,但又希望父流程能先繼續做其他邏輯,等需要時再等待結果。
  • 需要「可中止」的背景任務(像 loading spinner),在任務完成時能確實關掉,避免留下孤兒程序。
  • 保留清楚的錯誤語意:透過 join,子任務錯誤不會被吞掉,而是回到父流程正常處理。

深入:interrupt 的 back-pressure 與非阻塞中斷

interrupt 不是「強制立即停止(force kill)」;它是送出中斷訊號,會在下一個可中斷邊界(如 sleep、I/O)生效,並執行清理(若你有以 ensuringacquireRelease 註冊的收尾邏輯)。

預設情況下,Fiber.interrupt(fiber) 會先送出中斷,並「等待該 fiber 完整終止(含所有清理)」後,才讓此呼叫所回傳的 Effect 結束。這是一種 back-pressure 設計:在舊工作確實落幕之前,呼叫端不會繼續往下推進,有助於避免資源外洩並維持事件順序。

  • 預設(等待目標終止):確保清理完成再往下走;你會看到清理日誌(如果有的話)先於下一步驟出現。
// 阻塞等待中斷完成:會等到目標 fiber 終止(含 ensuring 清理)
yield* Fiber.interrupt(spinnerFiber)
// 此行之後才會繼續,例如:
yield* Console.log("spinner 已關閉,繼續後續流程…")
  • 非阻塞(不等待):若你不需要等待,可將「中斷本身」丟到另一條 fiber 執行,主流程立刻往下。
// 將中斷動作 fork 出去,主流程不等待中斷完成
yield* Effect.fork(Fiber.interrupt(spinnerFiber))
// 立刻繼續下一步,清理訊息可能稍後才出現
yield* Console.log("不中斷等待,先繼續後續流程…")

兩種作法的取捨:

  • 需要「嚴格順序/資源安全」:選擇預設等待,確保收尾邏輯都跑完再繼續。
  • 需要「提升吞吐量/不阻塞主幹」:fork 中斷,讓主流程先行;但請注意清理仍在背景進行。
  • 若稍後仍想觀察被中斷那條 fiber 的收尾狀況,可保留原目標 fiber(如 spinnerFiber)的控制柄,之後再以 Fiber.await(spinnerFiber) 檢視其 Exit
// 不中斷等待,稍後再觀察 spinner 結局
const interrupter = yield* Effect.fork(Fiber.interrupt(spinnerFiber))
yield* Console.log("不中斷等待,先繼續後續流程…")
// ... 一段時間後 ...
const spinnerExit = yield* Fiber.await(spinnerFiber)
yield* Console.log(spinnerExit)

補充:何時需要中斷(interrupt)其他 fibers?

  • 父子任務不再需要結果:父 fiber 啟動了多個子任務,後來判斷不再需要其中部分或全部結果時,應中斷那些子 fibers,釋放資源並停止無謂工作。
  • 競速(race)場景:多個 fibers 互相競速,先完成者即為「勝者」;其餘 fibers 應立即中斷,避免做多餘的計算或 I/O。
  • 互動式取消:在互動應用中,使用者可能按「停止/取消」來終止正在進行的任務(例如停止下載、取消批次處理);此時應中斷對應的 fibers。
  • 超時保護:計算或 I/O 超過預期時間,應透過 Effect.timeout 或手動中斷避免佔用資源過久,維持系統回應性。
  • 以輸入驅動的重計算:當使用者輸入變更(如搜尋字串、參數)時,應取消前一個尚在運行的昂貴計算,並以最新輸入重新啟動任務。

組合 Fibers

在實務上,我們常需要把多個正在執行的 fibers 合併成一個來等待或傳遞結果。最常見的有三種:Fiber.zipFiber.zipWithFiber.orElse

什麼時候用哪一個?

  • 需要兩個結果一起返回:用 Fiber.zipFiber.zipWith
  • 需要在失敗時才切換到備援:用 Fiber.orElse

失敗與中斷語意(重要)

  • zip / zipWith:任一子 fiber 失敗或被中斷,合成後的 fiber 也會失敗/中斷。
  • orElse:只有當第一個 fiber「失敗」時才會改用第二個;它不是 race,不會因為第二個比較快就採用它。

範例:用 Fiber.zip 並行組合兩個結果

const program = Effect.gen(function*() {
  const fiber1 = yield* Effect.fork(Effect.succeed("報表已產生"))
  const fiber2 = yield* Effect.fork(Effect.succeed("通知已送出"))

  const fiber = Fiber.zip(fiber1, fiber2)

  const tuple = yield* Fiber.join(fiber)
  console.log(tuple)
})

Effect.runFork(program)
// 輸出:
// [ '報表已產生', '通知已送出' ]

範例:用 Fiber.zipWith 自訂合併邏輯

const program = Effect.gen(function*() {
  const fiber1 = yield* Effect.fork(Effect.succeed("咖啡已煮好"))
  const fiber2 = yield* Effect.fork(Effect.succeed("可頌已出爐"))

  // 使用 zipWith 將兩個字串自訂合併為一句話
  const fiber = Fiber.zipWith(fiber1, fiber2, (a, b) => `${a},接著是${b},早餐準備完成 🎉`)
  const message = yield* Fiber.join(fiber)
  console.log(message)
})

Effect.runFork(program)
// 輸出:
// 咖啡已煮好,接著是可頌已出爐,早餐準備完成 🎉

範例:用 Fiber.orElse 提供失敗備援

const program = Effect.gen(function*() {
  // 啟動一個會失敗的 Fiber
  const fiber1 = yield* Effect.fork(Effect.fail("主要任務失敗:網路逾時"))
  // 再啟動另一個會成功的 Fiber
  const fiber2 = yield* Effect.fork(Effect.succeed("使用備援方案成功:回傳快取結果"))

  // 如果 fiber1 失敗,就改用 fiber2 的結果
  const fiber = Fiber.orElse(fiber1, fiber2)
  const message = yield* Fiber.join(fiber)
  console.log(message)
})

Effect.runFork(program)
// 輸出:
// 使用備援方案成功:回傳快取結果

小結

  • zip vs zipWith:保留兩個原始結果用 zip;需要立即計算單一結果用 zipWith
  • 備援語意:只有第一個失敗才切換 orElse
  • 錯誤/中斷傳播zip/zipWith 任一失敗/中斷 → 合成 fiber 也失敗/中斷;orElse 只在第一個失敗時才生效。

總結

本文以 Fiber 為核心,描述 Effect 如何在單執行緒環境透過 event loop、非阻塞 I/O 與協作式排程達成可預期的並行。父流程可先 fork 子任務持續前進,於需要時以 join/await 取得結果;亦能以 interrupt 以協作方式終止並保證清理(具 back-pressure 設計)。文中示範以可中止的背景 spinner 配合等待策略提升 API API吞吐量,並以 zip/zipWith 匯聚多任務結果,或在失敗時以 orElse 提供備援。不過其實我還沒講完🥲,但今天快過完了...在官網還有一個重要的篇幅在講 Child Fibers 的生命週期,就交給讀者有興趣自己看吧~

參考資料


上一篇
[學習 Effect Day28] Effect 資源管理(二)
系列文
用 Effect 實現產品級軟體29
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言