iT邦幫忙

第 11 屆 iThome 鐵人賽

DAY 27
0
Modern Web

Angular新手村學習筆記(2019)系列 第 27

Day27_RxJS Custom Operators(2/3)(ng conf 2018)

上一集看完Kiven大大的[S05E09] RxJS Custom Operators後
Youtube很厲害,都會出現相關影片

就是

ng conf 2018
Use the Custom Operator Force; Become an RxJS Jedi - Ryan Chenkie
https://www.youtube.com/watch?v=UaTLlcS9klU&list=PLSAPn-OQeDSzQD_EEN3cRr6cf-ePw8YSX&index=3&t=3740s
https://github.com/chenkie?tab=repositories

我就興起,看ng conf 2019,結果發現好幾集跟RxJS有關的,所以就試著看看
(開英文字幕一句一句猜@@")

明天會寫 ng conf 2019 另一篇,感覺也是跟 custom operator 有關的

ng conf 2019
How To Build Your Own RxJS Operators | Ben Lesh & Tracy Lee
https://www.youtube.com/watch?v=E6R_1QB8q4o&list=PLOETEcp3DkCpimylVKTDe968yNmNIajlR&index=51

所以今年就不在Day 30的時候寫心得了,
我會試著把相關的ng conf 2019看完,並把適合的部分寫成筆記
由於英文很爛,所以我會打出投影片的原文,再加上個人爛爛的理解
如果你有興趣,可以直接看影片,真的很棒!!

有英文字幕,多少可以學一些東西,雖然可能會理解錯誤
甚至我在想說,apple產品發表會都能翻譯+討論了
每年ng conf是否也能等影片出來後,對有興趣的各別理解、討論,或找大神開講分享

延申閱讀NG CONF 2018

Use the Custom Operator Force.Become an RxJS Jedi - Ryan Chenkie

成為一位RxJS絕地武士!!
https://www.youtube.com/watch?v=UaTLlcS9klU&list=PLSAPn-OQeDSzQD_EEN3cRr6cf-ePw8YSX&index=3&t=3740s

2018 NG CONF

Youtube很厲害,竟然可以跑出這個影片,感覺跟本集相關,就試著做筆記

Ryan Chenkie的github,有很多好料的程式
https://github.com/chenkie?tab=repositories

本集的程式碼
https://github.com/chenkie/custom-operators-workshop

包含3支小程式,我們來看完成的程式就好了

  • Exercise 1 - Custom Filter Operator
    Start branch: git checkout ex-1-start
    Finish branch: git checkout ex-1-finish

  • Exercise 2 - Combining the Latest Values
    Start branch: git checkout ex-2-start
    Finish branch: git checkout ex-2-finish

  • Exercise 3 - Custom Route Change Reporter
    Start branch: git checkout ex-3-start
    Finish branch: git checkout ex-3-finish

打一下投影片內容

  • 什麼是operators?
    operators are the things you put in the way of your stream before getting values out of it

  • operators vs javascript methods

// javascript
const values = ['foo','bar','baz'];
const filteredValues = values.filter(v => v !== 'foo');

// RxJS的operators
const valueStream = Rx.Observable.of('foo','bar','baz');
const fileteredValueStream=valueStream.filter(v => v !== 'foo');
  • if we have map() and filter() in javascript, why do we need them in RxJS?
    on the inside, operators work on Observable
    operators subscribe to a source$ Observable and return a result$ Observable

實作一個addHello operator,接受一個source$ Observable,
new一個新的Observable,實作next(),err(),complete(),並回傳新的Observalbe
^^^^^^^^^^^^^^^^^^^ new Observable<T>

function addHello(source$){
    // 已經沒有create operator了,改用return new Observable<T>
    return Rx.Observable.create(function(observer){
        // 這裡是inside
        // 1.subscribe
        source.subscribe(
            // 2.實作3個callback function
            function(value){
                observer.next(`Hello ${value}`);
            },                ^^^^^^^^^^^^^^^^
            function(err){
                observer.error(err);
            },
            function(){
                observer.complete();
            }
        )
    });
}

source$ Observable進來的資料流,
一個個跑filter的callback function,
如果回傳true,就送出值;
如果回傳false,就不送出值

...
class FilterOperator<T> implements Operator<T, T> {
                      VVVVVVVVV 應該就是判斷的function
  constructor(private predicate: (value: T, index: number) => boolean,
              private thisArg?: any) {
  }
                                  VVVVVV 傳入一個source$ Observable
  call(subscriber: Subscriber<T>, source: any): TeardownLogic {
       ^^^^^^^^^^ delegate給subscriber,裡面有next,error,complete
                                VVVVVVVVVVVVVVVV new一個新的
    return source.subscribe(new FilterSubscriber(subscriber, this.predicate, this.thisArg));
  }
}
...
class FilterSubscriber<T> extends Subscriber<T> {
  count: number = 0;
  constructor(destination: Subscriber<T>,
              private predicate: (value: T, index: number) => boolean,
              private thisArg: any) {
    super(destination);
  }

  ...
}

我們再看一下subscriber
https://github.com/ReactiveX/rxjs/blob/master/src/internal/Subscriber.ts

...
export class Subscriber<T> extends Subscription implements Observer<T> {
  ...
  /**
   * The {@link Observer} callback to receive notifications of type `next` from
   * the Observable, with a value. The Observable may call this method 0 or more
   * times.
   * @param {T} [value] The `next` value.
   * @return {void}
   */
  next(value?: T): void {
    if (!this.isStopped) {
      this._next(value);
    }
  }

  /**
   * The {@link Observer} callback to receive notifications of type `error` from
   * the Observable, with an attached `Error`. Notifies the Observer that
   * the Observable has experienced an error condition.
   * @param {any} [err] The `error` exception.
   * @return {void}
   */
  error(err?: any): void {
    if (!this.isStopped) {
      this.isStopped = true;
      this._error(err);
    }
  }

  /**
   * The {@link Observer} callback to receive a valueless notification of type
   * `complete` from the Observable. Notifies the Observer that the Observable
   * has finished sending push-based notifications.
   * @return {void}
   */
  complete(): void {
    if (!this.isStopped) {
      this.isStopped = true;
      this._complete();
    }
  }
}

再來看3種operators

  • individual value operators
    filter
  • combination operators
    combinelatest
  • accumulation operators

一個filter router events的範例

public getUrl(): void{
                VVVVVV angular router events$ 是 Observable
    this.router.events.pipe(
                        用來filter出Navigation Start events
        filter(event => event instanceof NavigationStart),
        filter((event: NavigationStart)=>(/foo/).test(event.url))
                                          ^^^^進一部filter路由
    )
    .subscribe(event=>{
        console.log(event.url); // 可以做logging之類的
    });
}

custom filter operator

filter-odd.js

const source = Rx.Observable.interval(500);

function filterOdd(source) {
  return Rx.Observable.create(function(observer) {
           對source$ observable裡面的subscription,去實作客製化的邏輯
           VVVVVVVVV
    source.subscribe(
      function(value) {
        if (value % 2 === 0) {
          observer.next(value);
        }
      },
      function(err) {
        observer.error(err);
      },
      function() {
        observer.complete();
      }
    );
  });
}

const odds = filterOdd(source);

odds.subscribe(function(x) {
  console.log(x);
});

the combinelatest operator

從多個observable streams,加工,回傳結果

  • calls subscribe to result on all the observables passed in
  • keeps track of the values emitted from the supplied observables and remembers the latest ones
  • a projection function can optionally be supplied

using combinelatest in a reactive form

有4個form control的reactive form

本範例展示,在reactive form,當Validator與多個controls有關時,怎麼用operators合併valuechange$ observable,傳給validator做驗證

而且使用async Validator,可以做即時驗證(前端key資料,立即連後端,判斷名字是否重複、email是否已註冊...等)

  1. flights.component.html
<div class="columns is-centered">
  <div class="column is-8">
    <div class="card">
      <div class="card-content">
        <div class="field">
          <h2 class="subtitle">Welcome to BadUXAir!</h2>
          <p>These are the airports we fly in and out of: SFO, LAX, SLC, SEA, DEN, LGA, ORD</p>
        </div>
        <form [formGroup]="flightsForm" (ngSubmit)="onSubmit()">
          <div class="columns">
            <div class="column is-3">
              <div class="field">
                <input type="text" class="input" placeholder="Departure Airport" formControlName="fromAirport">
              </div>
            </div>
            <div class="column is-3">
              <div class="field">
                <input type="text" class="input" placeholder="Arrival Airport" formControlName="toAirport">
              </div>
            </div>
            <div class="column is-3">
              <div class="field">
                <input type="date" class="input" placeholder="Arrival Airport" formControlName="departureDate">
              </div>
            </div>
            <div class="column is-3">
              <div class="field">
                <input type="date" class="input" placeholder="Arrival Airport" formControlName="returnDate">
              </div>
            </div>
          </div>
          <div class="columns">
            <div class="column is-full">
              <button type="submit" class="button is-primary" [disabled]="!flightsForm.valid">Book Flight!</button>
            </div>
          </div>

          <span *ngIf="isFetching">Validating flight...</span>

          <span class="has-text-danger" *ngIf="flightsForm.get('fromAirport').errors && flightsForm.get('fromAirport').errors.invalidFlightMessage">
            {{ flightsForm.get('fromAirport').errors.invalidFlightMessage }}
          </span>
        </form>
      </div>
    </div>
  </div>
</div>
  1. flights.component.ts
import { Component, OnInit } from '@angular/core';
import {
  FormGroup,
  FormBuilder,
  Validators,
  AbstractControl,
  AsyncValidatorFn
} from '@angular/forms';
import { combineLatest, debounceTime, filter, take } from 'rxjs/operators';

@Component({
  selector: 'app-flights',
  templateUrl: './flights.component.html',
  styleUrls: ['./flights.component.css']
})
export class FlightsComponent implements OnInit {
  public flightsForm: FormGroup;
  public isFetching = false;
  constructor(private fb: FormBuilder) {}

  ngOnInit() {
    this.createForm();

    // reactive form的control的valueChanges是observable         VVVVVVVVVVVV
    const fromAirport$ = this.flightsForm.controls.fromAirport.valueChanges;
    const toAirport$ = this.flightsForm.controls.toAirport.valueChanges;
    const departureDate$ = this.flightsForm.controls.departureDate.valueChanges;
    const returnDate$ = this.flightsForm.controls.returnDate.valueChanges;

    fromAirport$
      .pipe(
        // (在跑async validator之前,例用pipe+combinelatest做)
        // 把3個value changes併起來(都取streams裡最新的值)
        combineLatest(toAirport$, departureDate$, returnDate$),
        debounceTime(300)
      ) // 訂閱formAirport$時,把4個observable傳進去給validateAiport使用
      .subscribe(([fromAirport, toAirport, departureDate, returnDate]) => {
                               VVVVVVVVVVVVVV
        const validator = this.validateAiport({
          fromAirport,
          toAirport,
          departureDate,
          returnDate
        });
        this.flightsForm.controls.fromAirport.setAsyncValidators(validator);
        this.flightsForm.controls.fromAirport.updateValueAndValidity();
      });
  }
  // reative form
  private createForm(): void {
    this.flightsForm = this.fb.group({
      fromAirport: '',
      toAirport: '',
      departureDate: '',
      returnDate: ''
    });
  }
  
  // 實作async Validator,回傳promise
  // 使用情境:即時回傳,名字是否已有人使用。或email已註冊
  private validateAiport(data: any): AsyncValidatorFn {
    this.isFetching = true;
    return (control: AbstractControl) => {
      return new Promise((resolve, reject) => {
        setTimeout(() => {
          this.isFetching = false;
          resolve(this.fakeValidateFlight(data));
        }, 700);
      });
    };
  }

  private fakeValidateFlight(data: any): any {
    // 如果不在下面的路徑內,就報錯
    const flightRoutes = [
      { from: 'SFO', to: 'SLC' },
      { from: 'LGA', to: 'LAX' },
      { from: 'ORD', to: 'SEA' },
      { from: 'SEA', to: 'ORD' },
      { from: 'SLC', to: 'LGA' },
      { from: 'SFO', to: 'LGA' },
      { from: 'DEN', to: 'LGA' }
    ];
    return flightRoutes.find(
      f => f.from === data.fromAirport && f.to === data.toAirport
    )
      ? null
      : { invalidFlightMessage: 'Sorry, no flights between these airports!' };
  }

  public onSubmit(): void {
    console.log(this.flightsForm.value);
  }
}

accumulating streams

  • value can be combined between streams but also within the same stream
    value可以在不同observable組合,也可以在同一個observable組合(我的理解)
    例如: scan() operator
    scan( (x,y) => x+y)

  • accumulate values from a single observable stream
    kinda like array.reduce
    類似array的reduce

can be supplied a seed value
可以有初使值

will give back intermediate values that have been accumulated
會return 計算後的 中間值(intermediate accumulated value)

// array.reduce
const values=[1,2,3,4,5]
const accumulated=values.reduce( (acc,cur) => acc+cur );
console.log(accumulated); // 15  ^^^^^^^^^^^^^^^^^^^^

// RxJS的scan operator
// 1.先做一個簡單的streams
// const valuesStream=Rx.Observable.interval(500);
// valuesStream.subscribe(x=>console.log(x)); // 0 1 2 3...(每500ms)
// 2.加入scan
const valuesStream=Rx.Observable.interval(500).take(6);
valuesStream.scan((acc,cur)=>acc+cur).subscribe(x=>console.log(x));
                   ^^^^^^^^^^^^^^^^
// 0 1 3 6 10 15

主題再回到custom operator

  • we don't watn to repeat lone/complex operator chains
    使用情境:
    如果一個固定的功能常常用(要串一堆operators的)
    我們何不把他寫成一個operator呢?
    就能重複使用了,也有更高的可讀性

  • we can abstract to a custom operator instead

  • two common patterns for custom operators:

  1. put it on observable.prototype
  2. return a function and supply a source observable
  • you can (and should) use rx operators inside your custom operators
    (現有的operators不要再重寫)
                                VVVVVV supply a source observable
const multiply = multiplyFn => (source: Observable<any>) => {
    // 回傳新的Observable
    return new Observable(observer=>{
        return source.subscribe({
            next(x:any){
                observer.next(multiplyFn(x));
            },
            error(err:any){
                observer.error(err);
            },
            complete(){
                observer.complete();
            }
        });
    });
}

this.obs
    .pipe(multiply(x=>x*10))
                   ^^^^^^^^ multiplyFn (return a function)
    .subscribe(x=>console.log(x));

route change reporter

  1. app.component.ts
import { Observable } from 'rxjs/Observable';
import { Component, OnInit } from '@angular/core';
import { Router } from '@angular/router';
import { reportRouteChanges } from './custom-operators/report-route-changes';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
  public routeReport: any;
  constructor(private router: Router) {}

  ngOnInit() {              VVVVVVVVVVVVVVVVVVVV 就是下面那個() arrow function
    this.router.events.pipe(reportRouteChanges()).subscribe(report => {
      this.routeReport = report;
      // {
      //   "previous":"/flights",
      //   "current":"route-one"
      // }
    });
  }
}
  1. report-route-changes.ts
import { NavigationEnd } from '@angular/router';
import { Observable } from 'rxjs/Observable';
import { filter, pairwise, map } from 'rxjs/operators';
                                  VV 不用傳參數的function
export const reportRouteChanges = () => (source: Observable<any>) => {
  return new Observable(observer => {
    return source
      // 開始組合,包含使用現有的operator,不用重新打造輪子
      .pipe(
        filter(event => event instanceof NavigationEnd),
        pairwise(),
        map((route: any) => {
          const [previous, current] = route;
          return { previous: previous.url, current: current.url };
        })
      )
      
      .subscribe({
        next(change: any) {
          observer.next(change);
        },
        error(err: any) {
          observer.error(err);
        },
        complete() {
          observer.complete();
        }
      });
  });
};

上一篇
Day26_RxJS Custom Operators(1/3)
下一篇
Day28_RxJS Custom Operators(3/3)(ng conf 2019)
系列文
Angular新手村學習筆記(2019)33
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言