iT邦幫忙

2017 iT 邦幫忙鐵人賽
DAY 11
11
Modern Web

30 天精通 RxJS系列 第 11

30 天精通 RxJS (10): Observable Operator - combineLatest, withLatestFrom, zip

本篇文章搬家囉! 這裡不再回覆留言,請移至 https://blog.jerry-hong.com/series/rxjs/thirty-days-RxJS-10/


30 天精通 RxJS (10): Observable Operator - combineLatest, withLatestFrom, zip

非同步最難的地方在於,當有多個非同步行為同時觸發且相互依賴,這時候我們要處理的邏輯跟狀態就會變得極其複雜,甚至程式碼很可能會在完成的一兩天後就成了 Legacy Code。

昨天我們最後講到了 merge 的用法,它的邏輯就像是 OR(||)一樣,可以把多個 observable 合併且同時處理,當其中任合一個 observable 送出元素時,我們都做相同的處理。

今天我們要講的三個 operators 則像是 AND(&&) 邏輯,它們都是在多個元素送進來時,只輸出一個新元素,但各自的行為上仍有差異,需要讀者花點時間思考,建議在頭腦清醒時閱讀本篇文章。

Operators

combineLatest

首先我們要介紹的是 combineLatest,它會取得各個 observable 最後送出的值,再輸出成一個值,我們直接看範例會比較好解釋。

var source = Rx.Observable.interval(500).take(3);
var newest = Rx.Observable.interval(300).take(6);

var example = source.combineLatest(newest, (x, y) => x + y);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// 3
// 4
// 5
// 6
// 7
// complete

JSBin | JSFiddle

大家第一次看到這個 output 應該都會很困惑,我們直接來看 Marble Diagram 吧!

source : ----0----1----2|
newest : --0--1--2--3--4--5|

    combineLatest(newest, (x, y) => x + y);

example: ----01--23-4--(56)--7|

首先 combineLatest 可以接收多個 observable,最後一個參數是 callback function,這個 callback function 接收的參數數量跟合併的 observable 數量相同,依照範例來說,因為我們這裡合併了兩個 observable 所以後面的 callback function 就接收 x, y 兩個參數,x 會接收從 source 發送出來的值,y 會接收從 newest 發送出來的值。

最後一個重點就是一定會等兩個 observable 都曾有送值出來才會呼叫我們傳入的 callback,所以這段程式是這樣運行的

  • newest 送出了 0,但此時 source 並沒有送出過任何值,所以不會執行 callback
  • source 送出了 0,此時 newest 最後一次送出的值為 0,把這兩個數傳入 callback 得到 0
  • newest 送出了 1,此時 source 最後一次送出的值為 0,把這兩個數傳入 callback 得到 1
  • newest 送出了 2,此時 source 最後一次送出的值為 0,把這兩個數傳入 callback 得到 2
  • source 送出了 1,此時 newest 最後一次送出的值為 2,把這兩個數傳入 callback 得到 3
  • newest 送出了 3,此時 source 最後一次送出的值為 1,把這兩個數傳入 callback 得到 4
  • source 送出了 2,此時 newest 最後一次送出的值為 3,把這兩個數傳入 callback 得到 5
  • source 結束,但 newest 還沒結束,所以 example 還不會結束。
  • newest 送出了 4,此時 source 最後一次送出的值為 2,把這兩個數傳入 callback 得到 6
  • newest 送出了 5,此時 source 最後一次送出的值為 2,把這兩個數傳入 callback 得到 7
  • newest 結束,因為 source 也結束了,所以 example 結束。

不管是 source 還是 newest 送出值來,只要另一方曾有送出過值(有最後的值),就會執行 callback 並送出新的值,這就是 combineLatest。

combineLatest 很常用在運算多個因子的結果,例如最常見的 BMI 計算,我們身高變動時就拿上一次的體重計算新的 BMI,當體重變動時則拿上一次的身高計算 BMI,這就很適合用 combineLatest 來處理!

zip

在講 withLatestFrom 之前,先讓我們先來看一下 zip 是怎麼運作的,zip 會取每個 observable 相同順位的元素並傳入 callback,也就是說每個 observable 的第 n 個元素會一起被傳入 callback,這裡我們同樣直接用範例講解會比較清楚

var source = Rx.Observable.interval(500).take(3);
var newest = Rx.Observable.interval(300).take(6);

var example = source.zip(newest, (x, y) => x + y);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 2
// 4
// complete

JSBin | JSFiddle

Marble Diagram 長這樣

source : ----0----1----2|
newest : --0--1--2--3--4--5|
    zip(newest, (x, y) => x + y)
example: ----0----2----4|

以我們的範例來說,zip 會等到 source 跟 newest 都送出了第一個元素,再傳入 callback,下次則等到 source 跟 newest 都送出了第二個元素再一起傳入 callback,所以運行的步驟如下:

  • newest 送出了第一個0,但此時 source 並沒有送出第一個值,所以不會執行 callback。
  • source 送出了第一個0,newest 之前送出的第一個值為 0,把這兩個數傳入 callback 得到 0
  • newest 送出了第二個1,但此時 source 並沒有送出第二個值,所以不會執行 callback。
  • newest 送出了第三個2,但此時 source 並沒有送出第三個值,所以不會執行 callback。
  • source 送出了第二個1,newest 之前送出的第二個值為 1,把這兩個數傳入 callback 得到 2
  • newest 送出了第四個3,但此時 source 並沒有送出第四個值,所以不會執行 callback。
  • source 送出了第三個2,newest 之前送出的第三個值為 2,把這兩個數傳入 callback 得到 4
  • source 結束 example 就直接結束,因為 source 跟 newest 不會再有對應順位的值

zip 會把各個 observable 相同順位送出的值傳入 callback,這很常拿來做 demo 使用,比如我們想要間隔 100ms 送出 'h', 'e', 'l', 'l', 'o',就可以這麼做

var source = Rx.Observable.from('hello');
var source2 = Rx.Observable.interval(100);

var example = source.zip(source2, (x, y) => x);

這裡的 Marble Diagram 就很簡單

source : (hello)|
source2: -0-1-2-3-4-...
        zip(source2, (x, y) => x)
example: -h-e-l-l-o|

這裡我們利用 zip 來達到原本只能同步送出的資料變成了非同步的,很適合用在建立示範用的資料。

建議大家平常沒事不要亂用 zip,除非真的需要。因為 zip 必須 cache 住還沒處理的元素,當我們兩個 observable 一個很快一個很慢時,就會 cache 非常多的元素,等待比較慢的那個 observable。這很有可能造成記憶體相關的問題!

withLatestFrom

withLatestFrom 運作方式跟 combineLatest 有點像,只是他有主從的關係,只有在主要的 observable 送出新的值時,才會執行 callback,附隨的 observable 只是在背景下運作。讓我們看一個例子

var main = Rx.Observable.from('hello').zip(Rx.Observable.interval(500), (x, y) => x);
var some = Rx.Observable.from([0,1,0,0,0,1]).zip(Rx.Observable.interval(300), (x, y) => x);

var example = main.withLatestFrom(some, (x, y) => {
    return y === 1 ? x.toUpperCase() : x;
});

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

JSBin | JSFiddle

先看一下 Marble Diagram

main   : ----h----e----l----l----o|
some   : --0--1--0--0--0--1|

withLatestFrom(some, (x, y) =>  y === 1 ? x.toUpperCase() : x);

example: ----h----e----l----L----O|

withLatestFrom 會在 main 送出值的時候執行 callback,但請注意如果 main 送出值時 some 之前沒有送出過任何值 callback 仍然不會執行!

這裡我們在 main 送出值時,去判斷 some 最後一次送的值是不是 1 來決定是否要切換大小寫,執行步驟如下

  • main 送出了 h,此時 some 上一次送出的值為 0,把這兩個參數傳入 callback 得到 h
  • main 送出了 e,此時 some 上一次送出的值為 0,把這兩個參數傳入 callback 得到 e
  • main 送出了 l,此時 some 上一次送出的值為 0,把這兩個參數傳入 callback 得到 l
  • main 送出了 l,此時 some 上一次送出的值為 1,把這兩個參數傳入 callback 得到 L
  • main 送出了 o,此時 some 上一次送出的值為 1,把這兩個參數傳入 callback 得到 O

withLatestFrom 很常用在一些 checkbox 型的功能,例如說一個編輯器,我們開啟粗體後,打出來的字就都要變粗體,粗體就像是 some observable,而我們打字就是 main observable。

今日小結

今天介紹了三個合併用的 operators,這三個 operators 的 callback 都會依照合併的 observable 數量來傳入參數,如果我們合併了三個 observable,callback 就會有三個參數,而不管合併幾個 observable 都會只會回傳一個值。

這幾個 operators 需要花比較多的時間思考,讀者們不用硬記他的運作行為,只要稍微記得有這些 operators 可以用就可以了。等到真的要用時,再重新回來看他們的運作方式做選擇。

不知道讀者們今天有沒有收穫呢? 如果有任何問題,歡迎在下方留言給我,謝謝!


上一篇
30 天精通 RxJS (09): Observable Operator - skip, takeLast, last, concat, startWith, merge
下一篇
30 天精通 RxJS (11): 實務範例 - 完整拖拉應用
系列文
30 天精通 RxJS30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中
1
suda
iT邦新手 3 級 ‧ 2017-02-25 19:26:44

太棒了,真是感謝你的撰文.

JerryHong iT邦新手 5 級 ‧ 2017-02-27 01:42:45 檢舉

謝謝你喜歡^^

0
connie107
iT邦新手 5 級 ‧ 2017-07-24 20:37:48

有點複雜

0
indianazhao
iT邦新手 5 級 ‧ 2017-12-07 11:43:13

zip 搭配 from 和 interval 用來每隔一段時間發 request,真的很方便!
隨著在實作中導入 rxjs,慢慢感受到它的威力...
真的很謝謝您這個系列文,太棒了~

JerryHong iT邦新手 5 級 ‧ 2018-01-04 10:24:55 檢舉

謝謝^^

0
hadernerstern
iT邦新手 5 級 ‧ 2018-01-08 18:46:28

withLatestFrom 那一节有个小点需要注意的是,
如果main stream提前有值被emit了,此时sub-stream还没有值(不曾emit过),那个这个main值会被忽略掉,不会进入call back function。

let subStream = Rx.Observable.from([0, 1, 0, 0, 0, 1]).zip(Rx.Observable.interval(500), (x, y) => x);
let mainStream = Rx.Observable.from('hello').zip(Rx.Observable.interval(300), (x, y) => x);
mainStream.withLatestFrom(subStream, (x, y) => {
  console.log(x);
  return y === 1 ? x.toUpperCase() : x
}).subscribe(observer1);

Output: ee-ll-lL-oo

main stream 的 'h',会在300ms被emit, 此时sub stream还没有emit值,callback function中,h值不会被catch到。

JerryHong iT邦新手 5 級 ‧ 2018-01-09 01:34:46 檢舉

我在文章中有提到

withLatestFrom 會在 main 送出值的時候執行 callback,但請注意如果 main 送出值時 some 之前沒有送出過任何值 callback 仍然不會執行!

Ok. /images/emoticon/emoticon41.gif

1
mattchen0512
iT邦新手 5 級 ‧ 2019-01-18 18:18:11

經典好文,拆解流程講解非常詳細.

0
JerryHong
iT邦新手 5 級 ‧ 2019-05-09 15:16:15

本篇文章搬家囉! 這裡不再回覆留言,請移至 https://blog.jerry-hong.com/series/rxjs/thirty-days-RxJS-10/

我要留言

立即登入留言