今天要介紹的是我期待已久的 RxJS! RxJS 博大精深,甚至可以用一整個鐵人賽系列文來介紹(在此推推 30 天精通 RxJS和打通 RxJS 任督二脈這兩個系列文)。
希望今天的文章可以簡單帶大家理解 RxJS 的設計哲學,瞭解它如何體現 FP 的精神,並應用在實務開發上。而如果要進一步深入理解語法和更多應用,很推薦大家去讀上面的鐵人賽文章。
在這系列 Functional Programming 的文章中,我們學會了如何運用 map
、filter
、reduce
這些工具來處理陣列,也就是那些存在於「空間」維度上的資料集合。我們透過純函式與函式組合,降低了程式碼的複雜度。
但如果我們的資料是散落在「時間」的維度上呢?想像一下使用者的滑鼠點擊、鍵盤輸入、來自伺服器的非同步回應、甚至是 WebSocket 的即時訊息... 這些事件如同一條時間長河,我們要如何用 FP 的思維來捕捉、組合與操作它們?
這正是 RxJS (Reactive Extensions for JavaScript) 要解決的問題。它不僅僅是一個函式庫,更是一種處理非同步事件的思考模式。它讓我們能將非同步事件視為一種「集合」,一種隨時間流動的「串流 (Stream)」,並讓我們能夠用熟悉的 FP 工具來駕馭它們。
在認識 RxJS 之前,先來看看 ReactiveX,因為 RxJS 是 ReactiveX 的 JavaScript 版本,ReactiveX 才是一切的起源。
ReactiveX 官方是這樣定義自己的:
ReactiveX is a combination of the best ideas from
the Observer pattern, the Iterator pattern, and functional programming.
Observer pattern、Iterator 和 functional programming 我剛好都介紹過,Observer pattern 可參考之前鐵人賽的文章[Day 11] Observer 模式,Iterator 則在昨天介紹 Generator Function 的時候稍微介紹了,functional programming 不必多說,這一整個系列文都在說明 FP 的設計精神。
以下簡單複習一下這三個概念,並說明他們的核心觀念如何體現在 RxJS 的設計上。
在之前的 [Day 11] Observer 模式文章中介紹了 Observer 模式,快速複習一下它是什麼:Observer 模式定義了一種一對多的依賴關係,當一個物件(被稱為 Subject 或 Publisher)的狀態發生改變時,所有依賴於它的物件(被稱為 Observers 或 Subscribers)都會得到通知並自動更新。
在 Lazy evaluation 和 Generator function 文章中,我們認識了 Iterator:一個 Iterator 物件提供 next()
方法,讓消費者可以逐步「拉取」序列中的值。例如,一個陣列經過 iterator.next()
就能依序取出元素;而使用 Generator 函式,則能更優雅地建構這種逐步拉取的序列。
iterator.next()
的程式碼)主動向生產者(資料源)請求下一個值。生產者是被動的,只有在被請求時才會提供資料。更具體來說,在「同步世界」裡,Iterator 是拉取資料的標準介面。而在「非同步世界」裡,Observable 則是推送資料的對應機制。
可以用幾個例子來對照理解:
因此我們可以把 Observable 看作是「時間軸上的集合」或「非同步的迭代器」。Iterator 和 Observable 也都具備 FP 程式設計熱愛的惰性 (Lazy) 特質:它們的邏輯只有在被實際消費時才會執行——對 Iterator 而言,是呼叫 .next()
;對 Observable 而言,是呼叫 .subscribe()
。
在「Lazy evaluation 和 Generator function」的文章中,我們介紹了迭代器 (Iterator) 和生成器 (Generator),它們的核心是一種「惰性求值」的資料消費方式。這種資料消費的方式屬於 Pull System (拉取系統),而 RxJS 利用 Observer 模式來設計資料消費方式,這屬於 Push System (推送系統)。以下稍微簡介兩者。
(補充一下,所謂資料消費者就是需要資料的人,而資料生產者就是產出資料的人,應該還算好理解~)
const value = someFunction()
,或是迭代器的 iterator.next()
。消費者主動「拉」回一個值。另外,現在很多前端框架都採用的 Signals 機制,其資料消費方式也屬於拉取模式,需要時才存取。用比較生活化的方式理解的話,傳統的函式呼叫(Pull Systems)就像是你去餐廳點餐 (Pull),你主動發出請求,才會得到餐點。而 Observable(Push Systems) 則像是你訂閱了某個 YouTube 頻道 (Push),你只需要點擊「訂閱」按鈕一次,之後每當有新影片發布時,平台就會「主動推送」通知給你,你不需隨時去檢查有沒有新影片。
以下角度比較兩種資料消費模式。
單一值 (Single Value) | 多個值 (Multiple Values) | |
---|---|---|
Pull (同步/由消費者驅動) | Function (T ) |
Iterator / Generator (Iterator<T> ) |
Push (非同步/由生產者驅動) | Promise (Promise<T> ) |
Observable (Observable<T> ) |
RxJS 不僅僅是 Observer 和 Iterator 兩種設計模式的結合,它的強大根源在於深受 FP 思想的影響。具體體現在純粹性 (Purity)、組合性 (Composition) 與聲明式風格 (Declarative Style) 這幾點上。
map
、filter
、scan
等。這些運算子本質上就是純函數。它們接收一個 Observable
作為輸入,然後回傳一個全新的 Observable
,而不會修改到原始的 Observable
。這種設計讓我們可以用一種宣告式、可組合的方式來定義複雜的非同步邏輯,就像用 Array.prototype.map
串連陣列操作一樣。這也是為什麼 RxJS 常被比喻為「事件的 Lodash」。 簡單說明 RxJS 如何統整上述三種模式,以處理非同步程式設計上的挑戰。
整合三者後,RxJS 的核心產物 Observable 就此誕生。它是一個懶加載 (lazy)、可組合的、基於推送 (push-based) 的序列。它可以模擬任何資料流,從單一的未來值(類似 Promise)、一個有限的陣列,到一個永不結束的事件序列(如滑鼠移動)。
這就是 RxJS 的核心設計哲學:提供一個統一的抽象模型,用函數式的方法優雅地處理一切非同步事務。
學習 RxJS 最重要其實是心態上的轉變,要將我們對資料的看法從靜態的「陣列 (Array)」擴展到動態的「串流 (Stream)」。
圖 1 陣列與串流示意圖(資料來源: 自行繪製)
假設有個使用者在輸入框中打字,每一個按鍵事件都可以視為這個「輸入串流」中的一個元素。Observable 的工作就是定義好一套邏輯,來「回應 (react)」這個串流中陸續抵達的元素。這也是響應式程式設計 (Reactive Programming) 的核心思想。(想更了解 Reactive的,可再參考漫談模式::Reactive)
RxJS 的世界由三個核心元素構成,分別為:Observable、Observer 與 Subscription。
Observable
是串流的來源,是所有事件的生產者,且 Observable 本身是懶惰 (lazy) 的。它就像一份食譜或一張藍圖,詳細描述了應該如何產生值、在何時產生、以及何時結束或出錯。但僅僅是定義了這份藍圖,並不會有任何事情發生。它代表的是「一個可被呼叫的、關於未來值或事件的集合」。
Observer 是串流的消費者。它是一個 JavaScript 物件,定義了如何去回應 Observable 推送出來的通知。它有三個可選的屬性,分別對應三種不同的通知類型,它們都是回呼函式:
next(value)
: 當 Observable 發出一個新的值時,這個函式會被呼叫,並傳入該值。error(err)
: 當 Observable 內部發生錯誤時,這個函式會被呼叫,並傳入錯誤物件。一旦 error 被呼叫,這個串流就會立即終止。complete()
: 當 Observable 成功發出了它所有的值,且不會再有新的值時,這個函式會被呼叫。串流在此之後也會終止。Subscription 是 Observable 和 Observer 間的橋樑,代表了一次正在進行的執行過程。當我們呼叫 observable.subscribe(observer) 時,才真正「啟動」 Observable 中定義的藍圖,讓資料開始流動。這個呼叫會回傳一個 Subscription 物件。此物件代表了這次的訂閱行為,我們可以用它來手動中斷資料流,也就是取消訂閱,可用 subscription.unsubscribe()
來取消訂閱。
圖 2 Observable、Observer 與 Subscription 關係示意圖(資料來源: 自行繪製)
Observable
可以根據其生產者的行為模式,分為「冷的 (Cold)」和「熱的 (Hot)」兩種 。
值的生產者位於 Observable 內部。每次有新的訂閱者 (Observer) 訂閱時,它都會從頭到尾完整地執行一次生產者函式。每個訂閱者都會得到自己獨立的一份資料流,彼此之間互不影響。
舉例來說,每次訂閱一個 HTTP Observable,都會發起一次新的網路請求。
值的生產者位於 Observable 外部,並且獨立於訂閱行為而存在。多個訂閱者會共享同一個資料來源。當一個訂閱者加入時,它只能接收到從它訂閱那一刻起所發生的事件,無法看到歷史事件。
舉例來說,DOM 事件就屬於 Hot Observable。滑鼠點擊事件的發生與否,與我們是否訂閱它無關。當我們訂閱時,我們就開始收聽這個已經在發生的事件流。
不過 Observable 不一定單純是 Cold Observable 或 Hot Observable,有可能混搭的,也有可能一開始是 Cold,後來變成 Hot。
來看一個簡單的應用範例以理解其使用方式。
import { Observable } from "rxjs";
// 1. 建立 Observable (定義藍圖)
// Observable 的建構子接收一個函式,這個函式被稱為 "producer" 或 "subscriber function"。
// 它會在被訂閱時執行,並接收一個 subscriber 物件作為參數。
const myObservable$ = new Observable((subscriber) => {
console.log("Observable 的執行開始了!");
// 使用 subscriber.next() 來推送值
subscriber.next(1);
subscriber.next(2);
// 如果在這裡發生錯誤,串流會立即停止,並通知 observer 的 error 方法
// subscriber.error(new Error("糟糕,出錯了!"));
subscriber.next(3);
// 使用 subscriber.complete() 來通知串流已正常結束
subscriber.complete();
// 這段程式碼不會被執行,因為 complete() 或 error() 之後,串流就結束了
subscriber.next(4);
// 回傳一個函式,這個函式被稱為 "teardown logic" 或 "unsubscribing logic"。
// 它會在訂閱被取消 (unsubscribe) 或串流結束 (complete/error) 時執行,用於清理資源。
return () => {
console.log("Teardown: 訂閱已取消或結束,資源被釋放!");
};
});
// 2. 建立 Observer (定義消費者行為)
const myObserver = {
next: (value) => console.log("接收到的值:", value),
error: (err) => console.error("捕捉到的錯誤:", err.message),
complete: () => console.log("串流已完成。"),
};
// 3. 建立 Subscription (啟動執行)
console.log("在 subscribe 之前");
const subscription = myObservable$.subscribe(myObserver);
console.log("在 subscribe 之後");
執行這段程式碼後,會在 console 看到以下的輸出順序:
在 subscribe 之前
Observable 的執行開始了!
接收到的值: 1
接收到的值: 2
接收到的值: 3
串流已完成。
Teardown: 訂閱已取消或結束,資源被釋放!
在 subscribe 之後
由此可看出,RxJS 的 subscribe
是同步執行的,當 subscribe 被呼叫時,Observable
內部的 producer 函式(也就是 new Observable (...)
括號內傳的那一串)會被立即同步執行,直到它完成(或遇到非同步操作)。
Operators 是 RxJS 實現函數式組合的核心工具,也是最能體現 RxJS 和 FP 緊密相關的地方。它們是一些純函數,遵循著一個簡單的模式:接收一個來源 Observable,回傳一個新的、經過轉換的 Observable。這些 Operators 讓我們能在資料傳遞給 Observer 之前,先針對資料進行一些處理與轉換。
Operators 可以分為幾個類型,以下簡單介紹,更完整的可參考以下連結:
Creation Operators 是所有串流的起點,它們的職責是從各種不同的資料來源建立新的 Observable。
of
熟悉的 of
方法!就像 Maybe.of
一樣,是一個將值放入容器(在這裡,容器是 Observable
)的方法。
of(...items)
可以接收任意數量的參數,並建立一個會依序、同步地發出這些參數,然後立即完成的 Observable。這對於從靜態資料建立串流非常有用。
import { of } from "rxjs";
const value$ = of(1, "hello", true);
const observer = {
next: (value) => console.log(value),
};
const subscription = value$.subscribe(observer);
// Output: 1, 'hello', true
fromEvent
將 DOM 事件轉換為 Observable,其中 subscribe
/ unsubscribe
就是對應執行 addEventListener
/ removeEventListener
。
import { fromEvent } from "rxjs";
fromEvent(document, "click").subscribe((event) =>
console.log("滑鼠被點擊了!", event.clientX)
);
Pipeable Operators 是用來處理、轉換、過濾和組合串流中資料的主要工具。它們本身不會修改來源 Observable,而是回傳一個全新的 Observable 。要使用它們,我們需要透過 Observable 的 .pipe()
方法。
pipe
pipe 方法在函數組合篇章有提過,現在看到覺得十分熟悉~pipe()
就像 Observable
的資料處理管線,Observable
的原始資料從一端進入,流經在 pipe()
中定義的每一個 Operator,最後從另一端輸出處理完畢的結果 。
pipe()
運作原理也是 FP 世界熟悉的,它會接收一系列 Operator 函式,並透過類似 reduce
的方式將它們串連起來。第一個 Operator 會接收來源 Observable,並回傳一個新的 Observable;接著,第二個 Operator 會接收前一個 Operator 回傳的 Observable,再回傳一個更新的 Observable,以此類推,直到最後一個運算子回傳最終的 Observable。
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';
of(1, 2, 3, 4, 5)
.pipe(
// 站點1: 過濾出奇數
filter(n => n % 2!== 0),
// 站點2: 將每個數字平方
map(n => n * n)
)
.subscribe(x => console.log(x));
// Output: 1, 9, 25
以下介紹些常見的 Pipeable Operators。
map
圖 3 Observable map
示意圖(資料來源:https://rxjs.dev/api/operators/map )
如同陣列的 map
,map(project)
會將串流中的每一個值,透過參數 project
函式進行轉換,並發出轉換後的新值。
以官網這範例來說,Observable 一開始收到的原始資料是點擊事件的整個 event 物件,接著透過 map 取出 event 物件中的 clientX 值,產生新的 Observable 傳出來。
import { fromEvent, map } from 'rxjs';
const clicks = fromEvent(document, 'click');
const positions = clicks.pipe(map(ev => ev.clientX));
positions.subscribe(x => console.log(x));
filter
如同陣列的 filter
,filter(predicate)
會對串流中的每一個值進行判斷,只有當參數 predicate
函式回傳 true 時,該值才會被允許通過。
tap
/ do
tap
和之前函數組合文章中提到的 trace
函數用途十分相似,可以將 tap 視為 RxJS 為非同步資料流所提供的 trace 版本。
在 tap 中可以使用帶有 side effect 的操作,它讓我們能「窺視」資料流,以便執行副作用 (side-effects),同時不對流經的資料本身進行任何修改。
從以下這範例可看出,tap
沒有修改流經的資料。
// https://www.learnrxjs.io/learn-rxjs/operators/utility/do#examples
import { of } from 'rxjs';
import { tap, map } from 'rxjs/operators';
const source = of(1, 2, 3, 4, 5);
// transparently log values from source with 'tap'
const example = source.pipe(
tap(val => console.log(`BEFORE MAP: ${val}`)),
map(val => val + 10),
tap(val => console.log(`AFTER MAP: ${val}`))
);
//'tap' does not transform values
//output: 11...12...13...14...15
const subscribe = example.subscribe(val => console.log(val));
今天簡單的介紹 RxJS,我們理解了 RxJS 的核心心智模型:
of
/ fromEvent
/ map
/ filter
/ tap
)以宣告式的方式處理資料這些觀念讓我們能用處理陣列的直覺來組裝隨時間而來的事件,下一篇文章會繼續介紹 Transformation Operator,以及看看如何自己打造簡易 Observable,並看看 Observable 和 Functor、Monad 與 Monoid 的關聯,最後會發現 RxJS 真的是非常符合 FP 理念的工具~