今天寫 ng conf 2019 另一篇,感覺也是跟 custom operator 有關的
ng conf 2019
How To Build Your Own RxJS Operators | Ben Lesh & Tracy Lee
https://www.youtube.com/watch?v=E6R_1QB8q4o&list=PLOETEcp3DkCpimylVKTDe968yNmNIajlR&index=51
開頭先由Tracy Lee介紹一些新東西
import { fromFetch } from 'rxjs/fetch';
import { mergeMap } from 'rxjs/operators';
fromFetch('/url/data.json').pipe(
mergeMap(res=>res.json())
) ^^^^^^ 內建的
.subscribe(data=>console.log(data));
使用dictionaries of observable去得到objects stream
從observables,用name去取值
import { forkJoin,of} from 'rxjs';
forkJoin({
foo: of('value'),
bar: of(123),
})
.subscribe(x=>console.log(x));
// { foo: "value, bar: 123 }
import {scheduled, asapScheduler} from 'rxjs';
可以把array傳給scheduler
scheduled([1,2,3], asapScheduler)
.subscribe(x=>console.log(x));
超棒的網站
再來由 Ben Lesh 進入正題
RxJS常常會串太長,例如:
(初學者寫成這樣就蠻棒的了,可以參考)
effects$=this.action$.pipe(
concatMap(action=>{
case 'LOAD':
return this.http.get('/users').pipe(
catchError(err=>{
console.warn('User load error:'+err.message);
return EMPTY;
}),
concatMap(users=>{
return forkJoin(
users.map(user=>this.http.get('/users/${user.id}/profile').pipe(
catchError(err=>{
console.ware('Profile load error:'+err.message);
return EMPTY;
})
))
).pipe(
map(profiles=>users.map((user,i)=>{
{user,profile:profiles[i]}
}))
);
});
case 'DELETE':
return this.http.get('/users/${action.payload}').pipe(
catchError(err=>{
console.warn('User delete error:'+err.message);
return EMPTY;
}),
...
對高手來說,這麼一長串是不好維護的,所以要怎麼改善呢?
// 所以每個operator的回傳型別都長這樣
(source:Observable<A>)=>Observable<B>
Observalbe<A>.pipe(
(Observable<A>)=>Observable<B>,
(Observable<B>)=>Observable<C>,
(Observable<C>)=>Observable<D>,
); // returns Observable<D>
map =
(fn:(A)=>(b)=>
(Observable<A>)=>Observable<B>
用一個Higher-Order Function
自訂一個takeEveryNth()
import {filter} from 'rxjs/operators';
emits every nth value(從上游來的Observable資料流stream)
@param n the count to emit on
export function takeEveryNth(n:number){
VVVVVV 不是重點
return filter( (_,i) => i%n ===0 );
^^^^^^^^^^^^^^^^^ Higher-Order Function
}
使用takeEveryNth()
import {of} from 'rxjs';
import {takeEveryNth} from './take-every-nth';
of(0,1,2,3,4,5,6,7,8,9).pipe(
takeEveryNth(3)
)
.subscribe(x=>console.log(x))
// Emits: 0,3,6,9
相對比之下,後者有更高的可讀性,且可重複使用
filter((_,i)=>i%3===0) vs takeEveryNth(3)
除此之外,也可有利於測試
// 使用一個假的Observable
it('should take every nth value from an observable',()=>{
const results=[];
of(0,1,2,3,4,5,6,7)
.pipe(takeEveryNth(3))
.subscribe(x=>results.push(x));
expect(results).toEqual([0,3,6]);
});
// 使用Subject
it('should take every nth value',()=>{
const results=[];
const source=new Subject<number>();
source.pipe(takeEveryNth(3))
.subscribe(x=>results.push(x));
source.next(0);
expect(results).toEqual([0]);
source.next(1);
source.next(2);
source.next(3);
expect(results).toEqual([0,3]);
});
data$=this.loadDataClicked$.pipe(
switchMap(()=>
this.http.get('https://api.github.com/users?per_page=5').pipe(
// 散佈各地的error handling,可以抽出來
catchError(err=>{
console.log('Error:'+err.message);
return EMPTY;
})
// 就可以變成logError();
)
)
)
extract the error handling
// Handles an error by logging to console.
export function logError(){
return catchError(err=>{
console.log('Error:'+err.message);
return EMPTY;
})
}
可以再往上抽一層,改寫http get request(有error handling)
// Takes any notification and triggers a new HTTP GET request
function toHttpGetLatest(http:HttpClient,url:string){
return switchMap(()=>
http.get(url).pipe(
logError(),
)
)
}
改寫後,可讀性更高。對於測試,也可好抽離WebAPI,換成假資料
data$=this.loadDataCliecked$.pipe(
toHttpGetLatest(this.http, 'https://api.github.com/users?per_page=5')
)
甚至,針對某Web API寫一個function
V 預設值
export function getGithubUsers(http:HttpClient,{pageSize}={pageSize:5}){
return toHttpGetLatest(http,'https://api.github.com/users?per_page=${pageSize}`);
^^^^^^^^^^^
}
最簡單的,就是一個function形式
// T 進來、R 回傳 VVVVVV 這個function傳入參數為T
function map<T,R>(fn:(value:T)=>R){
^^^^^^^^^^^^^^^ function
// 接收一個Observable,回傳一個新的Observable
return (source:Observable<T>)=>new Observable<R>(
// inside of that initalization function
// pass source:Observable<T>的constructor
subscriber=>{
// 要去subscribe source observable
return source.subscribe({
// 新的observer,去實作map()的next,error,complete
next:v=>subscriber.next(fn(v)),
^^ user丟進來的function要放哪?
error:e=>subscriber.error(e),
complete:()=>subscriber.complete(),
});
});
);
}
上面fn,是使用者丟進來的function(user-provided function)
可以加一個try...catch
next:v=>{
let result:R;
try{
result=fn(v); //
}catch(err){
subscriber.error(err);
return;
}
subscriber.next(result); //
},
這邊Ben Lesh說明蠻多的,想了解原因的可以看影片
Observable.prototype.lift的介紹
看Observable裡,lift()的原始碼
class Observable<T>{
lift(operator:Operator<T,R>){
const result=new Observable<R>();
result.operator=operator;
result.source=this; // this就是呼叫我的那個物件
return result;
}
}
建議要看這篇別太在意包裝上的照片與實物之間的差異:RxJS
https://jonny-huang.github.io/angular/training/30_rxjs_5/
打包機:lift
回去查閱 Observable 內的 lift 方法,可以看到 lift 會建立一個新的 Observable 來包裝目前的 Observable 與 Operator 物件。
30 天精通 RxJS (07)
https://ithelp.ithome.com.tw/articles/10187248
在 RxJS 5 的實作中,其實每個 operator 是透過原來 observable 的 lift 方法來建立新的 observable,這個方法會在新回傳的 observable 物件內偷塞兩個屬性,分別是 source 與 operator,記錄原本的資料源跟當前使用的 operator。
其實 lift 方法還是用 new Observable(跟 create 一樣)。至於為什麼要獨立出這個方法,除了更好的封裝以外,主要的原因是為了讓 RxJS 5 的使用者能更好的 debug。關於 RxJS 5 的除錯方式,我們會專門寫一篇來講解!
我找不到OperatorApplicator是什麼東東,知道的大大歡迎留言告知
interface OperatorApplicator<T,R>{
VVVV method
call(
subscriber: Subscriber<R>, source: Observable<T>
):TeardownLogic;
}
// 可以寫成function
operatorApplicator.call(subscriber,source)
function operatorApplicator(this:Subscriber,source:Observable){}
class MapOperatorAppl<T,R> implements OperatorApplicator<T,R>{
constructor(private fn:(value:T)=>R){}
call(subscriber:Subscriber<R>,source:Observable<T>){
VVVVVVVVV呼叫 Observable 的訂閱(subscribe)
return source.subscribe(
^^^^^^^^^ subscribe 只接收 Observer
// 實作 Observer 介面
{
next:v=>subscriber.next(this.fn(v)),
error:e=>subscriber.error(e),
complete:()=>subscriber.complete(),
}
);
}
}
// RxJS 程式內的 Operator 物件與我們平常講的 Operator Function 是不一樣的東西
// 寫成function
export function map<T,R>(fn:(value:T)=>R){
return (source:Observable<T>)=>source.lift(mapOperator(fn));
^^^^建立新的Observable
}
function mapOperator(fn:(value:T)=>R){
return function mapLifted(this:Subscriber<R>,source:Observable<T>){
// 都長得很像
return source.subscribe({
// next:v=>this.next(fn(v)),
// 加上error handling
next:v=>{
let result:R;
try{result=fn(v);}catch(e){this.error(e);return;}
this.next(r);
}
error:e=>this.error(e),
complete:()=>this.complete(),
})
}
}
@benlesh 介紹marble tests,並示範一些operator怎麼寫單元測試
但ben lesh建議不要過度時用 marble tests
Two types of operators to test
Kevin大大的文章都是精華呀!!
https://blog.kevinyang.net/2018/05/18/angualr-testing-delay/
RxJS 6 版以後,提供了一個 TestScheduler 可以讓我們來做 Observable 的測試
(測試計畫、測試排程器?)
import { TestScheduler } from 'rxjs/testing';
let rxTest:TestScheduler;
beforeEach(()=>{
rxTest=new TestScheduler(assertDeepEquals);
});
function assertDeepEquals(a:any,b:any){
expect(a).toEqual(b);
}
it('should map values',()=>{
rxTest.run(({cold, expectObservable})=>{
// Schedule a bunch of things on the test scheduler
// 在test scheduler做一些安排
}); // <-- Flush the test scheduler
});
it('should map values',()=>{
rxTest.run(({cold,expectObservable})=>{
Each letter (or number) is an emission
of that character as a string*(e.g. "a")
V V complete notification
const src=cold('---a---a---a--|');
^^^^ subscribed才會送資料流
This test observable is "cold",
meaning it's not active until subscribed to.
(there's alse 'hot',but that's not important right now)
const result='---b---b---b--|';
^^^^^^ 只是期望的結果,不是observable
expectObservable(
// Sets this observable up to be run in the TestScheduler at time `0`
src.pipe(map(a=>'b'))
).toBe(result);
});
});
// 如果遇到時間相關的,例如delay
it('should test delay,()=>{
rxTest.run(({cold,time,expectObservable})=>{
const src=cold('--a------b------c-----|');
const t=time('----|'); // 假設一開始都會停4隔時間
const result='-------a------b------c-----|';
expectObservable(src.pipe(delay(t,rxTest)).toBe(result);
^^^^^^^^^^^^^^^
});
});
it('should test takeUntil',()=>{
rxTest.run(({cold,time,expectObservable})=>{
// 要複製到筆記本上,才能看出圖
const src=cold('---a------b------c-----|');
const notifier=cold('------------x----|');
const result='---a------b-|'; // 先把結果推算出來
expectObservable(
src.pipe(takeUntil(notifier))
^^^^^^^^^^^^^^^^^^^
).toBe(result);
});
});
it('should map values',()=>{
rxTest.run(({cold,time,expectObservable})=>{
VVVVVV You can pass in a values lookup
const values={a:200,b:40000}; VVVVVV 可以丟值進去
const src=cold('---a---a---a--|',values);
const result=' ---b---b---b--|'; // 先把結果推算出來
expectObservable(
src.pipe(map(a=>a*a))
VVVVVV
).toBe(result,values);
});
});
When to use marble tests