今天會接續昨天的 RxJS 介紹,繼續介紹 flatten operator,以及如何自己打造簡易 Observable,最後以 FP 視角再次檢視 Observable~
在實務中可能會遇到一種情況:一個 Observable
發出的值本身又是另一個 Observable
。這種「Observable
的 Observable
」被稱為高階 Observable
(Higher-order Observable)。例如,當一個點擊事件串流 (Observable<ClickEvent>
) 中的每次點擊都觸發一次 API 請求(回傳 Observable<Response>
)時,我們就會得到一個高階 Observable。
直接訂閱高階 Observable 會得到 Observable 物件本身,而不是我們想要的最終資料。這時就需要使用 Flattening Operators 來「解開」這層嵌套,將內部 Observable 的值攤平到外部的串流中。
而攤平的方式可分幾種,分別為 concatMap
、mergeMap
和 switchMap
,為了說明它們的區別,我們假設一個情境:使用者在搜尋框中輸入文字,每次輸入都觸發一次 API 查詢。
mergeMap
圖 1 mergeMap
示意圖(資料來源: https://rxmarbles.com/#mergeMap)
"abc"
,它會同時發出三個 API 請求,並將結果隨機合併。(不會確保 request 對應到的 response 順序)concatMap
圖 2 concatMap
示意圖(資料來源: https://rxmarbles.com/#concatMap)
concatMap
也會為其建立一個內部 Observable,但它會等待前一個內部 Observable 完成後,才會訂閱並執行下一個。它保證了處理和輸出的順序與觸發順序一致。POST
請求新增資料,成功後再執行 GET 請求來獲取更新後的列表。在搜尋框情境下,如果使用者快速輸入 "abc"
,它會先完成對 "a"
的請求,再發起對 "b"
的請求,以此類推。switchMap
圖 3 switchMap
示意圖(資料來源: https://rxmarbles.com/#switchMap)
"abc"
,當 "b"
的輸入事件到來時,switchMap
會取消對 "a"
的 API 請求;當 "c"
的輸入事件到來時,它會取消對 "b"
的請求。最終只有對 "c"
的請求會被完成並回傳結果。可避免了不必要的網路請求和處理過期的資料。程式碼範例如下,有興趣的可以到此連結試試看。
import { fromEvent, of } from "rxjs";
import { map, delay, mergeMap, concatMap, switchMap } from "rxjs/operators";
// 模擬 API 請求(回傳一個 Observable,延遲一段時間後才發出結果)
function fakeApi(query) {
const delayTime = Math.floor(Math.random() * 2000) + 500; // 隨機 0.5 ~ 2.5 秒
return of(`結果: ${query}`).pipe(delay(delayTime));
}
// 取得輸入框
const input = document.getElementById("search");
// 建立輸入事件流
const input$ = fromEvent(input, "input").pipe(map((e) => e.target.value));
// 1. mergeMap: 併發多個請求,誰先回來誰先輸出
input$
.pipe(mergeMap((query) => fakeApi(query)))
.subscribe((result) => console.log("mergeMap:", result));
// 2. concatMap: 等待前一個完成,再執行下一個,順序與輸入順序一致
input$
.pipe(concatMap((query) => fakeApi(query)))
.subscribe((result) => console.log("concatMap:", result));
// 3. switchMap: 永遠只保留最後一次輸入的請求,舊的會被取消
input$
.pipe(switchMap((query) => fakeApi(query)))
.subscribe((result) => console.log("switchMap:", result));
了解 Observable 簡單的使用方式後,來試著自己實做看看簡單版的 Observable 吧!
我們的目標是建立一個 MyObservable
類別,並為它加上 of
和 pipe
方法,它將模仿 rxjs/Observable
的核心功能。
MyObservable
MyObservable
的設計規劃如下:
constructor
: 建構子會接收一個函式作為唯一的參數,可稱之為生產者函式 (_subscribe
)。這個函式就是我們之前提到的藍圖,它知道如何產生值,並會在被訂閱時實際執行。subscribe
method: 這是 MyObservable
實例對外暴露的公開方法。它接收一個 observer
物件。當 subscribe
被呼叫時,它會執行儲存在建構子中的 _subscribe
函式,並將傳入的 observer
物件交給它。這是觸發資料流動的開關。接著來實作看看這 class:
class MyObservable {
// 建構子接收一個函式,我們將其儲存在實例的私有屬性 _subscribe 中。
// 這個函式是產生資料的核心邏輯。
constructor(subscribeFunc) {
this._subscribe = subscribeFunc;
}
// subscribe 是啟動資料流的方法。
// 它接收一個 observer 物件,該物件包含 next, error, complete 方法。
subscribe(observer) {
// 關鍵就在這裡:直接呼叫我們在建構時儲存的 _subscribe 函式,
// 並將 observer 傳遞進去。
// 這就將「定義」和「執行」兩個階段連接起來了。
this._subscribe(observer);
}
}
of
、fromEvent
和 pipe
方法接著為 MyObservable
增加 of
和 pipe
方法。
of
:接收任意數量的參數,並建立一個會依序發出這些參數,最後立即完成的 Observable。pipe
:一個實例方法,它接收任意數量的 operator 函式。它會透過 reduce
將這些運算子函式依序應用到當前的 Observable 上,回傳一個經過所有運算子轉換後的全新 Observable。 class MyObservable {
constructor(subscribeFunc) {
this._subscribe = subscribeFunc;
}
subscribe(observer) {
this._subscribe(observer);
}
// 加上一個靜態的 of 方法
static of(...args) {
// of 方法回傳一個新的 MyObservable 實例。
return new MyObservable(observer => {
// 在這個新的 Observable 的生產者函式中,我們遍歷傳入 of 的所有參數。
for (const arg of args) {
observer.next(arg);
}
// 所有參數都推送完畢後,呼叫 observer.complete() 來結束串流。
observer.complete();
});
}
// 加上 pipe 方法
pipe(...operators) {
// 使用 reduce 將所有運算子串連起來。
// 初始值是 this (當前的 Observable 實例)。
// 每一個 operator 都是一個函式,它接收一個 Observable,回傳一個新的 Observable。
return operators.reduce((source, operator) => operator(source), this);
}
}
map
pipe
方法本身只是個串連工具,要讓它真正有用,我們需要建立可以放進去的 Operator。Operator 是一個高階函式:它接收一些配置參數(例如 map
接收一個轉換函式),然後回傳一個新函式。這個回傳的新函式才是真正作用於資料流的函式,它接收一個來源 Observable 作為輸入,並回傳一個新的 Observable 。
讓我們來實作自己的 map Operator。
// myMap 是一個高階函式
const myMap = (project) => {
// 它回傳一個新函式,這個函式才是真正的 operator
return (sourceObservable) => {
// operator 會回傳一個全新的 Observable
return new MyObservable(observer => {
// 在新的 Observable 內部,我們訂閱來源 Observable
sourceObservable.subscribe({
next: (value) => {
// 當來源發出值時,使用 project 函式進行轉換
const newValue = project(value);
// 然後將轉換後的值推送到下游
observer.next(newValue);
},
error: (err) => observer.error(err), // 直接傳遞錯誤
complete: () => observer.complete(), // 直接傳遞完成訊號
});
});
};
};
現在我們來看看它們如何協同運作:
// 1. 使用 MyObservable.of 建立一個來源串流
const source$ = MyObservable.of(1, 2, 3);
// 2. 定義一個 observer
const myObserver = {
next: value => console.log('Received:', value),
error: err => console.error('Error:', err),
complete: () => console.log('Completed!'),
};
// 3. 使用 pipe 和自訂的 myMap 運算子
const mapped$ = source$.pipe(
myMap(x => x * 10),
myMap(x => `The result is ${x}`)
);
// 4. 訂閱最終的 Observable
mapped$.subscribe(myObserver);
執行這段程式碼,會在 console 看到:
Received: The result is 10
Received: The result is 20
Received: The result is 30
Completed!
以上實作可以讓我們觀察到幾點:
myMap
)是一個高階函式,它接收一個 Observable 並返回一個新的 Observable。pipe
透過 reduce
將多個 operators 串連起來,形成一個新的、組合後的 Observable,而不會修改任何中間過程的 Observable。在看 Observable 的設計哲學和 operator 的運作機制時,會覺得有些熟悉,因為 RxJS 在設計上與我們見過的結構十分相似:Functor、Monad 與 Monoid。
接下來就來看看 Observable 與他們的關係~
map
的力量Functor 的意思是一個「可被映射 (mappable) 的容器」。它必須提供一個 map
方法,這個方法接收一個函式,並將該函式應用於容器內的值,最終回傳一個裝有新值的、結構相同的全新容器。
Observable 符合這個定義,因此它屬於一種 Functor。
map
方法: 對應到 RxJS 的 map
operator。Observable<A>
和一個轉換函式 f: A -> B
,然後回傳一個全新的 Observable<B>
。它只轉換了內部的值,而「非同步事件串流」這個容器結構本身(包含其時序、錯誤處理、完成通知等特性)被完整地保留了下來。Observable 作為一個 Functor,搭建了一座橋樑。它告訴我們:「你只需要提供單純的資料轉換邏輯,我會負責在複雜的非同步時間軸上,正確地、安全地應用它。」
Monad 可以看作是 Functor 的延伸版本,它除了 map
之外,還提供了一個關鍵操作(常稱為 flatMap
或 chain
),專門用來處理「容器中的容器」的情況,並將其攤平 (flatten)為單層的容器。
在 RxJS 中,mergeMap
、concatMap
和 switchMap
這些 Flattening Operators 正是 Monad 中 chain 概念的具體實現。它們接收一個函式,該函式回傳一個 Observable,然後它們會自動訂閱這個內部的 Observable,並將其發出的值攤平到外部的串流中。
更深一層來看,Observable 的行為模式與我們提過的 IO Monad 十分相似。IO Monad 的核心思想是:將一個帶有副作用的操作(如讀取檔案、網路請求)封裝成一個純粹的值。這個值本身只是一個對該操作的「描述」或「藍圖」,它本身並不執行任何副作用。副作用只有在這個 IO 值被一個特定的「執行」函式呼叫時才會真正發生。
這與 Observable 的運作方式很相似:
new Observable(subscriber => {... })
這段程式碼本身是純粹的。它建立了一個 Observable 物件,這個物件就是一個「藍圖」,描述了當有人訂閱時應該執行的一系列操作(例如 addEventListener 或 fetch)。.subscribe()
之前,沒有任何事件被註冊監聽事件,沒有任何網路請求被發出。.subscribe()
的呼叫,就相當於 IO Monad 的「執行」函式。它觸發了藍圖中描述的副作用。因此我們可以將 Observable 視為一個偽裝成 IO Monad 的物件。它讓我們能以一種純函數式的方式來定義、組合和傳遞那些不純的、非同步的副作用操作。
在之前的 再探 Monoid 文章中,有介紹 Monoid 是一個包含三要素的代數結構:一個集合、一個滿足結合律的二元操作,以及一個單位元素。另外當時在範例程式中也提到我們可以合併 Stream 這型別,而 Stream 就是類似 RxJS 的概念。
Observable 本身的確構成了一個 Monoid:
T
,所有 Observable<T>
的集合。merge(obsA, obsB)
接收兩個 Observable,並回傳一個新的 Observable,這個新的 Observable 會併發地發出 obsA
和 obsB
的所有值。這個操作滿足結合律:merge(a, merge(b, c))
的行為等同於 merge(merge(a, b), c)
。next
值,而是立即發出 complete
通知。merge(obsA, EMPTY)
的行為與單獨的 obs
A 完全相同,滿足單位元素定義。Observable 構成了 Monoid,代表 Observable 的集合都是 reducible 的。Monoid 的結構確保我們能用可靠的方式,將一組 Observable 合併成單一的一個。
以下簡單示意 Monoid 的 Observable
,如何用 reduce
的方式歸納所有的事件流變成單一事件流:
import { fromEvent, merge, EMPTY } from 'rxjs';
const buttonA = document.getElementById('btnA');
const buttonB = document.getElementById('btnB');
const buttonC = document.getElementById('btnC');
const clickA$ = fromEvent(buttonA, 'click');
const clickB$ = fromEvent(buttonB, 'click');
const clickC$ = fromEvent(buttonC, 'click');
// 把所有來源放到陣列裡
const observables = [clickA$, clickB$, clickC$];
// 因為 Observable 是一個 Monoid,我們可以用 reduce 來合併它們!
const allClicks$ = observables.reduce(
(acc, current) => merge(acc, current), // 二元運算:merge
EMPTY // 單位元素:EMPTY
);
allClicks$.subscribe(() => console.log('A button was clicked!'));
從這裡可看出,Monoid 非常強大,它提供了一種通用的、基於代數理論的模式來聚合多個事件源,而不需要寫任何客製化的、臨時的邏輯。這正是我們學習 Monoid 原因,它讓我們能基於理論來解決真實的程式設計問題。
用幾個要點來回顧一下兩天的文章。
.pipe()
方法串連起來,形成一條宣告式的資料處理管線,讓我們能優雅地處理複雜邏輯,而不需手動管理狀態。map
安全地在非同步的內容中轉換值。switchMap
等 Flattening Operators,優雅地串連相依的非同步操作,避免巢狀地獄。merge
和 EMPTY
,以一種有原則的方式將多個串流合併為一個。