iT邦幫忙

2025 iThome 鐵人賽

DAY 28
0

https://ithelp.ithome.com.tw/upload/images/20251012/20168201qs4iaJbNlp.png

前言

今天會接續昨天的 RxJS 介紹,繼續介紹 flatten operator,以及如何自己打造簡易 Observable,最後以 FP 視角再次檢視 Observable~

Operators

Transformation Operators & Flattening Operators

在實務中可能會遇到一種情況:一個 Observable 發出的值本身又是另一個 Observable。這種「ObservableObservable」被稱為高階 Observable (Higher-order Observable)。例如,當一個點擊事件串流 (Observable<ClickEvent>) 中的每次點擊都觸發一次 API 請求(回傳 Observable<Response>)時,我們就會得到一個高階 Observable。

直接訂閱高階 Observable 會得到 Observable 物件本身,而不是我們想要的最終資料。這時就需要使用 Flattening Operators 來「解開」這層嵌套,將內部 Observable 的值攤平到外部的串流中。

而攤平的方式可分幾種,分別為 concatMapmergeMapswitchMap,為了說明它們的區別,我們假設一個情境:使用者在搜尋框中輸入文字,每次輸入都觸發一次 API 查詢。

mergeMap

https://ithelp.ithome.com.tw/upload/images/20251012/20168201vjXbcxciIJ.jpg
圖 1 mergeMap 示意圖(資料來源: https://rxmarbles.com/#mergeMap)

  • 行為:當外部 Observable (使用者輸入) 發出一個值時,mergeMap 會立即為其建立並訂閱一個內部 Observable (API 請求)。它會同時處理所有的內部 Observable,並在任何一個內部 Observable 發出值時,立即將該值合併到最終的輸出流中。它不保證輸出的順序。
  • 情境:需要併發執行多個任務,且不關心它們完成的順序時。例如,一次性上傳多個檔案,哪個先傳完就先處理哪個。在搜尋框情境下,如果使用者快速輸入 "abc",它會同時發出三個 API 請求,並將結果隨機合併。(不會確保 request 對應到的 response 順序)

concatMap

https://ithelp.ithome.com.tw/upload/images/20251012/20168201ZOy8cqjVqi.jpg
圖 2 concatMap 示意圖(資料來源: https://rxmarbles.com/#concatMap)

  • 行為:當外部 Observable 發出一個值時,concatMap 也會為其建立一個內部 Observable,但它會等待前一個內部 Observable 完成後,才會訂閱並執行下一個。它保證了處理和輸出的順序與觸發順序一致。
  • 情境:當任務之間有依賴關係,必須一個接一個執行時。例如,先執行 POST 請求新增資料,成功後再執行 GET 請求來獲取更新後的列表。在搜尋框情境下,如果使用者快速輸入 "abc",它會先完成對 "a" 的請求,再發起對 "b" 的請求,以此類推。

switchMap

https://ithelp.ithome.com.tw/upload/images/20251012/20168201myOQOOxoXP.jpg
圖 3 switchMap 示意圖(資料來源: https://rxmarbles.com/#switchMap)

  • 行為:當外部 Observable 發出一個新值時,switchMap 會立即訂閱對應的內部 Observable,同時取消並拋棄前一個尚未完成的內部 Observable。它永遠只關心最新的那個任務。
  • 情境:如果使用者快速輸入 "abc",當 "b" 的輸入事件到來時,switchMap 會取消對 "a" 的 API 請求;當 "c" 的輸入事件到來時,它會取消對 "b" 的請求。最終只有對 "c" 的請求會被完成並回傳結果。可避免了不必要的網路請求和處理過期的資料。

程式碼範例如下,有興趣的可以到此連結試試看。

import { fromEvent, of } from "rxjs";
import { map, delay, mergeMap, concatMap, switchMap } from "rxjs/operators";

// 模擬 API 請求(回傳一個 Observable,延遲一段時間後才發出結果)
function fakeApi(query) {
  const delayTime = Math.floor(Math.random() * 2000) + 500; // 隨機 0.5 ~ 2.5 秒
  return of(`結果: ${query}`).pipe(delay(delayTime));
}

// 取得輸入框
const input = document.getElementById("search");

// 建立輸入事件流
const input$ = fromEvent(input, "input").pipe(map((e) => e.target.value));

// 1. mergeMap: 併發多個請求,誰先回來誰先輸出
input$
  .pipe(mergeMap((query) => fakeApi(query)))
  .subscribe((result) => console.log("mergeMap:", result));

// 2. concatMap: 等待前一個完成,再執行下一個,順序與輸入順序一致
input$
  .pipe(concatMap((query) => fakeApi(query)))
  .subscribe((result) => console.log("concatMap:", result));

// 3. switchMap: 永遠只保留最後一次輸入的請求,舊的會被取消
input$
  .pipe(switchMap((query) => fakeApi(query)))
  .subscribe((result) => console.log("switchMap:", result));

打造自己的 Observable

了解 Observable 簡單的使用方式後,來試著自己實做看看簡單版的 Observable 吧!
我們的目標是建立一個 MyObservable 類別,並為它加上 ofpipe 方法,它將模仿 rxjs/Observable 的核心功能。

MyObservable

MyObservable 的設計規劃如下:

  • constructor: 建構子會接收一個函式作為唯一的參數,可稱之為生產者函式 (_subscribe)。這個函式就是我們之前提到的藍圖,它知道如何產生值,並會在被訂閱時實際執行。
  • subscribe method: 這是 MyObservable 實例對外暴露的公開方法。它接收一個 observer 物件。當 subscribe 被呼叫時,它會執行儲存在建構子中的 _subscribe 函式,並將傳入的 observer 物件交給它。這是觸發資料流動的開關。

接著來實作看看這 class:

class MyObservable {
  // 建構子接收一個函式,我們將其儲存在實例的私有屬性 _subscribe 中。
  // 這個函式是產生資料的核心邏輯。
  constructor(subscribeFunc) {
    this._subscribe = subscribeFunc;
  }

  // subscribe 是啟動資料流的方法。
  // 它接收一個 observer 物件,該物件包含 next, error, complete 方法。
  subscribe(observer) {
    // 關鍵就在這裡:直接呼叫我們在建構時儲存的 _subscribe 函式,
    // 並將 observer 傳遞進去。
    // 這就將「定義」和「執行」兩個階段連接起來了。
    this._subscribe(observer);
  }
}

實作 offromEventpipe 方法

接著為 MyObservable 增加 ofpipe 方法。

  • of:接收任意數量的參數,並建立一個會依序發出這些參數,最後立即完成的 Observable。
  • pipe:一個實例方法,它接收任意數量的 operator 函式。它會透過 reduce 將這些運算子函式依序應用到當前的 Observable 上,回傳一個經過所有運算子轉換後的全新 Observable。  
class MyObservable {
  constructor(subscribeFunc) {
    this._subscribe = subscribeFunc;
  }

  subscribe(observer) {
    this._subscribe(observer);
  }

  // 加上一個靜態的 of 方法
  static of(...args) {
    // of 方法回傳一個新的 MyObservable 實例。
    return new MyObservable(observer => {
      // 在這個新的 Observable 的生產者函式中,我們遍歷傳入 of 的所有參數。
      for (const arg of args) {
        observer.next(arg);
      }
      // 所有參數都推送完畢後,呼叫 observer.complete() 來結束串流。
      observer.complete();
    });
  }

  // 加上 pipe 方法
  pipe(...operators) {
    // 使用 reduce 將所有運算子串連起來。
    // 初始值是 this (當前的 Observable 實例)。
    // 每一個 operator 都是一個函式,它接收一個 Observable,回傳一個新的 Observable。
    return operators.reduce((source, operator) => operator(source), this);
  }
}

實作 Operator 方法:map

pipe 方法本身只是個串連工具,要讓它真正有用,我們需要建立可以放進去的 Operator。Operator 是一個高階函式:它接收一些配置參數(例如 map 接收一個轉換函式),然後回傳一個新函式。這個回傳的新函式才是真正作用於資料流的函式,它接收一個來源 Observable 作為輸入,並回傳一個新的 Observable 。

讓我們來實作自己的 map Operator。

// myMap 是一個高階函式
const myMap = (project) => {
  // 它回傳一個新函式,這個函式才是真正的 operator
  return (sourceObservable) => {
    // operator 會回傳一個全新的 Observable
    return new MyObservable(observer => {
      // 在新的 Observable 內部,我們訂閱來源 Observable
      sourceObservable.subscribe({
        next: (value) => {
          // 當來源發出值時,使用 project 函式進行轉換
          const newValue = project(value);
          // 然後將轉換後的值推送到下游
          observer.next(newValue);
        },
        error: (err) => observer.error(err), // 直接傳遞錯誤
        complete: () => observer.complete(), // 直接傳遞完成訊號
      });
    });
  };
};

將一切整合

現在我們來看看它們如何協同運作:

// 1. 使用 MyObservable.of 建立一個來源串流
const source$ = MyObservable.of(1, 2, 3);

// 2. 定義一個 observer
const myObserver = {
  next: value => console.log('Received:', value),
  error: err => console.error('Error:', err),
  complete: () => console.log('Completed!'),
};

// 3. 使用 pipe 和自訂的 myMap 運算子
const mapped$ = source$.pipe(
  myMap(x => x * 10),
  myMap(x => `The result is ${x}`)
);

// 4. 訂閱最終的 Observable
mapped$.subscribe(myObserver);

執行這段程式碼,會在 console 看到:

Received: The result is 10
Received: The result is 20
Received: The result is 30
Completed!

以上實作可以讓我們觀察到幾點:

  1. Observable 是一個裝載了「生產者函式」的容器,本身是懶惰的。
  2. subscribe 是觸發執行的開關。
  3. operator(如 myMap)是一個高階函式,它接收一個 Observable 並返回一個新的 Observable。
  4. pipe 透過 reduce 將多個 operators 串連起來,形成一個新的、組合後的 Observable,而不會修改任何中間過程的 Observable。

Observable 與 Functor、Monad、Monoid

在看 Observable 的設計哲學和 operator 的運作機制時,會覺得有些熟悉,因為 RxJS 在設計上與我們見過的結構十分相似:Functor、Monad 與 Monoid。
接下來就來看看 Observable 與他們的關係~

Observable as a Functor: map 的力量

Functor 的意思是一個「可被映射 (mappable) 的容器」。它必須提供一個 map 方法,這個方法接收一個函式,並將該函式應用於容器內的值,最終回傳一個裝有新值的、結構相同的全新容器。

Observable 符合這個定義,因此它屬於一種 Functor。

  • 容器 (Container): Observable 本身就是一個容器,它裝載是一系列隨時間流動的值。
  • map 方法: 對應到 RxJS 的 map operator。
  • 結構保持: map operator 接收一個 Observable<A> 和一個轉換函式 f: A -> B,然後回傳一個全新的 Observable<B>。它只轉換了內部的值,而「非同步事件串流」這個容器結構本身(包含其時序、錯誤處理、完成通知等特性)被完整地保留了下來。

Observable 作為一個 Functor,搭建了一座橋樑。它告訴我們:「你只需要提供單純的資料轉換邏輯,我會負責在複雜的非同步時間軸上,正確地、安全地應用它。」

Observable as a Monad: 馴服巢狀的非同步

Monad 可以看作是 Functor 的延伸版本,它除了 map 之外,還提供了一個關鍵操作(常稱為 flatMapchain),專門用來處理「容器中的容器」的情況,並將其攤平 (flatten)為單層的容器。

在 RxJS 中,mergeMapconcatMapswitchMap 這些 Flattening Operators 正是 Monad 中 chain 概念的具體實現。它們接收一個函式,該函式回傳一個 Observable,然後它們會自動訂閱這個內部的 Observable,並將其發出的值攤平到外部的串流中。

更深一層來看,Observable 的行為模式與我們提過的 IO Monad 十分相似。IO Monad 的核心思想是:將一個帶有副作用的操作(如讀取檔案、網路請求)封裝成一個純粹的值。這個值本身只是一個對該操作的「描述」或「藍圖」,它本身並不執行任何副作用。副作用只有在這個 IO 值被一個特定的「執行」函式呼叫時才會真正發生。

這與 Observable 的運作方式很相似:

  • new Observable(subscriber => {... }) 這段程式碼本身是純粹的。它建立了一個 Observable 物件,這個物件就是一個「藍圖」,描述了當有人訂閱時應該執行的一系列操作(例如 addEventListener 或 fetch)。
  • 在呼叫 .subscribe() 之前,沒有任何事件被註冊監聽事件,沒有任何網路請求被發出。
  • .subscribe() 的呼叫,就相當於 IO Monad 的「執行」函式。它觸發了藍圖中描述的副作用。

因此我們可以將 Observable 視為一個偽裝成 IO Monad 的物件。它讓我們能以一種純函數式的方式來定義、組合和傳遞那些不純的、非同步的副作用操作。

Observable and Monoid: 組合的藝術

在之前的 再探 Monoid 文章中,有介紹 Monoid 是一個包含三要素的代數結構:一個集合、一個滿足結合律的二元操作,以及一個單位元素。另外當時在範例程式中也提到我們可以合併 Stream 這型別,而 Stream 就是類似 RxJS 的概念。

Observable 本身的確構成了一個 Monoid:

  • 集合: 對於任意給定的類型 T,所有 Observable<T> 的集合。
  • 二元操作: 對應 RxJS 的 merge operator。merge(obsA, obsB) 接收兩個 Observable,並回傳一個新的 Observable,這個新的 Observable 會併發地發出 obsAobsB 的所有值。這個操作滿足結合律:merge(a, merge(b, c)) 的行為等同於 merge(merge(a, b), c)
  • 單位元素: RxJS 有個 EMPTY creation operator。EMPTY 是一個特殊的 Observable,它不會發出任何 next 值,而是立即發出 complete 通知。merge(obsA, EMPTY) 的行為與單獨的 obsA 完全相同,滿足單位元素定義。

Observable 構成了 Monoid,代表 Observable 的集合都是 reducible 的。Monoid 的結構確保我們能用可靠的方式,將一組 Observable 合併成單一的一個。

以下簡單示意 Monoid 的 Observable,如何用 reduce 的方式歸納所有的事件流變成單一事件流:

import { fromEvent, merge, EMPTY } from 'rxjs';

const buttonA = document.getElementById('btnA');
const buttonB = document.getElementById('btnB');
const buttonC = document.getElementById('btnC');

const clickA$ = fromEvent(buttonA, 'click');
const clickB$ = fromEvent(buttonB, 'click');
const clickC$ = fromEvent(buttonC, 'click');

// 把所有來源放到陣列裡
const observables = [clickA$, clickB$, clickC$];

// 因為 Observable 是一個 Monoid,我們可以用 reduce 來合併它們!
const allClicks$ = observables.reduce(
  (acc, current) => merge(acc, current), // 二元運算:merge
  EMPTY                                  // 單位元素:EMPTY
);

allClicks$.subscribe(() => console.log('A button was clicked!'));

從這裡可看出,Monoid 非常強大,它提供了一種通用的、基於代數理論的模式來聚合多個事件源,而不需要寫任何客製化的、臨時的邏輯。這正是我們學習 Monoid 原因,它讓我們能基於理論來解決真實的程式設計問題。

小結

用幾個要點來回顧一下兩天的文章。

  • RxJS 的核心思想:RxJS 將非同步事件(如點擊、API 回應)視為「隨時間流動的集合」,也就是「串流 (Stream)」。這讓我們能用類似處理陣列的、宣告式的函數式工具來操作它們。
  • 三大設計支柱:RxJS 的設計是 Observer 模式(Push-based 資料推送)、Iterator 模式(值的序列概念)以及 Functional Programming(純函數與組合性)三者的完美融合。
  • 核心三元素:Observable 是定義事件如何產生的懶人藍圖;Observer 是包含 next/error/complete 方法的消費者;Subscription 則代表一次正在執行的訂閱,可以用來取消。
  • Operators:Operators 是 RxJS 組合性的核心工具。它們都是純函數,可透過 .pipe() 方法串連起來,形成一條宣告式的資料處理管線,讓我們能優雅地處理複雜邏輯,而不需手動管理狀態。
  • Observer 與 FP 的連結
    • Observable 是一個 Functor,讓我們能用 map 安全地在非同步的內容中轉換值。
    • Observable 是一個 Monad,讓我們能用 switchMap 等 Flattening Operators,優雅地串連相依的非同步操作,避免巢狀地獄。
    • Observable 具備 Monoid 的特性,讓我們能用 mergeEMPTY,以一種有原則的方式將多個串流合併為一個。

Reference


上一篇
[Day 27] RxJS 簡介 (1)
系列文
30 天的 Functional Programming 之旅28
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言