後來在youtube看到ng conf的影片,覺得有相關
所以RxJS Custom Operators分成3天PO
明天、後天分別是:
延申閱讀
ng conf 2018
Use the Custom Operator Force; Become an RxJS Jedi - Ryan Chenkie
https://www.youtube.com/watch?v=UaTLlcS9klU&list=PLSAPn-OQeDSzQD_EEN3cRr6cf-ePw8YSX&index=3&t=3740s
https://github.com/chenkie?tab=repositories
ng conf 2019
How To Build Your Own RxJS Operators | Ben Lesh & Tracy Lee
https://www.youtube.com/watch?v=E6R_1QB8q4o&list=PLOETEcp3DkCpimylVKTDe968yNmNIajlR&index=51
[S05E08]VSCode基本介紹
https://www.youtube.com/watch?v=kEllE8sZtuE&list=PL9LUW6O9WZqgUMHwDsKQf3prtqVvjGZ6S&index=15
VSCode與測試無關,就跳過囉
文件非常完整,基本上取自己需要的部分設定即可
https://hackmd.io/3uNcseF9TfeiEa0q-VDQXQ
本集的程式碼
https://github.com/chenkie/custom-operators-workshop
這一集由Kevin大大講解
[S05E09] RxJS Custom Operators
https://www.youtube.com/watch?v=HdyTkaD46ls&list=PL9LUW6O9WZqgUMHwDsKQf3prtqVvjGZ6S&index=14
這一集只有14分鐘,
就是講2篇,Kevin大大自己在blog裡寫的文章
雖說只有2篇文章,不過感覺是比較進階的內容了
可見其他文章都非常值得看(只是沒人講解可能深了一點)
https://blog.kevinyang.net/
Keive大大的程式
https://stackblitz.com/edit/typescript-g6f86y
以下就針對index.ts裡,14分鐘有講到的部分註解
(整段copy出來也是避免以後程式不見,還能參考)
import { from, fromEvent, timer, interval, Observable, of } from 'rxjs';
import { tap, take, mergeMap, bufferTime, map, throttleTime, windowTime, sampleTime, auditTime, toArray } from 'rxjs/operators';
console.clear();
/*========================================================
練習各種跟時間有關的 operators
bufferTime
throttleTime
windowTime
sampleTime
auditTime
==========================================================*/
// fromEvent(document, 'click').pipe(
// tap(event => console.log('tap:', event)),
// windowTime(2000),
// mergeMap(event => {
// console.log('-----');
// console.log(event);
// console.log('=====');
// return event.pipe(toArray(), tap(v => console.log('o:', v)));
// }),
// take(5),
// ).subscribe();
/* ========================================================
平均陣列的值,建立 avg operators
===========================================================*/
// const avg = map(data => {
// if (!Array.isArray(data)) return data;
// if (data.length === 0) return '0';
// return (data.reduce((acc, curr) => acc + curr, 0) / data.length);
// });
// interval(1000).pipe(
// mergeMap(v => from(createRamdonSizeArray())),
// bufferTime(800),
// avg,
// take(10),
// ).subscribe(console.log);
// function createRamdonSizeArray() {
// return Array.from({ length: Math.floor(Math.random() * 10) + 3 }).map(v => Math.floor(Math.random() * 10) + 1);
// }
/* ========================================================
建立自訂 operators 的方法
======================================================== */
/* 方法 1 */ 如果要由外面傳資料進去,可以用function的方式。很像map()
VVVVV
const Multiply1 = (times) => (source: Observable<any>) =>
^^^^^^^^^^^^^^^^^^^^^^^接受一個 Observable
$$$$$$$$
new Observable(observer => {
^^^^^^^^^^^^^^回傳一個新的 Observable
return source.subscribe({
^^^^^^^^^在 subscribe 處理,next、error 和 complete
next(value) {
$$$$$$$$ 將相關的動作對應到上層 (new Observable()) 的 observer
observer.next(value * times);
},
error(err) {
$$$$$$$$
observer.error(err);
},
complete() {
$$$$$$$$
observer.complete();
}
})
});
// 試用
of(1,2,3).pipe(Multiply1(10)).subscribe(console.log);
^^^^^^^^^^^^^ 很像map()
/* 方法 2 */
const Multiply = (times) => map(value => value * times);
^^^ 用map()來實作 function
寫一行的方式,沒法串太多operator
適合只用1個operator,很精簡,例如:filter()也適合寫成1行
或
const Multiply2 = (times) => obs => obs.pipe(
map((value: number) => value * times)
);
/* 方法 2-1 不同的寫法 */
function Multiply2_1(times) {
return function (obs) {
return obs.pipe(
map((value: number) => value * times)
)
}
}
/* 方法 3 */
const Multiply3 = (times) => map((value: number) => value * times);
const source$ = of(1, 2, 3);
console.log('---------方法1---------');
source$.pipe(Multiply1(2)).subscribe(console.log);
console.log('---------方法2---------');
source$.pipe(Multiply2(2)).subscribe(console.log);
console.log('---------方法2_1---------');
source$.pipe(Multiply2_1(2)).subscribe(console.log);
console.log('---------方法4---------');
source$.pipe(Multiply3(2)).subscribe(console.log);
import { Observable, of, combineLatest } from 'rxjs';
import { map } from 'rxjs/operators';
const Multiply = (source: Observable<any>)=>
new Observable(observer => {
return source.subscribe({
next(value){
observer.next(value*2);
},
error(err){
observer.error(err);
},
complete(){
observer.complete();
}
})
});
const test = (source: Observable) => combineLatest(source, of(1)).pipe(
原則: 接受Observable 回傳Observable
map((v1,v2)=>({v1,v2}))
)
of(1,2,3).pipe(test).subscribe(console.log);
// {v1: Array[2], v2: 0}
// v1: Array[2]
// 0:3
// 1:1
// v2: 0
import { from } from 'rxjs';
import { filter } from 'rxjs/operators';
import _isNumber from 'lodash/isNumber';
VVVVVVVV ^^^^^^ 用lodash載入isNulber function
const isNumber = () => filter(_isNumber);
^^^^^^^^^ 判斷是不是Number
const source$ = from([1, 2, '3']);
source$.pipe(
isNumber() // 使用
).subscribe(console.log);
Kevin大大用一點時間講他的另一篇文章
https://blog.kevinyang.net/2018/08/31/rxjs-scheduler/
自訂Observable必需要學會scheduler
因為Observalbe的底層是用scheduler來完成的
https://blog.kevinyang.net/2018/08/31/rxjs-scheduler/#dive-in
scheduler底層會把所有的動作變成work
work 可以被執行的單位
action 去執行work
subscription
const sub = new Subscription();
const sub1 = queueScheduler.schedule((state)=> console.log(state), 0, 42);
const sub2 = queueScheduler.schedule((state)=> console.log(state), 0, '123');
^^^^^^^^ 用schedule會回傳subscription
sub.add(sub1); // 用subscription.add(),去加入其他subscription
sub.add(sub2);
sub.unsubscribe();
^^^^^^^^^^^ 被加進來的sub1、sub2同時也會被unsubscribe()
類似之前介紹的:
在Observable裡的,operator裡面加takeUntil()
再用subject去觸發什麼時候中斷
原始碼較新,較影片裡的from.ts的程式更精簡
https://github.com/ReactiveX/rxjs/blob/master/src/internal/observable/from.ts
import { Observable } from '../Observable';
import { subscribeTo } from '../util/subscribeTo';
import { ObservableInput, SchedulerLike, ObservedValueOf } from '../types';
import { scheduled } from '../scheduled/scheduled';
...
* @see {@link fromEvent}
* @see {@link fromEventPattern}
*
* @param {ObservableInput<T>} A subscription object, a Promise, an Observable-like,
* ^^^^^^^^^^^^^^^^^^ 1111111111111111111 2222222 333333333333333
* an Array, an iterable, or an array-like object to be converted.
* 44444 55555555 66666666666666666
* @param {SchedulerLike} An optional {@link SchedulerLike} on which to schedule the emission of values.
* @return {Observable<T>}
* @name from
* @owner Observable
*/
export function from<T>(input: ObservableInput<T>, scheduler?: SchedulerLike): Observable<T> { ^^^^^^^^^^^^^^^^^^ 上述1~6種case
if (!scheduler) {
if (input instanceof Observable) {
return input;
}
return new Observable<T>(subscribeTo(input));
} else {
return scheduled(input, scheduler);
}
}
要切換TAB到6.0.0左右的版本,才能看到跟Kevin大大一樣的程式碼
import { Observable } from '../Observable';
import { isPromise } from '../util/isPromise';
import { isArrayLike } from '../util/isArrayLike';
import { isObservable } from '../util/isObservable';
import { isIterable } from '../util/isIterable';
import { fromArray } from './fromArray';
import { fromPromise } from './fromPromise';
import { fromIterable } from './fromIterable';
import { fromObservable } from './fromObservable';
import { subscribeTo } from '../util/subscribeTo';
import { ObservableInput, SchedulerLike } from '../types';
// 多型
export function from<T>(input: ObservableInput<T>, scheduler?: SchedulerLike): Observable<T>; ^^^^^^^^^^^^^^^^^^^^^^^^^
export function from<T>(input: ObservableInput<ObservableInput<T>>, scheduler?: SchedulerLike): Observable<Observable<T>>;
export function from<T>(input: ObservableInput<T>, scheduler?: SchedulerLike): Observable<T> {
// 如果有scheduler
if (!scheduler) {
if (input instanceof Observable) {
return input;
}
return new Observable(subscribeTo(input));
}
// 進入點
if (input != null) {
// 分case,如果是Observable,就做xxx
if (isObservable(input)) {
return fromObservable(input, scheduler);
// 如果是Promise
} else if (isPromise(input)) {
return fromPromise(input, scheduler);
} else if (isArrayLike(input)) {
return fromArray(input, scheduler);
} else if (isIterable(input) || typeof input === 'string') {
return fromIterable(input, scheduler);
}
}
throw new TypeError((input !== null && typeof input || input) + ' is not observable');
}
https://github.com/ReactiveX/rxjs/blob/master/src/internal/observable/fromArray.ts
新版6.5.3已經大幅改寫
import { Observable } from '../Observable';
import { SchedulerLike } from '../types';
import { subscribeToArray } from '../util/subscribeToArray';
import { scheduleArray } from '../scheduled/scheduleArray';
export function fromArray<T>(input: ArrayLike<T>, scheduler?: SchedulerLike) {
if (!scheduler) {
return new Observable<T>(subscribeToArray(input));
^^^^^^^^^^^^^^^^^
} else {
return scheduleArray(input, scheduler);
}
}
要切換TAG到6.0.0版左右
import { Observable } from '../Observable';
import { SchedulerLike } from '../types';
import { Subscription } from '../Subscription';
import { subscribeToArray } from '../util/subscribeToArray';
export function fromArray<T>(input: ArrayLike<T>, scheduler?: SchedulerLike) {
if (!scheduler) {
return new Observable<T>(subscribeToArray(input));
} else {
return new Observable<T>(subscriber => {
^^^^^^^^^^^^^^^^^ 也是new Observable
const sub = new Subscription(); // 跟Kevin大大的scheduler文章寫法很像
// 證明Observalbe的底層是用scheduler來完成的
let i = 0;
vvvvvvvvv 外面傳進來的scheduler
sub.add(scheduler.schedule(function () {
^^^ ^^^^^^^^去執行工作
把scheduler add到上層sub,到時unsubscribe會全部一起unsubscribe
if (i === input.length) {
subscriber.complete(); // subscirbe的complete()
return;
}
subscriber.next(input[i++]); // subscribe的next()
if (!subscriber.closed) {
sub.add(this.schedule());
}
}));
return sub; // 回傳subscription
});
}
}
參考 fromArray、from的寫法,就能自訂義Observable的實作
自訂Observable的實作機會較少(Kevin大大只是附代一提)
自訂operator的實作機會較大