iT邦幫忙

2017 iT 邦幫忙鐵人賽
DAY 26
7
Modern Web

30 天精通 RxJS系列 第 26

30 天精通 RxJS(25):Subject 總結

本篇文章搬家囉! 這裡不再回覆留言,請移至 https://blog.jerry-hong.com/series/rxjs/thirty-days-RxJS-25/


30 天精通 RxJS(25):Subject 總結

Subject 其實在 RxJS 中最常被誤解的一部份,因為 Subject 可以讓你用命令式的方式雖送值到一個 observable 的串流中。很多人會直接把這個特性拿來用在 不知道如何建立 Observable 的狀況,比如我們在 30 天精通 RxJS(23) 中提到的可以用在 ReactJS 的 Event 中,來建立 event 的 observable

class MyButton extends React.Component {
    constructor(props) {
        super(props);
        this.state = { count: 0 };
        this.subject = new Rx.Subject();
        
        this.subject
            .mapTo(1)
            .scan((origin, next) => origin + next)
            .subscribe(x => {
                this.setState({ count: x })
            })
    }
    render() {
        return <button onClick={event => this.subject.next(event)}>{this.state.count}</button>
    }
}

因為在 React API 的關係,如果我們想要把 React Event 轉乘 observable 就可以用 Subject 幫我們做到這件事;但絕大多數的情況我們是可以透過 Observable.create 來做到這件事,像下面這樣

const example = Rx.Observable.creator(observer => {
    const source = getSomeSource(); // 某個資料源
    source.addListener('some', (some) => {
        observer.next(some)
    })
});

大概就會像上面這樣,如果沒有合適的 creation operators 我們還是可以利用 Observable.create 來建立 observable,除非真的因為框架限制才會直接用 Subject。

Subject 與 Observable 的差異

永遠記得 Subject 其實是 Observer Design Pattern 的實作,所以當 observer 訂閱到 subject 時,subject 會把訂閱者塞到一份訂閱者清單,在元素發送時就是在遍歷這份清單,並把元素一一送出,這跟 Observable 像是一個 function 執行是完全不同的(請參考 05 篇)。

Subject 之所以具有 Observable 的所有方法,是因為 Subject 繼承了 Observable 的型別,其實 Subject 型別中主要實做的方法只有 next、error、 complete、subscribe 及 unsubscribe 這五個方法,而這五個方法就是依照 Observer Pattern 下去實作的。

總而言之,Subject 是 Observable 的子類別,這個子類別當中用上述的五個方法實作了 Observer Pattern,所以他同時具有 Observable 與 Observer 的特性,而跟 Observable 最大的差異就是 Subject 是具有狀態的,也就是儲存的那份清單!

當前版本會遇到的問題

因為 Subject 在訂閱時,是把 observer 放到一份清單當中,並在元素要送出(next)的時候遍歷這份清單,大概就像下面這樣

//...
next() {
    // observers 是一個陣列存有所有的 observer 
    for (let i = 0; i < observers.length; i++) {
        observers[i].next(value);
    }
}
//...

這會衍伸一個大問題,就是在某個 observer 發生錯誤卻沒有做錯誤處理時,就會影響到別的訂閱,看下面這個例子

const source = Rx.Observable.interval(1000);
const subject = new Rx.Subject();

const example = subject.map(x => {
    if (x === 1) {
        throw new Error('oops');
    }
    return x;
});
subject.subscribe(x => console.log('A', x));
example.subscribe(x => console.log('B', x));
subject.subscribe(x => console.log('C', x));

source.subscribe(subject);

JSBin

上面這個例子,大家可能會預期 B 會在送出 1 的時候掛掉,另外 A 跟 C 則會持續發送元素,確實正常應該像這樣運席;但目前 RxJS 的版本中會在 B 報錯之後,A 跟 C 也同時停止運行。原因就像我前面所提的,在遍歷所有 observer 時發生了例外會導致之後的行為停止。

這個應該會在之後的版本中改掉的,前陣子才在 TC39 Observable proposal 中討論完。

那要如何解決這個問題呢? 目前最簡單的方式當然是盡可能地把所有 observer 的錯誤處理加進去,這樣一來就不會有例外發生

const source = Rx.Observable.interval(1000);
const subject = new Rx.Subject();

const example = subject.map(x => {
    if (x === 1) {
        throw new Error('oops');
    }
    return x;
});
subject.subscribe(
    x => console.log('A', x),
    error => console.log('A Error:' + error));
example.subscribe(x => console.log('B', x),
    error => console.log('B Error:' + error));
subject.subscribe(x => console.log('C', x),
    error => console.log('C Error:' + error));

source.subscribe(subject);

JSBin

像上面這段程式碼,當 B 發生錯誤時就只有 B 會停止,而不會影響到 A 跟 C。

當然還有另一種解法是用 Scheduler,但因為我們這系列的文章還沒有講到 Scheduler 所以這個解法大家看看就好

const source = Rx.Observable.interval(1000);
const subject = new Rx.Subject().observeOn(Rx.Scheduler.asap);

const example = subject.map(x => {
    if (x === 1) {
        throw new Error('oops');
    }
    return x;
});
subject.subscribe(x => console.log('A', x));
example.subscribe(x => console.log('B', x));
subject.subscribe(x => console.log('C', x));

source.subscribe(subject);

一定需要使用 Subject 的時機?

Subject 必要的使用時機除了本篇文章一開始所提的之外,正常應該是當我們一個 observable 的操作過程中發生了 side-effect 而我們不希望這個 side-effect 因為多個 subscribe 而被觸發多次,比如說下面這段程式碼

var result = Rx.Observable.interval(1000).take(6)
             .map(x => Math.random()); // side-effect,平常有可能是呼叫 API 或其他 side effect

var subA = result.subscribe(x => console.log('A: ' + x));
var subB = result.subscribe(x => console.log('B: ' + x));

JSBin

這段程式碼 A 跟 B 印出來的亂數就不一樣,代表 random(side-effect) 被執行了兩次,這種情況就一定會用到 subject(或其相關的 operators)

var result = Rx.Observable.interval(1000).take(6)
             .map(x => Math.random()) // side-effect
             .multicast(new Rx.Subject())
             .refCount();

var subA = result.subscribe(x => console.log('A: ' + x));
var subB = result.subscribe(x => console.log('B: ' + x));

JSBin

改成這樣後我們就可以讓 side-effect 不會因為訂閱數而多執行,這種情狀就是一定要用 subject 的。

今日小結

今天總結了 Subject 的使用情境,以及釐清跟 Observable 的關係,並且指出在使用時要避免犯發生的錯誤。

這幾點都非常的重要,不知道今天讀者有沒有收穫呢? 如果有任何問題,歡迎在下方留言給我,謝謝!


上一篇
30 天精通 RxJS(24): Observable operators - multicast, refCount, publish, share
下一篇
30 天精通 RxJS(26):簡易實作 Observable(一)
系列文
30 天精通 RxJS30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中
1
l7960261
iT邦新手 5 級 ‧ 2017-01-15 10:18:57

我可以理解為 Subject 是為了在多個訂閱中共用執行結果而存在的嗎 ?

JerryHong iT邦新手 5 級 ‧ 2017-01-15 17:11:18 檢舉

Yes, you got it!

請問 Rx 有辦法做到監聽 animation end 再自行手動 callback 嗎?

多個並行的 animation , 監控最後一個完成的再 callback 執行下一個 Observable

JerryHong iT邦新手 5 級 ‧ 2019-05-09 15:29:45 檢舉

可以喔
把每個 animation 包成 observable 就可以做到

0
JerryHong
iT邦新手 5 級 ‧ 2019-05-09 15:29:52

本篇文章搬家囉! 這裡不再回覆留言,請移至 https://blog.jerry-hong.com/series/rxjs/thirty-days-RxJS-25/

0
JerryHong
iT邦新手 5 級 ‧ 2019-05-09 15:29:56

本篇文章搬家囉! 這裡不再回覆留言,請移至 https://blog.jerry-hong.com/series/rxjs/thirty-days-RxJS-25/

我要留言

立即登入留言