延續 Day29,我們已經了解 Fiber 是如何在單執行緒上提供「可管理的併發」運算。今天我們要講解用 Effect 的高階「組合子」來實現併發效果。除此之外,我們也會深入講一下,處理併發時的最佳實踐。
在多數資料導向或批次 I/O 的情境,這些組合子可讀性高、語意清楚,能自動處理錯誤與中斷傳播,讓我們在保持結構化的同時,提升整體資料吞吐量與系統穩定性。
不過,直接操控 Fiber 仍有其不可取代的價值:當你需要長時間的背景常駐(例如 spinner、polling、watcher)、要明確掌控生命週期與資源(取得 Fiber
後可精準 interrupt
、join
,甚至把 interrupt
再 fork
成非阻塞)、或希望與呼叫端解耦並自訂監督策略與細緻日誌時,fork
會提供最大的彈性。所以實務經驗法則是:資料導向的併發流程,先用組合子與適當的 concurrency
設定來完成;而需要手動控制與長時間背景工作的情境,才直接使用 fork
。
Effect 中許多「同時處理多個 Effect」的 API(例如 Effect.all
、Effect.forEach
相關操作)都支援 concurrency
選項,用來控制同時進行的工作數量。型別如下:
type Concurrency = number | "unbounded" | "inherit"
Effect 如果不提供 concurrency
,預設行為會是「逐一執行」,它提供最穩定、可預期的行為。跟之前一樣,我希望用更貼近現實的例子來說明併發。所以我們假設有一個線上訂單處理情境,訂單流程如下:
驗證訂單 -> 保留庫存 -> 付款請款
我們先來建立一些 helper function,用來模擬訂單處理流程。
import { Console, Effect } from "effect"
function withTiming<A, E, R>(effect: Effect.Effect<A, E, R>) {
return Effect.gen(function*() {
const startedAt = Date.now()
const value = yield* effect
const elapsedMs = Date.now() - startedAt
yield* Console.log(`耗時 ${elapsedMs} ms`)
return value
})
}
function validateOrder(orderId: number) {
return Effect.gen(function*() {
yield* Console.log(`訂單 #${orderId}:驗證中`)
yield* Effect.sleep("50 millis")
yield* Console.log(`訂單 #${orderId}:驗證完成`)
return orderId
})
}
function reserveInventory(orderId: number) {
return Effect.gen(function*() {
yield* Console.log(`訂單 #${orderId}:保留庫存`)
yield* Effect.sleep("100 millis")
yield* Console.log(`訂單 #${orderId}:庫存已保留`)
return orderId
})
}
function chargePayment(orderId: number) {
return Effect.gen(function*() {
yield* Console.log(`訂單 #${orderId}:付款請款`)
yield* Effect.sleep("150 millis")
yield* Console.log(`訂單 #${orderId}:付款成功`)
return orderId
})
}
function createShipment(orderId: number) {
return Effect.gen(function*() {
yield* Console.log(`訂單 #${orderId}:建立出貨單`)
yield* Effect.sleep("120 millis")
yield* Console.log(`訂單 #${orderId}:出貨單建立完成`)
return orderId
})
}
function sendNotifications(orderId: number) {
return Effect.gen(function*() {
yield* Console.log(`訂單 #${orderId}:寄送通知(Email / SMS)`)
yield* Effect.sleep("80 millis")
yield* Console.log(`訂單 #${orderId}:通知完成`)
return orderId
})
}
function processOrder(orderId: number) {
return Effect.gen(function*() {
const validatedId = yield* validateOrder(orderId)
const reservedId = yield* reserveInventory(validatedId)
const chargedId = yield* chargePayment(reservedId)
return chargedId
})
}
上述我們先準備了幾個 helper function,全部都回傳 Effect
,並用 Effect.sleep
模擬實際 I/O 等待時間,方便觀察不同併發策略的行為:
Effect
量測總執行時間,幫助比較不同策略的耗時。實務上,同時可能會有多個訂單發生,在沒有併發的情況下,訂單會一個一個處理。如下方程式碼所示:
const program = Effect.all([
processOrder(101),
processOrder(102),
processOrder(103)
])
Effect.runFork(withTiming(program))
// 輸出:
// 訂單 #101:驗證中
// 訂單 #101:驗證完成
// 訂單 #101:保留庫存
// 訂單 #101:庫存已保留
// 訂單 #101:付款請款
// 訂單 #101:付款成功
// 訂單 #102:驗證中
// 訂單 #102:驗證完成
// 訂單 #102:保留庫存
// 訂單 #102:庫存已保留
// 訂單 #102:付款請款
// 訂單 #102:付款成功
// 訂單 #103:驗證中
// 訂單 #103:驗證完成
// 訂單 #103:保留庫存
// 訂單 #103:庫存已保留
// 訂單 #103:付款請款
// 訂單 #103:付款成功
// 耗時 922 ms
我們可以從上面輸出結果看出,三個任務是逐一執行的。但如果任務是獨立的,其實我們就可以考慮使用「限制數量的併發」。透過設定固定併發數在吞吐量與系統穩定之間取得平衡。
提供一個數字 concurrency: 2
,代表同時間最多跑 2 個。這是最常見的「節流」用法,避免打爆下游服務,或控管本機資源(檔案 I/O、網路連線)。
const program = Effect.all(
[
processOrder(201),
processOrder(202),
processOrder(203)
],
{ concurrency: 2 }
)
Effect.runFork(withTiming(program))
// 輸出:
// 訂單 #201:驗證中
// 訂單 #202:驗證中
// 訂單 #201:驗證完成
// 訂單 #201:保留庫存
// 訂單 #202:驗證完成
// 訂單 #202:保留庫存
// 訂單 #201:庫存已保留
// 訂單 #201:付款請款
// 訂單 #202:庫存已保留
// 訂單 #202:付款請款
// 訂單 #201:付款成功
// 訂單 #203:驗證中
// 訂單 #202:付款成功
// 訂單 #203:驗證完成
// 訂單 #203:保留庫存
// 訂單 #203:庫存已保留
// 訂單 #203:付款請款
// 訂單 #203:付款成功
// 耗時 622 ms
這裡有一個小細節,就是為何輸出是按照下面的順序
訂單 #201:驗證完成
訂單 #201:保留庫存
訂單 #202:驗證完成
訂單 #202:保留庫存
不是下面的順序呢?
訂單 #201:驗證完成
訂單 #202:驗證完成
訂單 #201:保留庫存
訂單 #202:保留庫存
我們不是併發嗎?是的,不過讓我來從頭一步一步說明😀
關鍵在於「每一筆訂單內部仍是依序」,我們的設計是「不同訂單之間的併發」。Effect.all([...], { concurrency: 2 })
的語意是:同一時間最多啟動 2 個 processOrder
任務(兩條 fiber)。當其中一條 fiber 完成某個步驟(例如 #201 完成驗證)後,該條 fiber 會立刻繼續進到下一個步驟(保留庫存),而不會等另一條 fiber 也完成相同步驟才一起往下。
在 Effect runtime 中,fiber 會在遇到非同步邊界(例如 Effect.sleep
、I/O)時「掛起」(suspend),等對應的事件完成(計時器觸發、Promise resolve)時被排入佇列並「喚醒」繼續執行。因此,不同步驟的等待時間不同(驗證 50ms、保留 100ms、請款 150ms),兩條 fiber 被喚醒的時間點自然錯開,輸出也就交錯。
至於「什麼時候安插新的 fiber」:在 concurrency: 2
的場合,系統一開始會啟動兩條頂層任務(#201、#202)。只有當其中一條頂層任務「整體完成」時(例如 #201 已經執行到「付款成功」),才會把待處理的下一筆(#203)fork 成第三條新 fiber 進場。換言之,不會在某一筆任務的中途就補上新的第三條;你也能從日誌看到 #203「驗證中」正是在某一筆完成後才出現。
但要注意的是「同一筆訂單內的事件順序永遠不會逆序」,只是在多筆交錯下呈現你看到的時間線。這也正是限制併發數的價值:在保持單筆流程清晰的同時,讓多筆訂單併發推進、提升整體吞吐量。
根據輸出結果,我們可以發現同時只會有兩個任務在執行,一但有任務完成,才會再啟動一個新任務。這種節流方式最大的優勢就是在資源可控的情形下加速整體訂單處理速度。整體處理流程從 922ms 縮短到 623ms。
不過當資源允許或需求可控時,我們就可以考慮「不設上限」讓任務同時起跑,但這種作法請務必確認外部服務與系統負載承受得住。
{ concurrency: "unbounded" }
表示不設併發上限。I/O-bound、且可接受大量同時進行時可用,但務必確認外部服務或系統資源不會被壓垮。
const program = Effect.all(
[
processOrder(301),
processOrder(302),
processOrder(303)
],
{ concurrency: "unbounded" }
)
Effect.runFork(withTiming(program))
// 輸出:
// 訂單 #301:驗證中
// 訂單 #302:驗證中
// 訂單 #303:驗證中
// 訂單 #301:驗證完成
// 訂單 #301:保留庫存
// 訂單 #302:驗證完成
// 訂單 #302:保留庫存
// 訂單 #303:驗證完成
// 訂單 #303:保留庫存
// 訂單 #301:庫存已保留
// 訂單 #301:付款請款
// 訂單 #302:庫存已保留
// 訂單 #302:付款請款
// 訂單 #303:庫存已保留
// 訂單 #303:付款請款
// 訂單 #301:付款成功
// 訂單 #302:付款成功
// 訂單 #303:付款成功
// 耗時 311 ms
因為不設上限,三筆訂單會同時起跑;每筆仍維持「單筆內部順序保證、跨筆可交錯」。總耗時大約等於單筆臨界路徑的總和(50 + 100 + 150 ≈ 300ms)再加上些許排程與日誌開銷,所以看到約 311ms。這證明 unbounded
能讓可併發的 I/O 盡可能同時進行,但務必確認外部系統(資料庫、第三方 API、郵件簡訊服務等)承受得住瞬間高併發。
當流程有巢狀組合時(像在已經設定過併發的流程中再呼叫其他 API),你可以指定 { concurrency: "inherit" }
來沿用外層既有的併發策略,避免在不同層級各自設定、產生意料外的爆量發生。聽起來有點抽象對吧!?😅 我們用實際的例子解釋一下:你可以想像我們同時要處理很多付款資訊,且付款資訊確認後,需要建立出貨單與寄送通知。但我們每次最多只想同時處理 2 個運算流程。
// 階段 A:付款(限流 2)→ 階段 B:出貨與通知(沿用限流 2)
function processBatchPhased() {
return Effect.gen(function*() {
// 階段 A:先對第一批訂單進行「驗證 / 保留庫存 / 請款」
const paid = yield* Effect.all(
[401, 402, 403, 404, 405].map((id) =>
Effect.gen(function*() {
yield* validateOrder(id)
yield* reserveInventory(id)
yield* chargePayment(id)
return id
})
),
{ concurrency: 2 }
)
// 階段 B:對已付款清單建立出貨與通知,沿用上一層的併發設定
const shippedAndNotified = yield* Effect.all(
paid.map((id) =>
Effect.gen(function*() {
yield* createShipment(id)
yield* sendNotifications(id)
return id
})
),
{ concurrency: "inherit" }
)
return shippedAndNotified
})
}
Effect.runFork(withTiming(processBatchPhased()))
// 輸出:
// 訂單 #401:驗證中
// 訂單 #402:驗證中
// 訂單 #401:驗證完成
// 訂單 #401:保留庫存
// 訂單 #402:驗證完成
// 訂單 #402:保留庫存
// 訂單 #401:庫存已保留
// 訂單 #401:付款請款
// 訂單 #402:庫存已保留
// 訂單 #402:付款請款
// 訂單 #401:付款成功
// 訂單 #403:驗證中
// 訂單 #402:付款成功
// 訂單 #404:驗證中
// 訂單 #403:驗證完成
// 訂單 #403:保留庫存
// 訂單 #404:驗證完成
// 訂單 #404:保留庫存
// 訂單 #403:庫存已保留
// 訂單 #403:付款請款
// 訂單 #404:庫存已保留
// 訂單 #404:付款請款
// 訂單 #403:付款成功
// 訂單 #405:驗證中
// 訂單 #404:付款成功
// 訂單 #405:驗證完成
// 訂單 #405:保留庫存
// 訂單 #405:庫存已保留
// 訂單 #405:付款請款
// 訂單 #405:付款成功
// 訂單 #401:建立出貨單
// 訂單 #402:建立出貨單
// 訂單 #403:建立出貨單
// 訂單 #404:建立出貨單
// 訂單 #405:建立出貨單
// 訂單 #401:出貨單建立完成
// 訂單 #401:寄送通知(Email / SMS)
// 訂單 #402:出貨單建立完成
// 訂單 #402:寄送通知(Email / SMS)
// 訂單 #403:出貨單建立完成
// 訂單 #403:寄送通知(Email / SMS)
// 訂單 #404:出貨單建立完成
// 訂單 #404:寄送通知(Email / SMS)
// 訂單 #405:出貨單建立完成
// 訂單 #405:寄送通知(Email / SMS)
// 訂單 #401:通知完成
// 訂單 #402:通知完成
// 訂單 #403:通知完成
// 訂單 #404:通知完成
// 訂單 #405:通知完成
// 耗時 1133 ms
這個範例分成兩個階段,並示範如何用 inherit
在巢狀流程中沿用外層併發策略:
concurrency: 2
):同時間只處理兩筆訂單。從輸出可觀察到,始終維持兩筆在推進;當其中一筆完成請款,下一筆才開始驗證,直到 401~405 全數完成。concurrency: "inherit"
):等 A 階段全部完成後,對成功清單建立出貨並寄送通知。因為使用 inherit
,內層會沿用外層既有的併發策略,仍然維持同時間最多 2 個流程在執行。實務上這有兩個好處:
中斷是 Effect 併發的靈魂:它不是「強制殺掉」,而是傳遞一個可協作的訊號,會在下一個可中斷邊界(如 sleep
、I/O)生效,並執行清理。
function processSingleOrder(orderId: number) {
return Effect.gen(function*() {
yield* Console.log(`訂單 #${orderId}:開始處理`)
// 先執行基本處理(驗證 / 庫存 / 請款)
const paidId = yield* processOrder(orderId)
// 模擬風險控管:若 id 命中規則,主動中斷該工作
if (paidId === 502) {
yield* Console.log(`訂單 #${paidId}:風險控管觸發,開始中斷`)
return yield* Effect.interrupt
}
// 通常情況:建立出貨單並寄送通知
yield* createShipment(paidId)
yield* sendNotifications(paidId)
yield* Console.log(`訂單 #${paidId}:完成`)
return paidId
}).pipe(
Effect.onInterrupt(() => Console.log(`訂單 #${orderId}:已中斷(釋放/回滾資源)`))
)
}
const batchProgram = Effect.forEach([501, 502, 503], (id) => processSingleOrder(id), {
concurrency: "unbounded"
})
Effect.runPromiseExit(batchProgram).then((exit) => console.log(JSON.stringify(exit, null, 2)))
// 輸出:
// 訂單 #501:開始處理
// 訂單 #501:驗證中
// 訂單 #502:開始處理
// 訂單 #502:驗證中
// 訂單 #503:開始處理
// 訂單 #503:驗證中
// 訂單 #501:驗證完成
// 訂單 #501:保留庫存
// 訂單 #502:驗證完成
// 訂單 #502:保留庫存
// 訂單 #503:驗證完成
// 訂單 #503:保留庫存
// 訂單 #501:庫存已保留
// 訂單 #501:付款請款
// 訂單 #502:庫存已保留
// 訂單 #502:付款請款
// 訂單 #503:庫存已保留
// 訂單 #503:付款請款
// 訂單 #501:付款成功
// 訂單 #501:建立出貨單
// 訂單 #502:付款成功
// 訂單 #502:風險控管觸發,開始中斷
// 訂單 #502:已中斷(釋放/回滾資源)
// 訂單 #501:已中斷(釋放/回滾資源)
// 訂單 #503:已中斷(釋放/回滾資源)
// {
// "_id": "Exit",
// "_tag": "Failure",
// "cause": {
// "_id": "Cause",
// "_tag": "Parallel",
// "left": {
// "_id": "Cause",
// "_tag": "Parallel",
// "left": {
// "_id": "Cause",
// "_tag": "Sequential",
// "left": {
// "_id": "Cause",
// "_tag": "Empty"
// },
// "right": {
// "_id": "Cause",
// "_tag": "Interrupt",
// "fiberId": {
// "_id": "FiberId",
// "_tag": "Runtime",
// "id": 0,
// "startTimeMillis": 1760452777968
// }
// }
// },
// "right": {
// "_id": "Cause",
// "_tag": "Interrupt",
// "fiberId": {
// "_id": "FiberId",
// "_tag": "Runtime",
// "id": 3,
// "startTimeMillis": 1760452777970
// }
// }
// },
// "right": {
// "_id": "Cause",
// "_tag": "Sequential",
// "left": {
// "_id": "Cause",
// "_tag": "Empty"
// },
// "right": {
// "_id": "Cause",
// "_tag": "Interrupt",
// "fiberId": {
// "_id": "FiberId",
// "_tag": "Runtime",
// "id": 0,
// "startTimeMillis": 1760452777968
// }
// }
// }
// }
// }
以上程式展示了三件事:
Effect.interrupt
,它不會「立刻強殺」,而是等到下一個可中斷點生效,並觸發對應的清理邏輯(透過 onInterrupt
)。Exit
中的 Cause
會清楚描述平行/序列結構,方便除錯與監控。父流程被中斷時,子流程(包含用 Effect.all
建立的多個子任務)也會一併被中斷,並按語意執行清理。
const child = Effect.sleep("1 second").pipe(
Effect.onInterrupt(() => Console.log("child cleanup"))
)
const parent = Effect.gen(function*() {
const fiber = yield* Effect.fork(
Effect.all([child, child, child], { concurrency: "unbounded" })
)
yield* Effect.sleep("150 millis")
yield* Fiber.interrupt(fiber) // 中斷父 → 子任務也被中斷
})
Effect.runFork(parent)
// 輸出:
// child cleanup
// child cleanup
// child cleanup
競速就是「先完成者決定局面,其餘中斷」。Effect 提供多種競速種類供我們使用:race
/ raceAll
/ raceFirst
/ raceWith
。
// 競速示範:較快者勝出、較慢者被中斷
const fasterCandidate = Effect.succeed("較快任務").pipe(
Effect.delay("100 millis"),
Effect.tap(Console.log("較快任務:完成")),
Effect.onInterrupt(() => Console.log("較快任務:被中斷"))
)
const slowerCandidate = Effect.succeed("較慢任務").pipe(
Effect.delay("300 millis"),
Effect.tap(Console.log("較慢任務:完成")),
Effect.onInterrupt(() => Console.log("較慢任務:被中斷"))
)
// 在競速下,勝者的值會被回傳;落敗者會被中斷並觸發 onInterrupt
Effect.runFork(
Effect.race(fasterCandidate, slowerCandidate).pipe(
Effect.tap(Console.log)
)
)
// 輸出:
// 較快任務:完成
// 較慢任務:被中斷
// 較快任務
先完成者會決定結果,另一個在下一個可中斷點中止並執行清理(若有 onInterrupt
/ ensuring
)。
當你要讓多個任務同時競速,可以使用對應的多參數版本。通常語意是「誰先完成就採用誰」,其餘被中斷;實際細節請以官方 API 文件為準,這裡示範常見寫法:
// 競速(raceAll):誰先完成且成功就採用,其餘自動中斷
const task1 = Effect.fail("任務1").pipe(
Effect.delay("100 millis"),
Effect.tap(Console.log("任務1 完成")),
Effect.onInterrupt(() => Console.log("任務1 已中斷"))
)
const task2 = Effect.succeed("任務2").pipe(
Effect.delay("200 millis"),
Effect.tap(Console.log("任務2 完成")),
Effect.onInterrupt(() => Console.log("任務2 已中斷"))
)
const task3 = Effect.succeed("任務3").pipe(
Effect.delay("150 millis"),
Effect.tap(Console.log("任務3 完成")),
Effect.onInterrupt(() => Console.log("任務3 已中斷"))
)
const program = Effect.raceAll([task1, task2, task3])
Effect.runPromiseExit(program).then(console.log)
// 輸出:
// 任務3 完成
// 任務2 已中斷
// { _id: 'Exit', _tag: 'Success', value: '任務3' }
這段示範了三個任務的競速:
task1
會失敗、task2
與 task3
會成功但延遲不同。由於 raceAll
採用「誰先成功就採用誰」的語意(常見用法),最終會回傳最先完成且成功的 task3
,其餘尚未完成的任務會被中斷,並觸發各自的 onInterrupt
。raceAll
很適合多個快取層或多區域副本,先回來先用,落後者中斷以節省資源。// 競速(raceFirst):只要有一個完成就採用,不管成功與否,其餘自動中斷
const task1 = Effect.fail("任務1").pipe(
Effect.delay("100 millis"),
Effect.tap(Console.log("任務1 完成")),
Effect.onInterrupt(() => Console.log("任務1 已中斷").pipe(Effect.delay("100 millis")))
)
const task2 = Effect.succeed("任務2").pipe(
Effect.delay("200 millis"),
Effect.tap(Console.log("任務2 完成")),
Effect.onInterrupt(() => Console.log("任務2 已中斷").pipe(Effect.delay("100 millis")))
)
const program = Effect.raceFirst(task1, task2).pipe(
Effect.tap(Console.log("更多後續工作..."))
)
Effect.runPromiseExit(program).then(console.log)
// 輸出:
// 任務2 已中斷
// {
// _id: 'Exit',
// _tag: 'Failure',
// cause: { _id: 'Cause', _tag: 'Fail', failure: '任務1' }
// }
raceFirst
的語意是「第一個完成(成功或失敗都算)就採用,其他候選中斷」。它與 raceAll
的差別在於:raceAll
偏向「第一個成功者」勝出,而 raceFirst
連失敗也會被立即採用,適合把「最早回來」視為訊號,再由後續邏輯自行判斷與補救(例如風險探測、快速回應 UX)。若你只想採用第一個成功結果,請改用 raceAll
;需要更細緻的收斂策略(例如一邊成功就中斷對方、或想保留對方結果做比對)則建議使用 raceWith
進行客製化。
當我思考 Effect 的併發,我會把它想成多條「純序的流程」交錯前進:每條流程在非同步邊界會被暫停(suspend)、事件完成後被喚醒;併發說白了只是允許同時有幾條運算可以存在。有了這個核心概念,併發設計就會變得直覺且可推理。
最後想提醒一下,如果想要更精確的控制競速流程,可以參考官方文件 Effect.disconnect 與 Effect.raceWith 的章節喔~因為時間不夠,所以就留給讀者自行閱讀了🥹