iT邦幫忙

2025 iThome 鐵人賽

DAY 27
0

https://ithelp.ithome.com.tw/upload/images/20251011/20168201EybZWi1l7W.png

前言

今天要介紹的是我期待已久的 RxJS! RxJS 博大精深,甚至可以用一整個鐵人賽系列文來介紹(在此推推 30 天精通 RxJS打通 RxJS 任督二脈這兩個系列文)。

希望今天的文章可以簡單帶大家理解 RxJS 的設計哲學,瞭解它如何體現 FP 的精神,並應用在實務開發上。而如果要進一步深入理解語法和更多應用,很推薦大家去讀上面的鐵人賽文章。

為何我們需要 RxJS?

在這系列 Functional Programming 的文章中,我們學會了如何運用 mapfilterreduce 這些工具來處理陣列,也就是那些存在於「空間」維度上的資料集合。我們透過純函式與函式組合,降低了程式碼的複雜度。

但如果我們的資料是散落在「時間」的維度上呢?想像一下使用者的滑鼠點擊、鍵盤輸入、來自伺服器的非同步回應、甚至是 WebSocket 的即時訊息... 這些事件如同一條時間長河,我們要如何用 FP 的思維來捕捉、組合與操作它們?

這正是 RxJS (Reactive Extensions for JavaScript) 要解決的問題。它不僅僅是一個函式庫,更是一種處理非同步事件的思考模式。它讓我們能將非同步事件視為一種「集合」,一種隨時間流動的「串流 (Stream)」,並讓我們能夠用熟悉的 FP 工具來駕馭它們。

在認識 RxJS 之前,先來看看 ReactiveX,因為 RxJS 是 ReactiveX 的 JavaScript 版本,ReactiveX 才是一切的起源。

ReactiveX 官方是這樣定義自己的:

ReactiveX is a combination of the best ideas from
the Observer pattern, the Iterator pattern, and functional programming.

Observer pattern、Iterator 和 functional programming 我剛好都介紹過,Observer pattern 可參考之前鐵人賽的文章[Day 11] Observer 模式,Iterator 則在昨天介紹 Generator Function 的時候稍微介紹了,functional programming 不必多說,這一整個系列文都在說明 FP 的設計精神。

以下簡單複習一下這三個概念,並說明他們的核心觀念如何體現在 RxJS 的設計上。

RxJS 的設計精神:Observer、Iterator 與 FP

Observer Pattern

在之前的 [Day 11] Observer 模式文章中介紹了 Observer 模式,快速複習一下它是什麼:Observer 模式定義了一種一對多的依賴關係,當一個物件(被稱為 Subject 或 Publisher)的狀態發生改變時,所有依賴於它的物件(被稱為 Observers 或 Subscribers)都會得到通知並自動更新。

  • 核心概念:Observer Pattern 的關鍵在於解耦了「發布者」與「訂閱者」,讓發布者不需要知道訂閱者的具體細節,只需在適當時機發出通知即可。這在資料消費上,是一種推送 (Push) 的模型:資料或狀態的變更由 Subject 主動推送到 Observer。
  • 在 RxJS 中的體現:這個模式是 RxJS 資料傳遞的基礎。在 RxJS 中,Observable (可觀察物件) 扮演了 Subject 的角色,而 Observer (觀察者) 則接收由 Observable 推送出來的值。

Iterator Pattern

在 Lazy evaluation 和 Generator function 文章中,我們認識了 Iterator:一個 Iterator 物件提供 next() 方法,讓消費者可以逐步「拉取」序列中的值。例如,一個陣列經過 iterator.next() 就能依序取出元素;而使用 Generator 函式,則能更優雅地建構這種逐步拉取的序列。

  • 核心概念:Iterator 的關鍵在於它是一種資料消費的拉取 (Pull) 模型。消費者(也就是呼叫 iterator.next() 的程式碼)主動向生產者(資料源)請求下一個值。生產者是被動的,只有在被請求時才會提供資料。
  • 在 RxJS 中的體現:RxJS 借鑒了迭代器「循序傳遞一系列值」的概念,但進行了控制反轉 (Inversion of Control)。如果說 Iterator 是消費者主動去拉資料,那 RxJS 的 Observable 則是生產者主動把資料給消費者。這個從 Pull 到 Push 的轉變,是理解 RxJS 與傳統資料處理方式差異的核心。

更具體來說,在「同步世界」裡,Iterator 是拉取資料的標準介面。而在「非同步世界」裡,Observable 則是推送資料的對應機制。

可以用幾個例子來對照理解:

  • Array 是一個「已存在於空間」的集合,可透過 Iterator 同步逐一取值。(Pull-based)
  • Promise 代表「未來的一個值」,當值準備好時會一次性推送給你。(Push-based)
  • Observable 則代表「未來的一連串值」,會隨著時間軸陸續推送出來。(Push-based)

因此我們可以把 Observable 看作是「時間軸上的集合」或「非同步的迭代器」。Iterator 和 Observable 也都具備 FP 程式設計熱愛的惰性 (Lazy) 特質:它們的邏輯只有在被實際消費時才會執行——對 Iterator 而言,是呼叫 .next();對 Observable 而言,是呼叫 .subscribe()

補充:資料的消費方式(Pull 與 Push)

在「Lazy evaluation 和 Generator function」的文章中,我們介紹了迭代器 (Iterator) 和生成器 (Generator),它們的核心是一種「惰性求值」的資料消費方式。這種資料消費的方式屬於 Pull System (拉取系統),而 RxJS 利用 Observer 模式來設計資料消費方式,這屬於 Push System (推送系統)。以下稍微簡介兩者。
(補充一下,所謂資料消費者就是需要資料的人,而資料生產者就是產出資料的人,應該還算好理解~)

  • Pull Systems:在拉取系統中,資料消費者 (Consumer) 掌握著主動權。它在自己準備好、需要資料的時候,主動向生產者 (Producer) 請求資料。生產者本身是被動的,它不知道資料何時會被取走。最典型的例子就是函式呼叫 const value = someFunction(),或是迭代器的 iterator.next()。消費者主動「拉」回一個值。另外,現在很多前端框架都採用的 Signals 機制,其資料消費方式也屬於拉取模式,需要時才存取。
  • Push Systems:推送系統的情況正好和 Pull Systems 相反,生產者 (Producer) 掌握著主動權。一旦有新的資料,生產者會主動將其「推」給消費者。消費者是被動的,它不知道何時會收到資料,只能準備好接收。Promises、DOM 的事件監聽 (Event Listeners) 和 RxJS 都屬於推送系統。

用比較生活化的方式理解的話,傳統的函式呼叫(Pull Systems)就像是你去餐廳點餐 (Pull),你主動發出請求,才會得到餐點。而 Observable(Push Systems) 則像是你訂閱了某個 YouTube 頻道 (Push),你只需要點擊「訂閱」按鈕一次,之後每當有新影片發布時,平台就會「主動推送」通知給你,你不需隨時去檢查有沒有新影片。

以下角度比較兩種資料消費模式。

單一值 (Single Value) 多個值 (Multiple Values)
Pull (同步/由消費者驅動) Function (T) Iterator / Generator (Iterator<T>)
Push (非同步/由生產者驅動) Promise (Promise<T>) Observable (Observable<T>)

Functional Programming 的體現

RxJS 不僅僅是 Observer 和 Iterator 兩種設計模式的結合,它的強大根源在於深受 FP 思想的影響。具體體現在純粹性 (Purity)、組合性 (Composition) 與聲明式風格 (Declarative Style) 這幾點上。

  • 核心概念:FP 強調使用沒有副作用的純函數來轉換資料,並將這些小函數組合成更複雜的邏輯。
  • 在 RxJS 中的體現:RxJS 提供了許多被稱為運算子 (Operators) 的工具,例如 mapfilterscan 等。這些運算子本質上就是純函數。它們接收一個 Observable 作為輸入,然後回傳一個全新的 Observable,而不會修改到原始的 Observable 。這種設計讓我們可以用一種宣告式、可組合的方式來定義複雜的非同步邏輯,就像用 Array.prototype.map 串連陣列操作一樣。這也是為什麼 RxJS 常被比喻為「事件的 Lodash」。  

綜合三者的 RxJS

簡單說明 RxJS 如何統整上述三種模式,以處理非同步程式設計上的挑戰。

  1. Iterator Pattern 提供了「一個值的序列」這個概念,但它的 Pull-based 模型不適用於非同步事件。我們無法主動去「拉」下一次的滑鼠點擊,因為事件的發生時間是由使用者決定的。
  2. Observer Pattern 透過其 Push-based 模型完美解決了時序問題,這對於處理非同步事件十分重要。然而,傳統的觀察者模式缺乏處理「完成」或「錯誤」的標準化流程,更重要的是,它沒有提供一套優雅的方式來組合多個事件來源。若想實現「只在使用者停止輸入 500 毫秒後才發送請求」這樣的功能,使用傳統的事件監聽器會需要手動管理計時器、flog 變數等多個外部狀態,程式碼很快就會變得複雜且容易出錯。
  3. Functional Programming 恰好彌補了 Observer Pattern 在組合性上的不足。透過將「被推送的值的序列」視為一個「隨時間展開的集合」,我們就能應用那些我們早已熟悉的、可組合的純函數(也就是運算子)來對其進行轉換、過濾和組合。

整合三者後,RxJS 的核心產物 Observable 就此誕生。它是一個懶加載 (lazy)、可組合的、基於推送 (push-based) 的序列。它可以模擬任何資料流,從單一的未來值(類似 Promise)、一個有限的陣列,到一個永不結束的事件序列(如滑鼠移動)。

這就是 RxJS 的核心設計哲學:提供一個統一的抽象模型,用函數式的方法優雅地處理一切非同步事務。

Hello, Observable

從靜態陣列到動態串流(Stream)

學習 RxJS 最重要其實是心態上的轉變,要將我們對資料的看法從靜態的「陣列 (Array)」擴展到動態的「串流 (Stream)」。

  • 陣列 (Array):可看作是空間上的資料集合。當你定義一個陣列時,它所有的元素都已經存在於記憶體中,你可以隨時存取任何一個元素。
  • 串流 (Stream):可看作是時間上的資料集合。串流中的元素(或事件)是隨著時間的推移,一個一個抵達的。你無法預知下一個元素何時會來,也無法一次性看到所有元素。

https://ithelp.ithome.com.tw/upload/images/20251011/20168201gPssyM4RYD.png
圖 1 陣列與串流示意圖(資料來源: 自行繪製)

假設有個使用者在輸入框中打字,每一個按鍵事件都可以視為這個「輸入串流」中的一個元素。Observable 的工作就是定義好一套邏輯,來「回應 (react)」這個串流中陸續抵達的元素。這也是響應式程式設計 (Reactive Programming) 的核心思想。(想更了解 Reactive的,可再參考漫談模式::Reactive)

核心元素:Observable, Observer, Subscription

RxJS 的世界由三個核心元素構成,分別為:Observable、Observer 與 Subscription。

Observable (可觀察物件)

Observable 是串流的來源,是所有事件的生產者,且 Observable 本身是懶惰 (lazy) 的。它就像一份食譜或一張藍圖,詳細描述了應該如何產生值、在何時產生、以及何時結束或出錯。但僅僅是定義了這份藍圖,並不會有任何事情發生。它代表的是「一個可被呼叫的、關於未來值或事件的集合」。

Observer (觀察者)

Observer 是串流的消費者。它是一個 JavaScript 物件,定義了如何去回應 Observable 推送出來的通知。它有三個可選的屬性,分別對應三種不同的通知類型,它們都是回呼函式:

  • next(value): 當 Observable 發出一個新的值時,這個函式會被呼叫,並傳入該值。
  • error(err): 當 Observable 內部發生錯誤時,這個函式會被呼叫,並傳入錯誤物件。一旦 error 被呼叫,這個串流就會立即終止。
  • complete(): 當 Observable 成功發出了它所有的值,且不會再有新的值時,這個函式會被呼叫。串流在此之後也會終止。

Subscription (訂閱)

Subscription 是 Observable 和 Observer 間的橋樑,代表了一次正在進行的執行過程。當我們呼叫 observable.subscribe(observer) 時,才真正「啟動」 Observable 中定義的藍圖,讓資料開始流動。這個呼叫會回傳一個 Subscription 物件。此物件代表了這次的訂閱行為,我們可以用它來手動中斷資料流,也就是取消訂閱,可用 subscription.unsubscribe() 來取消訂閱。 

https://ithelp.ithome.com.tw/upload/images/20251011/20168201xL7C63vKgY.png
圖 2 Observable、Observer 與 Subscription 關係示意圖(資料來源: 自行繪製)

Cold 與 Hot Observable

Observable 可以根據其生產者的行為模式,分為「冷的 (Cold)」和「熱的 (Hot)」兩種 。

Cold Observable

值的生產者位於 Observable 內部。每次有新的訂閱者 (Observer) 訂閱時,它都會從頭到尾完整地執行一次生產者函式。每個訂閱者都會得到自己獨立的一份資料流,彼此之間互不影響。
舉例來說,每次訂閱一個 HTTP Observable,都會發起一次新的網路請求。

Hot Observable

值的生產者位於 Observable 外部,並且獨立於訂閱行為而存在。多個訂閱者會共享同一個資料來源。當一個訂閱者加入時,它只能接收到從它訂閱那一刻起所發生的事件,無法看到歷史事件。
舉例來說,DOM 事件就屬於 Hot Observable。滑鼠點擊事件的發生與否,與我們是否訂閱它無關。當我們訂閱時,我們就開始收聽這個已經在發生的事件流。

不過 Observable 不一定單純是 Cold Observable 或 Hot Observable,有可能混搭的,也有可能一開始是 Cold,後來變成 Hot。

實務範例

來看一個簡單的應用範例以理解其使用方式。

import { Observable } from "rxjs";

// 1. 建立 Observable (定義藍圖)
// Observable 的建構子接收一個函式,這個函式被稱為 "producer" 或 "subscriber function"。
// 它會在被訂閱時執行,並接收一個 subscriber 物件作為參數。
const myObservable$ = new Observable((subscriber) => {
  console.log("Observable 的執行開始了!");

  // 使用 subscriber.next() 來推送值
  subscriber.next(1);
  subscriber.next(2);

  // 如果在這裡發生錯誤,串流會立即停止,並通知 observer 的 error 方法
  // subscriber.error(new Error("糟糕,出錯了!"));

  subscriber.next(3);

  // 使用 subscriber.complete() 來通知串流已正常結束
  subscriber.complete();

  // 這段程式碼不會被執行,因為 complete() 或 error() 之後,串流就結束了
  subscriber.next(4);

  // 回傳一個函式,這個函式被稱為 "teardown logic" 或 "unsubscribing logic"。
  // 它會在訂閱被取消 (unsubscribe) 或串流結束 (complete/error) 時執行,用於清理資源。
  return () => {
    console.log("Teardown: 訂閱已取消或結束,資源被釋放!");
  };
});

// 2. 建立 Observer (定義消費者行為)
const myObserver = {
  next: (value) => console.log("接收到的值:", value),
  error: (err) => console.error("捕捉到的錯誤:", err.message),
  complete: () => console.log("串流已完成。"),
};

// 3. 建立 Subscription (啟動執行)
console.log("在 subscribe 之前");
const subscription = myObservable$.subscribe(myObserver);
console.log("在 subscribe 之後");

執行這段程式碼後,會在 console 看到以下的輸出順序:

在 subscribe 之前
Observable 的執行開始了!
接收到的值: 1
接收到的值: 2
接收到的值: 3
串流已完成。
Teardown: 訂閱已取消或結束,資源被釋放!
在 subscribe 之後

由此可看出,RxJS 的 subscribe 是同步執行的,當 subscribe 被呼叫時,Observable 內部的 producer 函式(也就是 new Observable (...) 括號內傳的那一串)會被立即同步執行,直到它完成(或遇到非同步操作)。

Operators

Operators 是 RxJS 實現函數式組合的核心工具,也是最能體現 RxJS 和 FP 緊密相關的地方。它們是一些純函數,遵循著一個簡單的模式:接收一個來源 Observable,回傳一個新的、經過轉換的 Observable。這些 Operators 讓我們能在資料傳遞給 Observer 之前,先針對資料進行一些處理與轉換。

Operators 可以分為幾個類型,以下簡單介紹,更完整的可參考以下連結:

Creation Operators

Creation Operators 是所有串流的起點,它們的職責是從各種不同的資料來源建立新的 Observable。

of

熟悉的 of 方法!就像 Maybe.of 一樣,是一個將值放入容器(在這裡,容器是 Observable)的方法。

of(...items) 可以接收任意數量的參數,並建立一個會依序、同步地發出這些參數,然後立即完成的 Observable。這對於從靜態資料建立串流非常有用。

import { of } from "rxjs";

const value$ = of(1, "hello", true);

const observer = {
  next: (value) => console.log(value),
};

const subscription = value$.subscribe(observer);
// Output: 1, 'hello', true

fromEvent

將 DOM 事件轉換為 Observable,其中 subscribe / unsubscribe 就是對應執行 addEventListener / removeEventListener

import { fromEvent } from "rxjs";

fromEvent(document, "click").subscribe((event) =>
  console.log("滑鼠被點擊了!", event.clientX)
);

Pipeable Operators

Pipeable Operators 是用來處理、轉換、過濾和組合串流中資料的主要工具。它們本身不會修改來源 Observable,而是回傳一個全新的 Observable 。要使用它們,我們需要透過 Observable 的 .pipe() 方法。  

pipe

pipe 方法在函數組合篇章有提過,現在看到覺得十分熟悉~
pipe() 就像 Observable 的資料處理管線,Observable 的原始資料從一端進入,流經在 pipe() 中定義的每一個 Operator,最後從另一端輸出處理完畢的結果 。

pipe() 運作原理也是 FP 世界熟悉的,它會接收一系列 Operator 函式,並透過類似 reduce 的方式將它們串連起來。第一個 Operator 會接收來源 Observable,並回傳一個新的 Observable;接著,第二個 Operator 會接收前一個 Operator 回傳的 Observable,再回傳一個更新的 Observable,以此類推,直到最後一個運算子回傳最終的 Observable。

import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';

of(1, 2, 3, 4, 5)
 .pipe(
    // 站點1: 過濾出奇數
    filter(n => n % 2!== 0),
    // 站點2: 將每個數字平方
    map(n => n * n)
  )
 .subscribe(x => console.log(x));

// Output: 1, 9, 25

以下介紹些常見的 Pipeable Operators。

map

image
圖 3 Observable map 示意圖(資料來源:https://rxjs.dev/api/operators/map )
如同陣列的 mapmap(project) 會將串流中的每一個值,透過參數 project 函式進行轉換,並發出轉換後的新值。

以官網這範例來說,Observable 一開始收到的原始資料是點擊事件的整個 event 物件,接著透過 map 取出 event 物件中的 clientX 值,產生新的 Observable 傳出來。

import { fromEvent, map } from 'rxjs';

const clicks = fromEvent(document, 'click');
const positions = clicks.pipe(map(ev => ev.clientX));

positions.subscribe(x => console.log(x));

filter

如同陣列的 filterfilter(predicate) 會對串流中的每一個值進行判斷,只有當參數 predicate 函式回傳 true 時,該值才會被允許通過。  

tap / do

tap 和之前函數組合文章中提到的 trace 函數用途十分相似,可以將 tap 視為 RxJS 為非同步資料流所提供的 trace 版本。

在 tap 中可以使用帶有 side effect 的操作,它讓我們能「窺視」資料流,以便執行副作用 (side-effects),同時不對流經的資料本身進行任何修改。

從以下這範例可看出,tap 沒有修改流經的資料。

// https://www.learnrxjs.io/learn-rxjs/operators/utility/do#examples
import { of } from 'rxjs';
import { tap, map } from 'rxjs/operators';

const source = of(1, 2, 3, 4, 5);
// transparently log values from source with 'tap'
const example = source.pipe(
  tap(val => console.log(`BEFORE MAP: ${val}`)),
  map(val => val + 10),
  tap(val => console.log(`AFTER MAP: ${val}`))
);

//'tap' does not transform values
//output: 11...12...13...14...15
const subscribe = example.subscribe(val => console.log(val));

小結

今天簡單的介紹 RxJS,我們理解了 RxJS 的核心心智模型:

  • 把非同步事件視為「時間上的集合(Stream)」
  • 透過 Observable / Observer / Subscription 這三要素啟動與管理資料流
  • 用 Creation + Pipeable operators(如 of / fromEvent / map / filter / tap)以宣告式的方式處理資料

這些觀念讓我們能用處理陣列的直覺來組裝隨時間而來的事件,下一篇文章會繼續介紹 Transformation Operator,以及看看如何自己打造簡易 Observable,並看看 Observable 和 Functor、Monad 與 Monoid 的關聯,最後會發現 RxJS 真的是非常符合 FP 理念的工具~

Reference


上一篇
[Day 26] Lazy Evaluation 和 Generator Function
系列文
30 天的 Functional Programming 之旅27
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言