上一集看完Kiven大大的[S05E09] RxJS Custom Operators後
Youtube很厲害,都會出現相關影片
就是
ng conf 2018
Use the Custom Operator Force; Become an RxJS Jedi - Ryan Chenkie
https://www.youtube.com/watch?v=UaTLlcS9klU&list=PLSAPn-OQeDSzQD_EEN3cRr6cf-ePw8YSX&index=3&t=3740s
https://github.com/chenkie?tab=repositories
我就興起,看ng conf 2019,結果發現好幾集跟RxJS有關的,所以就試著看看
(開英文字幕一句一句猜@@")
明天會寫 ng conf 2019 另一篇,感覺也是跟 custom operator 有關的
ng conf 2019
How To Build Your Own RxJS Operators | Ben Lesh & Tracy Lee
https://www.youtube.com/watch?v=E6R_1QB8q4o&list=PLOETEcp3DkCpimylVKTDe968yNmNIajlR&index=51
所以今年就不在Day 30的時候寫心得了,
我會試著把相關的ng conf 2019看完,並把適合的部分寫成筆記
由於英文很爛,所以我會打出投影片的原文,再加上個人爛爛的理解
如果你有興趣,可以直接看影片,真的很棒!!
有英文字幕,多少可以學一些東西,雖然可能會理解錯誤
甚至我在想說,apple產品發表會都能翻譯+討論了
每年ng conf是否也能等影片出來後,對有興趣的各別理解、討論,或找大神開講分享
成為一位RxJS絕地武士!!
https://www.youtube.com/watch?v=UaTLlcS9klU&list=PLSAPn-OQeDSzQD_EEN3cRr6cf-ePw8YSX&index=3&t=3740s
2018 NG CONF
Youtube很厲害,竟然可以跑出這個影片,感覺跟本集相關,就試著做筆記
Ryan Chenkie的github,有很多好料的程式
https://github.com/chenkie?tab=repositories
本集的程式碼
https://github.com/chenkie/custom-operators-workshop
包含3支小程式,我們來看完成的程式就好了
Exercise 1 - Custom Filter Operator
Start branch: git checkout ex-1-start
Finish branch: git checkout ex-1-finish
Exercise 2 - Combining the Latest Values
Start branch: git checkout ex-2-start
Finish branch: git checkout ex-2-finish
Exercise 3 - Custom Route Change Reporter
Start branch: git checkout ex-3-start
Finish branch: git checkout ex-3-finish
什麼是operators?
operators are the things you put in the way of your stream before getting values out of it
operators vs javascript methods
// javascript
const values = ['foo','bar','baz'];
const filteredValues = values.filter(v => v !== 'foo');
// RxJS的operators
const valueStream = Rx.Observable.of('foo','bar','baz');
const fileteredValueStream=valueStream.filter(v => v !== 'foo');
Observable
subscribe
to a source$ Observable
and return a result$ Observable
實作一個addHello operator,接受一個source$ Observable,
new一個新的Observable,實作next(),err(),complete(),並回傳新的Observalbe
^^^^^^^^^^^^^^^^^^^ new Observable<T>
function addHello(source$){
// 已經沒有create operator了,改用return new Observable<T>
return Rx.Observable.create(function(observer){
// 這裡是inside
// 1.subscribe
source.subscribe(
// 2.實作3個callback function
function(value){
observer.next(`Hello ${value}`);
}, ^^^^^^^^^^^^^^^^
function(err){
observer.error(err);
},
function(){
observer.complete();
}
)
});
}
source$ Observable進來的資料流,
一個個跑filter的callback function,
如果回傳true,就送出值;
如果回傳false,就不送出值
...
class FilterOperator<T> implements Operator<T, T> {
VVVVVVVVV 應該就是判斷的function
constructor(private predicate: (value: T, index: number) => boolean,
private thisArg?: any) {
}
VVVVVV 傳入一個source$ Observable
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
^^^^^^^^^^ delegate給subscriber,裡面有next,error,complete
VVVVVVVVVVVVVVVV new一個新的
return source.subscribe(new FilterSubscriber(subscriber, this.predicate, this.thisArg));
}
}
...
class FilterSubscriber<T> extends Subscriber<T> {
count: number = 0;
constructor(destination: Subscriber<T>,
private predicate: (value: T, index: number) => boolean,
private thisArg: any) {
super(destination);
}
...
}
我們再看一下subscriber
https://github.com/ReactiveX/rxjs/blob/master/src/internal/Subscriber.ts
...
export class Subscriber<T> extends Subscription implements Observer<T> {
...
/**
* The {@link Observer} callback to receive notifications of type `next` from
* the Observable, with a value. The Observable may call this method 0 or more
* times.
* @param {T} [value] The `next` value.
* @return {void}
*/
next(value?: T): void {
if (!this.isStopped) {
this._next(value);
}
}
/**
* The {@link Observer} callback to receive notifications of type `error` from
* the Observable, with an attached `Error`. Notifies the Observer that
* the Observable has experienced an error condition.
* @param {any} [err] The `error` exception.
* @return {void}
*/
error(err?: any): void {
if (!this.isStopped) {
this.isStopped = true;
this._error(err);
}
}
/**
* The {@link Observer} callback to receive a valueless notification of type
* `complete` from the Observable. Notifies the Observer that the Observable
* has finished sending push-based notifications.
* @return {void}
*/
complete(): void {
if (!this.isStopped) {
this.isStopped = true;
this._complete();
}
}
}
public getUrl(): void{
VVVVVV angular router events$ 是 Observable
this.router.events.pipe(
用來filter出Navigation Start events
filter(event => event instanceof NavigationStart),
filter((event: NavigationStart)=>(/foo/).test(event.url))
^^^^進一部filter路由
)
.subscribe(event=>{
console.log(event.url); // 可以做logging之類的
});
}
filter-odd.js
const source = Rx.Observable.interval(500);
function filterOdd(source) {
return Rx.Observable.create(function(observer) {
對source$ observable裡面的subscription,去實作客製化的邏輯
VVVVVVVVV
source.subscribe(
function(value) {
if (value % 2 === 0) {
observer.next(value);
}
},
function(err) {
observer.error(err);
},
function() {
observer.complete();
}
);
});
}
const odds = filterOdd(source);
odds.subscribe(function(x) {
console.log(x);
});
從多個observable streams,加工,回傳結果
有4個form control的reactive form
本範例展示,在reactive form,當Validator與多個controls有關時,怎麼用operators合併valuechange$ observable,傳給validator做驗證
而且使用async Validator,可以做即時驗證(前端key資料,立即連後端,判斷名字是否重複、email是否已註冊...等)
<div class="columns is-centered">
<div class="column is-8">
<div class="card">
<div class="card-content">
<div class="field">
<h2 class="subtitle">Welcome to BadUXAir!</h2>
<p>These are the airports we fly in and out of: SFO, LAX, SLC, SEA, DEN, LGA, ORD</p>
</div>
<form [formGroup]="flightsForm" (ngSubmit)="onSubmit()">
<div class="columns">
<div class="column is-3">
<div class="field">
<input type="text" class="input" placeholder="Departure Airport" formControlName="fromAirport">
</div>
</div>
<div class="column is-3">
<div class="field">
<input type="text" class="input" placeholder="Arrival Airport" formControlName="toAirport">
</div>
</div>
<div class="column is-3">
<div class="field">
<input type="date" class="input" placeholder="Arrival Airport" formControlName="departureDate">
</div>
</div>
<div class="column is-3">
<div class="field">
<input type="date" class="input" placeholder="Arrival Airport" formControlName="returnDate">
</div>
</div>
</div>
<div class="columns">
<div class="column is-full">
<button type="submit" class="button is-primary" [disabled]="!flightsForm.valid">Book Flight!</button>
</div>
</div>
<span *ngIf="isFetching">Validating flight...</span>
<span class="has-text-danger" *ngIf="flightsForm.get('fromAirport').errors && flightsForm.get('fromAirport').errors.invalidFlightMessage">
{{ flightsForm.get('fromAirport').errors.invalidFlightMessage }}
</span>
</form>
</div>
</div>
</div>
</div>
import { Component, OnInit } from '@angular/core';
import {
FormGroup,
FormBuilder,
Validators,
AbstractControl,
AsyncValidatorFn
} from '@angular/forms';
import { combineLatest, debounceTime, filter, take } from 'rxjs/operators';
@Component({
selector: 'app-flights',
templateUrl: './flights.component.html',
styleUrls: ['./flights.component.css']
})
export class FlightsComponent implements OnInit {
public flightsForm: FormGroup;
public isFetching = false;
constructor(private fb: FormBuilder) {}
ngOnInit() {
this.createForm();
// reactive form的control的valueChanges是observable VVVVVVVVVVVV
const fromAirport$ = this.flightsForm.controls.fromAirport.valueChanges;
const toAirport$ = this.flightsForm.controls.toAirport.valueChanges;
const departureDate$ = this.flightsForm.controls.departureDate.valueChanges;
const returnDate$ = this.flightsForm.controls.returnDate.valueChanges;
fromAirport$
.pipe(
// (在跑async validator之前,例用pipe+combinelatest做)
// 把3個value changes併起來(都取streams裡最新的值)
combineLatest(toAirport$, departureDate$, returnDate$),
debounceTime(300)
) // 訂閱formAirport$時,把4個observable傳進去給validateAiport使用
.subscribe(([fromAirport, toAirport, departureDate, returnDate]) => {
VVVVVVVVVVVVVV
const validator = this.validateAiport({
fromAirport,
toAirport,
departureDate,
returnDate
});
this.flightsForm.controls.fromAirport.setAsyncValidators(validator);
this.flightsForm.controls.fromAirport.updateValueAndValidity();
});
}
// reative form
private createForm(): void {
this.flightsForm = this.fb.group({
fromAirport: '',
toAirport: '',
departureDate: '',
returnDate: ''
});
}
// 實作async Validator,回傳promise
// 使用情境:即時回傳,名字是否已有人使用。或email已註冊
private validateAiport(data: any): AsyncValidatorFn {
this.isFetching = true;
return (control: AbstractControl) => {
return new Promise((resolve, reject) => {
setTimeout(() => {
this.isFetching = false;
resolve(this.fakeValidateFlight(data));
}, 700);
});
};
}
private fakeValidateFlight(data: any): any {
// 如果不在下面的路徑內,就報錯
const flightRoutes = [
{ from: 'SFO', to: 'SLC' },
{ from: 'LGA', to: 'LAX' },
{ from: 'ORD', to: 'SEA' },
{ from: 'SEA', to: 'ORD' },
{ from: 'SLC', to: 'LGA' },
{ from: 'SFO', to: 'LGA' },
{ from: 'DEN', to: 'LGA' }
];
return flightRoutes.find(
f => f.from === data.fromAirport && f.to === data.toAirport
)
? null
: { invalidFlightMessage: 'Sorry, no flights between these airports!' };
}
public onSubmit(): void {
console.log(this.flightsForm.value);
}
}
streams
value can be combined between streams but also within the same stream
value可以在不同observable組合,也可以在同一個observable組合(我的理解)
例如: scan() operator
scan( (x,y) => x+y)
accumulate values from a single observable stream
kinda like array.reduce
類似array的reduce
can be supplied a seed value
可以有初使值
will give back intermediate values that have been accumulated
會return 計算後的 中間值(intermediate accumulated value)
// array.reduce
const values=[1,2,3,4,5]
const accumulated=values.reduce( (acc,cur) => acc+cur );
console.log(accumulated); // 15 ^^^^^^^^^^^^^^^^^^^^
// RxJS的scan operator
// 1.先做一個簡單的streams
// const valuesStream=Rx.Observable.interval(500);
// valuesStream.subscribe(x=>console.log(x)); // 0 1 2 3...(每500ms)
// 2.加入scan
const valuesStream=Rx.Observable.interval(500).take(6);
valuesStream.scan((acc,cur)=>acc+cur).subscribe(x=>console.log(x));
^^^^^^^^^^^^^^^^
// 0 1 3 6 10 15
we don't watn to repeat lone/complex operator chains
使用情境:
如果一個固定的功能常常用(要串一堆operators的)
我們何不把他寫成一個operator呢?
就能重複使用了,也有更高的可讀性
we can abstract to a custom operator instead
two common patterns for custom operators:
function
and supply a source observable
VVVVVV supply a source observable
const multiply = multiplyFn => (source: Observable<any>) => {
// 回傳新的Observable
return new Observable(observer=>{
return source.subscribe({
next(x:any){
observer.next(multiplyFn(x));
},
error(err:any){
observer.error(err);
},
complete(){
observer.complete();
}
});
});
}
this.obs
.pipe(multiply(x=>x*10))
^^^^^^^^ multiplyFn (return a function)
.subscribe(x=>console.log(x));
import { Observable } from 'rxjs/Observable';
import { Component, OnInit } from '@angular/core';
import { Router } from '@angular/router';
import { reportRouteChanges } from './custom-operators/report-route-changes';
@Component({
selector: 'app-root',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
public routeReport: any;
constructor(private router: Router) {}
ngOnInit() { VVVVVVVVVVVVVVVVVVVV 就是下面那個() arrow function
this.router.events.pipe(reportRouteChanges()).subscribe(report => {
this.routeReport = report;
// {
// "previous":"/flights",
// "current":"route-one"
// }
});
}
}
import { NavigationEnd } from '@angular/router';
import { Observable } from 'rxjs/Observable';
import { filter, pairwise, map } from 'rxjs/operators';
VV 不用傳參數的function
export const reportRouteChanges = () => (source: Observable<any>) => {
return new Observable(observer => {
return source
// 開始組合,包含使用現有的operator,不用重新打造輪子
.pipe(
filter(event => event instanceof NavigationEnd),
pairwise(),
map((route: any) => {
const [previous, current] = route;
return { previous: previous.url, current: current.url };
})
)
.subscribe({
next(change: any) {
observer.next(change);
},
error(err: any) {
observer.error(err);
},
complete() {
observer.complete();
}
});
});
};