iT邦幫忙

2021 iThome 鐵人賽

DAY 5
0
Software Development

從零開始Reactive Programming- Spring系列 第 6

[Day 5] Reactive Programming - Java 9(Publisher、Subscribers)

  • 分享至 

  • xImage
  •  

前言

java 9的時候新增支援Reactive Stream,所以在介紹Spring ReactorWebFlux之前先來認識一下Java 原生的Flow Api。

java 9

java 9更新推出對應Reactive Streams 規格的分別有三個interface,Publisher負責產生item來推送給一個或多個Subscribers去使用(consumed),每一個item都被Subscription所管理,中間有一或多個Processor來處理轉換或是特別的邏輯。

https://ithelp.ithome.com.tw/upload/images/20210919/20141418I9FzwzGbi9.png

Subscriber訂閱PublisherPublisher負責產生推送資料,透過Subscription 居中管理, Subscriber會提出需要的資料量,這樣就有背壓(backpressure)機制(之後介紹),來避免Subscriber來不及消化大量資料導致系統異常。下方是java.util.concurrent.Flow 移除掉文件說明的部分,從程式碼可以看出

  1. Publisher提供訂閱
  2. Subscriber 有四個方法 分別為
    1. onSubscribe(Subscription subscription):  訂閱時同步Subscription
    2. onNext(T item): 處理item
    3. onError(Throwable throwable): 錯誤處理,這部分就是單純Java 8 Stream所沒有的
    4. onComplete(): Publisher結束時
/* 
public final class Flow { 
    private Flow() {} // uninstantiable 
    
    @FunctionalInterface 
    public static interface Publisher<T> { 
       
        public void subscribe(Subscriber<? super T> subscriber); 
    } 
    public static interface Subscriber<T> { 
      
        public void onSubscribe(Subscription subscription); 
       
        public void onNext(T item); 
      
        public void onError(Throwable throwable); 
       
        public void onComplete(); 
    } 
     
    public static interface Subscription { 
    
        public void request(long n); 
   
        public void cancel(); 
    } 
       
}

DEMO

實際動手做一個簡單聽Podcast要取得節目內容的DEMO
建立Member會員實作Subscriber,假設會員要訂閱Podcast頻道,訂閱時同步subscription並要求一個item,每接受到一個item就存入episodes 並要求下一個,最後當Publisher結束時則會印出onComplete。

@Data 
public class Member <T>  implements Subscriber<T> { 
  private Subscription subscription; 
  private List<T> episodes = new LinkedList<>();
  @Override 
  public void onSubscribe(Subscription subscription) { 
    this.subscription = subscription; 
    subscription.request(1); 
  } 
  @Override 
  public void onNext(T item) { 
    System.out.println("onNext:" + item); 
    episodes.add(item);
    subscription.request(1); 
  } 
  @Override 
  public void onError(Throwable throwable) { 
    System.out.println("onError:" + throwable); 
  } 
  @Override 
  public void onComplete() { 
    System.out.println("onComplete"); 
  } 
}

因為Publisher比較複雜我們直接拿原生的SubmissionPublisher來使用,實際DEMO情況如下
新增一個會員跟一個Podcast頻道並訂閱,將內容推送到頻道當中,在第一時間印出是可以發現尚未收到任何集數,因為整個流程是非同步的在背後進行,所以當我們暫停一秒鐘後就可以正常到看到結果。

@Test
void testJava9Reactive() throws InterruptedException {
  Member<String> member = new Member<>();
  SubmissionPublisher<String> podcastChannel = new SubmissionPublisher<>();

  podcastChannel.subscribe(member);
  Assertions.assertEquals(1, podcastChannel.getSubscribers().size());

  List<String> episodes = List.of("1", "2", "3", "4");

  episodes.forEach(podcastChannel::submit);
  System.out.println(member.getEpisodes());
  Thread.sleep(1000);
  podcastChannel.close();
  System.out.println(member.getEpisodes());

  /* output:
      []
  onNext:1
  onNext:2
  onNext:3
  onNext:4
  [1, 2, 3, 4]
  onComplete
       */
}

結語

今天終於開始進入到Reactive Programming並帶有一點實作,下一篇會繼續介紹今天用到的SubmissionPublisher與一開始提到的Processor。

資料來源


上一篇
[Day 4] Reactive Programming - 觀察者模式Observer Pattern
下一篇
[Day 6] Reactive Programming - Java 9(SubmissionPublisher、Processor)
系列文
從零開始Reactive Programming- Spring32
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言