iT邦幫忙

第 11 屆 iThome 鐵人賽

DAY 28
1
Modern Web

Angular新手村學習筆記(2019)系列 第 28

Day28_RxJS Custom Operators(3/3)(ng conf 2019)

  • 分享至 

  • xImage
  •  

今天寫 ng conf 2019 另一篇,感覺也是跟 custom operator 有關的

ng conf 2019
How To Build Your Own RxJS Operators | Ben Lesh & Tracy Lee
https://www.youtube.com/watch?v=E6R_1QB8q4o&list=PLOETEcp3DkCpimylVKTDe968yNmNIajlR&index=51

開頭先由Tracy Lee介紹一些新東西

rxjs@6.5.1 的新功能

fromFetch

import { fromFetch } from 'rxjs/fetch';
import { mergeMap } from 'rxjs/operators';

fromFetch('/url/data.json').pipe(
    mergeMap(res=>res.json())
)                     ^^^^^^ 內建的
.subscribe(data=>console.log(data));

forkJoin(object)

使用dictionaries of observable去得到objects stream
從observables,用name去取值

import { forkJoin,of} from 'rxjs';

forkJoin({
    foo: of('value'),
    bar: of(123),
})
.subscribe(x=>console.log(x));
// { foo: "value, bar: 123 }

scheduled()

import {scheduled, asapScheduler} from 'rxjs';

可以把array傳給scheduler
scheduled([1,2,3], asapScheduler)
    .subscribe(x=>console.log(x));

超棒的網站

https://rxjs.dev/

再來由 Ben Lesh 進入正題

How To Build Your Own RxJS Operators

RxJS常常會串太長,例如:
(初學者寫成這樣就蠻棒的了,可以參考)

effects$=this.action$.pipe(
    concatMap(action=>{
        case 'LOAD':
            return this.http.get('/users').pipe(
                catchError(err=>{
                    console.warn('User load error:'+err.message);
                    return EMPTY;
                }),
                concatMap(users=>{
                    return forkJoin(
                        users.map(user=>this.http.get('/users/${user.id}/profile').pipe(
                            catchError(err=>{
                                console.ware('Profile load error:'+err.message);
                                return EMPTY;
                            })
                        ))
                    ).pipe(
                        map(profiles=>users.map((user,i)=>{
                            {user,profile:profiles[i]}
                        }))
                    );
                });
        case 'DELETE':
            return this.http.get('/users/${action.payload}').pipe(
                catchError(err=>{
                    console.warn('User delete error:'+err.message);
                    return EMPTY;
                }),
...

對高手來說,這麼一長串是不好維護的,所以要怎麼改善呢?

  • What is an operator?
    It transforms one Observable into a different Observable
// 所以每個operator的回傳型別都長這樣
(source:Observable<A>)=>Observable<B>

piping(Pseudo-code)

Observalbe<A>.pipe(
    (Observable<A>)=>Observable<B>,
    (Observable<B>)=>Observable<C>,
    (Observable<C>)=>Observable<D>,
); // returns Observable<D>

map operator(Pseudo-code)

map =
    (fn:(A)=>(b)=>
    (Observable<A>)=>Observable<B>

最簡單的custom operator形式

用一個Higher-Order Function

自訂一個takeEveryNth()

import {filter} from 'rxjs/operators';

emits every nth value(從上游來的Observable資料流stream)
@param n the count to emit on

export function takeEveryNth(n:number){
           VVVVVV 不是重點
    return filter( (_,i) => i%n ===0 );
                   ^^^^^^^^^^^^^^^^^ Higher-Order Function
}

使用takeEveryNth()

import {of} from 'rxjs';
import {takeEveryNth} from './take-every-nth';

of(0,1,2,3,4,5,6,7,8,9).pipe(
    takeEveryNth(3)
)
.subscribe(x=>console.log(x))
// Emits: 0,3,6,9

相對比之下,後者有更高的可讀性,且可重複使用
filter((_,i)=>i%3===0) vs takeEveryNth(3)

除此之外,也可有利於測試

  • Test easily against "mock" observables
// 使用一個假的Observable
it('should take every nth value from an observable',()=>{
    const results=[];
    of(0,1,2,3,4,5,6,7)
        .pipe(takeEveryNth(3))
        .subscribe(x=>results.push(x));
    expect(results).toEqual([0,3,6]);
});

// 使用Subject
it('should take every nth value',()=>{
    const results=[];
    const source=new Subject<number>();
    source.pipe(takeEveryNth(3))
        .subscribe(x=>results.push(x));
    source.next(0);
    expect(results).toEqual([0]);
    source.next(1);
    source.next(2);
    source.next(3);
    expect(results).toEqual([0,3]);
});

一個常見的問題 - 散佈各地的error handling

data$=this.loadDataClicked$.pipe(
    switchMap(()=>
        this.http.get('https://api.github.com/users?per_page=5').pipe(
            // 散佈各地的error handling,可以抽出來
            catchError(err=>{
                console.log('Error:'+err.message);
                return EMPTY;
            })
            
            // 就可以變成logError();
        )
    )
)

extract the error handling

// Handles an error by logging to console.
export function logError(){
    return catchError(err=>{
        console.log('Error:'+err.message);
        return EMPTY;
    })
}

可以再往上抽一層,改寫http get request(有error handling)

// Takes any notification and triggers a new HTTP GET request
function toHttpGetLatest(http:HttpClient,url:string){
    return switchMap(()=>
        http.get(url).pipe(
            logError(),
        )
    )
}

改寫後,可讀性更高。對於測試,也可好抽離WebAPI,換成假資料

data$=this.loadDataCliecked$.pipe(
    toHttpGetLatest(this.http, 'https://api.github.com/users?per_page=5')
)

甚至,針對某Web API寫一個function

                                                                    V 預設值
export function getGithubUsers(http:HttpClient,{pageSize}={pageSize:5}){
    return toHttpGetLatest(http,'https://api.github.com/users?per_page=${pageSize}`);
                                                                       ^^^^^^^^^^^
}

But what happens when I can't use existing operators to build what I need?

  • Build An Operator From Scratch

Basic DYI map operator

最簡單的,就是一個function形式

// T 進來、R 回傳       VVVVVV 這個function傳入參數為T
function map<T,R>(fn:(value:T)=>R){
                  ^^^^^^^^^^^^^^^ function
    // 接收一個Observable,回傳一個新的Observable
    return (source:Observable<T>)=>new Observable<R>(
    
    // inside of that initalization function
    // pass source:Observable<T>的constructor
    subscriber=>{
        // 要去subscribe source observable
        return source.subscribe({
            // 新的observer,去實作map()的next,error,complete
            next:v=>subscriber.next(fn(v)),
                                    ^^ user丟進來的function要放哪?
            error:e=>subscriber.error(e),
            complete:()=>subscriber.complete(),
        });
    });
    
    );
}

上面fn,是使用者丟進來的function(user-provided function)
可以加一個try...catch

next:v=>{
    let result:R;
    try{
        result=fn(v); // 
    }catch(err){
        subscriber.error(err);
        return;
    }
    subscriber.next(result); // 
},

custom operator不要用Observable.prototype.lift

這邊Ben Lesh說明蠻多的,想了解原因的可以看影片

Observable.prototype.lift的介紹

  • A unified call all operators in RxJS library make during setup.
  • Responsible for creating the resulting Observable instance
  • Allows RxJS to identify the beginning and end of operator chains
  • Allows custom Observable types to flow through operations

看Observable裡,lift()的原始碼

class Observable<T>{
    lift(operator:Operator<T,R>){
        const result=new Observable<R>();
        result.operator=operator;
        result.source=this; // this就是呼叫我的那個物件
        return result;
    }
}

有關 lift 網路文章

建議要看這篇
別太在意包裝上的照片與實物之間的差異:RxJS
https://jonny-huang.github.io/angular/training/30_rxjs_5/

打包機:lift
回去查閱 Observable 內的 lift 方法,可以看到 lift 會建立一個新的 Observable 來包裝目前的 Observable 與 Operator 物件。

30 天精通 RxJS (07)
https://ithelp.ithome.com.tw/articles/10187248

在 RxJS 5 的實作中,其實每個 operator 是透過原來 observable 的 lift 方法來建立新的 observable,這個方法會在新回傳的 observable 物件內偷塞兩個屬性,分別是 source 與 operator,記錄原本的資料源跟當前使用的 operator。

其實 lift 方法還是用 new Observable(跟 create 一樣)。至於為什麼要獨立出這個方法,除了更好的封裝以外,主要的原因是為了讓 RxJS 5 的使用者能更好的 debug。關於 RxJS 5 的除錯方式,我們會專門寫一篇來講解!

OperatorApplicator(塗藥器?)

我找不到OperatorApplicator是什麼東東,知道的大大歡迎留言告知

interface OperatorApplicator<T,R>{
    VVVV method
    call(
        subscriber: Subscriber<R>, source: Observable<T>
    ):TeardownLogic;
}
// 可以寫成function
operatorApplicator.call(subscriber,source)
function operatorApplicator(this:Subscriber,source:Observable){}
class MapOperatorAppl<T,R> implements OperatorApplicator<T,R>{
    constructor(private fn:(value:T)=>R){}
    
    call(subscriber:Subscriber<R>,source:Observable<T>){
                      VVVVVVVVV呼叫 Observable 的訂閱(subscribe)
        return source.subscribe(
                      ^^^^^^^^^ subscribe 只接收 Observer
        // 實作 Observer 介面
        {             
            next:v=>subscriber.next(this.fn(v)),
            error:e=>subscriber.error(e),
            complete:()=>subscriber.complete(),
        }
        );
    }
}

// RxJS 程式內的 Operator 物件與我們平常講的 Operator Function 是不一樣的東西

// 寫成function
export function map<T,R>(fn:(value:T)=>R){
    return (source:Observable<T>)=>source.lift(mapOperator(fn));
                                          ^^^^建立新的Observable
}

function mapOperator(fn:(value:T)=>R){
    return function mapLifted(this:Subscriber<R>,source:Observable<T>){
        // 都長得很像
        return source.subscribe({
            // next:v=>this.next(fn(v)),
            // 加上error handling
            next:v=>{
                let result:R;
                try{result=fn(v);}catch(e){this.error(e);return;}
                this.next(r);
            }
            error:e=>this.error(e),
            complete:()=>this.complete(),
        })
    }
}

marble tests

@benlesh 介紹marble tests,並示範一些operator怎麼寫單元測試
但ben lesh建議不要過度時用 marble tests

Two types of operators to test

  • Operators for a specific function in your App
  1. Test within reason
  2. Test edge cases for your app
  3. Integration tests should cover most things in your app
  4. (Operator tests are unit tests)
  • Operators for general consumption by any RxJS user
  1. Sync observable
  2. Async observable
  3. Empty observable
  4. Never observable
  5. Sync error
  6. Async error
  7. Values then errors
  8. Errors in callbacks
  9. Test the values passed to callbacks
  10. Test retrying the result
  11. Test repeating the result
  12. Tested "hot" observable
  13. Tested "cold" observable
  14. Test every combination if more than one Observable is required

Kevin大大的文章都是精華呀!!
https://blog.kevinyang.net/2018/05/18/angualr-testing-delay/

RxJS 6 版以後,提供了一個 TestScheduler 可以讓我們來做 Observable 的測試
(測試計畫、測試排程器?)

import { TestScheduler } from 'rxjs/testing';

let rxTest:TestScheduler;

beforeEach(()=>{
    rxTest=new TestScheduler(assertDeepEquals);
});

function assertDeepEquals(a:any,b:any){
    expect(a).toEqual(b);
}

it('should map values',()=>{
    rxTest.run(({cold, expectObservable})=>{
        // Schedule a bunch of things on the test scheduler
        // 在test scheduler做一些安排
    }); // <-- Flush the test scheduler
});

it('should map values',()=>{
    rxTest.run(({cold,expectObservable})=>{
Each letter (or number) is an emission 
of that character as a string*(e.g. "a")
                           V          V complete notification
        const src=cold('---a---a---a--|');
                  ^^^^ subscribed才會送資料流
This test observable is "cold",
meaning it's not active until subscribed to.
(there's alse 'hot',but that's not important right now)
        const result='---b---b---b--|';
              ^^^^^^ 只是期望的結果,不是observable
              
        expectObservable(
        // Sets this observable up to be run in the TestScheduler at time `0`
            src.pipe(map(a=>'b'))
        ).toBe(result);
    });
});

// 如果遇到時間相關的,例如delay
it('should test delay,()=>{
    rxTest.run(({cold,time,expectObservable})=>{
        const src=cold('--a------b------c-----|');
        const t=time('----|'); // 假設一開始都會停4隔時間
        const result='-------a------b------c-----|';
        expectObservable(src.pipe(delay(t,rxTest)).toBe(result);
                                  ^^^^^^^^^^^^^^^
        
    });
});

it('should test takeUntil',()=>{
    rxTest.run(({cold,time,expectObservable})=>{
        // 要複製到筆記本上,才能看出圖
        const src=cold('---a------b------c-----|');
        const notifier=cold('------------x----|');
        const result='---a------b-|'; // 先把結果推算出來
        
        expectObservable(
            src.pipe(takeUntil(notifier))
                     ^^^^^^^^^^^^^^^^^^^
        ).toBe(result);
    });
});

it('should map values',()=>{
    rxTest.run(({cold,time,expectObservable})=>{
              VVVVVV You can pass in a values lookup 
        const values={a:200,b:40000};    VVVVVV 可以丟值進去
        const src=cold('---a---a---a--|',values);
        const result='  ---b---b---b--|'; // 先把結果推算出來
        
        expectObservable(
            src.pipe(map(a=>a*a))
                      VVVVVV
        ).toBe(result,values);
    });
});

When to use marble tests

  • Testing operators
  • Testing event coordination 事件間的
  • Testing time-related functionality 時間相關的

上一篇
Day27_RxJS Custom Operators(2/3)(ng conf 2018)
下一篇
Day29_Data Composition with RxJS(ng conf 2019)
系列文
Angular新手村學習筆記(2019)33
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言