本篇文章搬家囉! 這裡不再回覆留言,請移至 https://blog.jerry-hong.com/series/rxjs/thirty-days-RxJS-20/
前幾天我們講完了能把 Higher Order Observable 轉成一般的 Observable 的 operators,今天我們要講能夠把一般的 Observable 轉成 Higher Order Observable 的 operators。
其實前端不太有機會用到這類型的 Operators,都是在比較特殊的需求下才會看到,但還是會有遇到的時候。
window 是一整個家族,總共有五個相關的 operators
這裡我們只介紹 window 跟 windowToggle 這兩個方法,其他三個的用法相對都簡單很多,大家如果有需要可以再自行到官網查看。
window 很類似 buffer 可以把一段時間內送出的元素拆出來,只是 buffer 是把元素拆分到陣列中變成
Observable<T> => Observable<Array<T>>
而 window 則是會把元素拆分出來放到新的 observable 變成
Observable<T> => Observable<Observable<T>>
buffer 是把拆分出來的元素放到陣列並送出陣列;window 是把拆分出來的元素放到 observable 並送出 observable,讓我們來看一個例子
var click = Rx.Observable.fromEvent(document, 'click');
var source = Rx.Observable.interval(1000);
var example = source.window(click);
example
.switch()
.subscribe(console.log);
// 0
// 1
// 2
// 3
// 4
// 5 ...
首先 window 要傳入一個 observable,每當這個 observable 送出元素時,就會把正在處理的 observable 所送出的元素放到新的 observable 中並送出,這裡看 Marble Diagram 會比較好解釋
click : -----------c----------c------------c--
source : ----0----1----2----3----4----5----6---..
window(click)
example: o----------o----------o------------o--
\ \ \
---0----1-|--2----3--|-4----5----6|
switch()
: ----0----1----2----3----4----5----6---...
這裡可以看到 example 變成發送 observable 會在每次 click 事件發送出來後結束,並繼續下一個 observable,這裡我們用 switch 才把它攤平。
當然這個範例只是想單存的表達 window 的作用,沒什麼太大的意義,實務上 window 會搭配其他的 operators 使用,例如我們想計算一秒鐘內觸發了幾次 click 事件
var click = Rx.Observable.fromEvent(document, 'click');
var source = Rx.Observable.interval(1000);
var example = click.window(source)
example
.map(innerObservable => innerObservable.count())
.switch()
.subscribe(console.log);
注意這裡我們把 source 跟 click 對調了,並用到了 observable 的一個方法 count()
,可以用來取得 observable 總共送出了幾個元素,用 Marble Diagram 表示如下
source : ---------0---------1---------2--...
click : --cc---cc----c-c----------------...
window(source)
example: o--------o---------o---------o--..
\ \ \ \
-cc---cc|---c-c---|---------|--..
count()
: o--------o---------o---------o--
\ \ \ \
-------4|--------2|--------0|--..
switch()
: ---------4---------2---------0--...
從 Marble Diagram 中可以看出來,我們把部分元素放到新的 observable 中,就可以利用 Observable 的方法做更靈活的操作
windowToggle 不像 window 只能控制內部 observable 的結束,windowToggle 可以傳入兩個參數,第一個是開始的 observable,第二個是一個 callback 可以回傳一個結束的 observable,讓我們來看範例
var source = Rx.Observable.interval(1000);
var mouseDown = Rx.Observable.fromEvent(document, 'mousedown');
var mouseUp = Rx.Observable.fromEvent(document, 'mouseup');
var example = source
.windowToggle(mouseDown, () => mouseUp)
.switch();
example.subscribe(console.log);
一樣用 Marble Diagram 會比較好解釋
source : ----0----1----2----3----4----5--...
mouseDown: -------D------------------------...
mouseUp : ---------------------------U----...
windowToggle(mouseDown, () => mouseUp)
: -------o-------------------------...
\
-1----2----3----4--|
switch()
example : ---------1----2----3----4---------...
從 Marble Diagram 可以看得出來,我們用 windowToggle 拆分出來內部的 observable 始於 mouseDown 終於 mouseUp。
最後我們來講一個實務上比較常用的 operators - groupBy,它可以幫我們把相同條件的元素拆分成一個 Observable,其實就跟平常在下 SQL 是一樣個概念,我們先來看個簡單的例子
var source = Rx.Observable.interval(300).take(5);
var example = source
.groupBy(x => x % 2);
example.subscribe(console.log);
// GroupObservable { key: 0, ...}
// GroupObservable { key: 1, ...}
上面的例子,我們傳入了一個 callback function 並回傳 groupBy 的條件,就能區分每個元素到不同的 Observable 中,用 Marble Diagram 表示如下
source : ---0---1---2---3---4|
groupBy(x => x % 2)
example: ---o---o------------|
\ \
\ 1-------3----|
0-------2-------4|
在實務上,我們可以拿 groupBy 做完元素的區分後,再對 inner Observable 操作,例如下面這個例子我們將每個人的分數作加總再送出
var people = [
{name: 'Anna', score: 100, subject: 'English'},
{name: 'Anna', score: 90, subject: 'Math'},
{name: 'Anna', score: 96, subject: 'Chinese' },
{name: 'Jerry', score: 80, subject: 'English'},
{name: 'Jerry', score: 100, subject: 'Math'},
{name: 'Jerry', score: 90, subject: 'Chinese' },
];
var source = Rx.Observable.from(people)
.zip(
Rx.Observable.interval(300),
(x, y) => x);
var example = source
.groupBy(person => person.name)
.map(group => group.reduce((acc, curr) => ({
name: curr.name,
score: curr.score + acc.score
})))
.mergeAll();
example.subscribe(console.log);
// { name: "Anna", score: 286 }
// { name: 'Jerry', score: 270 }
這裡我們範例是想把 Jerry 跟 Anna 的分數個別作加總,畫成 Marble Diagram 如下
source : --o--o--o--o--o--o|
groupBy(person => person.name)
: --i--------i------|
\ \
\ o--o--o|
o--o--o--|
map(group => group.reduce(...))
: --i---------i------|
\ \
o| o|
mergeAll()
example: --o---------o------|
今天講了兩個可以把元素拆分到新的 observable 的 operators,這兩個 operators 在前端比較少用到,但在後端或是比較複雜了前端應用才比較有機會用到。不知道讀者有沒有收穫呢? 如果有任何問題歡迎留言給我,謝謝。
buffer 是把拆分出來的元素放到陣列並送出陣列;windwo 是把拆分出來的元素放到 observable 並送出 observable,讓我們來看一個例子
提醒有錯字 => window
感謝! 已修正
请教一下,
operator - groupBy 从marble 图来看,是将source拆分从多个流
那么下面的代码:
const stream1 = Rx.Observable.interval(500).take(5);
stream1.groupBy(i => i%2).concatAll().subscribe(console.log);
为什么输出只有 0,2,4
而不是0,2,4,1,3 ? (根据concatAll()的特点,先输出第一个流0,2,4,再输出1,3)
谢谢
因為 groupBy 之後分出去的 observable 會在原本 observable complete 後也 complete
以這個例子來說就是 0, 2, 4 送出後,這時 stream1 已經結束了
第一個 inner observable(key=0) 結束,接著對下一個 inner observable(key=1) 做訂閱,但其實這個時候 key=1 的 inner observable 也已經結束(complete),所以最後不會再送出任何值
嗯嗯,谢谢题主的回答,确实按输出来说,是这样理解的。但我查看了一下concatAll()的文档,根据文档的描述和marble图来说,concatAll()是会将所有接受到的observable按顺序排列,然后依次从头emit
下图截取至 文档
source一共发出了ab,cd,ef三个流,在ab流结束前,cd流也是同样结束了,但因为有concatAll()存在,它会把三个流排序后依次再射出,因此提前结束的流也会被射出。
这样的话,对于groupBy,
是将source分为1,3,5/ 2,4 两个流,在(1,3,5)的Observable之前,(2,4)的Observable也结束了,如果后面接了.concatAll()的话,照理解应该是也会将 (2,4)排在(1,3,5)后射出才对。
对这一点我觉得比较困惑
谢谢
對 你說的沒錯 但你的例子是從 0 開始
所以會是 0,2,4 再 1,3
借此串問一下,關於 concatAll 因為
"第一個 inner observable(key=0) 結束,接著對下一個 inner observable(key=1) 做訂閱,但其實這個時候 key=1 的 inner observable 也已經結束(complete),所以最後不會再送出任何值"
那麼舉以下的例子來看
var a = 1;
var click = Rx.Observable.fromEvent(window, 'click');
var source = click.map(e => {
console.log('執行map; a= ', a);
obsb = a%2 ?
Rx.Observable.interval(2000).take(3) :
Rx.Observable.interval(300).take(3) ;
a = a + 1;
return obsb ;
});
var example = source.concatAll();
example.subscribe({
next: (value) => { console.log(value); },
error: (err) => { console.log('Error: ' + err); },
complete: () => { console.log('complete'); }
});
當我在很快速點擊 window 兩次情況來看的話
第一次點擊會被轉換成 Rx.Observable.interval(2000).take(3)
第二次點擊會被轉換成 Rx.Observable.interval(300).take(3)
畫出 marble 如下
click: ----------c-c----------------
map
source: ----------o-o----------------
\ \-0-1-2|
\----0----1----2|
第二次點擊被轉換成的 Rx.Observable.interval(300).take(3) 很明顯會先 complete
但為何輸出還會是 0 1 2 0 1 2 ??
(( 預期輸出為 0 1 2
剛進入 RxJS 的世界,還有很多地方尚未上手,麻煩多指教
he.chun
我說的這段
第一個 inner observable(key=0) 結束,接著對下一個 inner observable(key=1) 做訂閱,但其實這個時候 key=1 的 inner observable 也已經結束(complete),所以最後不會再送出任何值
不是指 concatAll
的行為 是在說 groupBy 分出去的 observable
因為 groupBy 之後分出去的 observable 會在原本 observable complete 後也 complete
本篇文章搬家囉! 這裡不再回覆留言,請移至 https://blog.jerry-hong.com/series/rxjs/thirty-days-RxJS-20/