iT邦幫忙

2021 iThome 鐵人賽

DAY 6
0
Software Development

從零開始Reactive Programming- Spring系列 第 7

[Day 6] Reactive Programming - Java 9(SubmissionPublisher、Processor)

  • 分享至 

  • xImage
  •  

前言

上一篇的範例中有使用到SubmissionPublisher,才更加地認識到其實Publisher需要做蠻多事情的,這邊先簡單介紹一下SubmissionPublisher

SubmissionPublisher

根據Java doc內容,SubmissionPublisher使用在建構時傳入的Executor來做到持續推送資料給Subscribers,當Publisher推送與Subscriber消費的速度不同時能提供緩衝區(Buffer),若緩衝區到達上限可以有不同的方法來處理,最簡單就是卡住(block)直到資源可用。

在範例中有用到兩個method,看一下原始碼來大概了解一下用途,將subscriber包裝成subscription ,對應到之前說明subscriptionpublishersubscriber溝通的橋樑,透過BufferedSubscriptionnext屬性(field)來達到像是linked list的效果,可以串聯所有的subscriber並檢核是否有重複。

public void subscribe(Subscriber<? super T> subscriber) { 
        if (subscriber == null) throw new NullPointerException(); 
        int max = maxBufferCapacity; // allocate initial array 
        Object[] array = new Object[max < INITIAL_CAPACITY ? 
                                    max : INITIAL_CAPACITY]; 
        BufferedSubscription<T> subscription = 
            new BufferedSubscription<T>(subscriber, executor, onNextHandler, 
                                        array, max); 
        synchronized (this) { 
            if (!subscribed) { 
                subscribed = true; 
                owner = Thread.currentThread(); 
            } 
            for (BufferedSubscription<T> b = clients, pred = null;;) { 
                if (b == null) { 
                    Throwable ex; 
                    subscription.onSubscribe(); 
                    if ((ex = closedException) != null) 
                        subscription.onError(ex); 
                    else if (closed) 
                        subscription.onComplete(); 
                    else if (pred == null) 
                        clients = subscription; 
                    else 
                        pred.next = subscription; 
                    break; 
                } 
                BufferedSubscription<T> next = b.next; 
                if (b.isClosed()) {   // remove 
                    b.next = null;    // detach 
                    if (pred == null) 
                        clients = next; 
                    else 
                        pred.next = next; 
                } 
                else if (subscriber.equals(b.subscriber)) { 
                    b.onError(new IllegalStateException("Duplicate subscribe")); 
                    break; 
                } 
                else 
                    pred = b; 
                b = next; 
            } 
        } 
    }

而我們將資料放到publisher的submit實際上就是去呼叫doOffer()

  public int submit(T item) { 
        return doOffer(item, Long.MAX_VALUE, null); 
    }

doOffer會根據目前的大小有重試(retry)的機制,主要推送的邏輯是在(BufferedSubscription) b.offer(item, unowned); ,會透過publisher的Executor執行ConsumerTask,在Task中run  (BufferedSubscription)consumer.consume(),在裡面就會根據條件去呼叫我們熟悉的subscriber的onSubscribeonNext,這邊就先大致了解一下,先不深入探討。

 private int doOffer(T item, long nanos, 
                        BiPredicate<Subscriber<? super T>, ? super T> onDrop) { 
        if (item == null) throw new NullPointerException(); 
        int lag = 0; 
        boolean complete, unowned; 
        synchronized (this) { 
            Thread t = Thread.currentThread(), o; 
            BufferedSubscription<T> b = clients; 
            if ((unowned = ((o = owner) != t)) && o != null) 
                owner = null;                     // disable bias 
            if (b == null) 
                complete = closed; 
            else { 
                complete = false; 
                boolean cleanMe = false; 
                BufferedSubscription<T> retries = null, rtail = null, next; 
                do { 
                    next = b.next; 
                    int stat = b.offer(item, unowned); 
                    if (stat == 0) {              // saturated; add to retry list 
                        b.nextRetry = null;       // avoid garbage on exceptions 
                        if (rtail == null) 
                            retries = b; 
                        else 
                            rtail.nextRetry = b; 
                        rtail = b; 
                    } 
                    else if (stat < 0)            // closed 
                        cleanMe = true;           // remove later 
                    else if (stat > lag) 
                        lag = stat; 
                } while ((b = next) != null); 
                if (retries != null || cleanMe) 
                    lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe); 
            } 
        } 
        if (complete) 
            throw new IllegalStateException("Closed"); 
        else 
            return lag; 
    }

Processor

而上一篇還沒介紹到的Processor,doc的說明表示Processor同時是Subscriber 也是Publisher,其實蠻好理解的,因為介於中間,相對於Publisher是Subscriber,相對於Subscriber則是Publisher,主要是用來做資料的轉換。

    /** 
     * A component that acts as both a Subscriber and Publisher. 
     * 
     * @param <T> the subscribed item type 
     * @param <R> the published item type 
     */ 
    public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> { 
    }

https://ithelp.ithome.com.tw/upload/images/20210920/201414181ItXG03RVO.png

DEMO

銜接上一篇的實作,我們在中間加上ApplePodcastProcessor,讓每一集輸出的時候都會加上apple的字樣,在建構時傳入轉換的function,onNext的時候做轉換。

@EqualsAndHashCode(callSuper = true)
@Data
public class ApplePodcastProcessor<T, R> extends SubmissionPublisher<R> implements Processor<T, R> {
  private Subscription subscription;
  private Function<T, R> function;

  public ApplePodcastProcessor(Function<T, R> function) {
    super();
    this.function = function;
  }

  @Override
  public void onSubscribe(Subscription subscription) {
    this.subscription = subscription;
    subscription.request(1);
  }

  @Override
  public void onNext(T item) {
    submit(function.apply(item));
    subscription.request(1);
  }

  @Override
  public void onError(Throwable throwable) {
    System.out.println("onError:" + throwable);
  }

  @Override
  public void onComplete() {
    System.out.println("onComplete");
  }
}

最終測試成功讓每一集都有加上apple字樣。

  @Test 
  void testJava9Reactive() throws InterruptedException { 
    Member<String> member = new Member<>(); 
    SubmissionPublisher<String> podcastChannel = new SubmissionPublisher<>(); 
    ApplePodcastProcessor<String, String> processor = 
        new ApplePodcastProcessor<>(item -> "apple : " + item); 
    podcastChannel.subscribe(processor); 
    processor.subscribe(member); 
    Assertions.assertEquals(1, podcastChannel.getSubscribers().size()); 
    List<String> episodes = List.of("1", "2", "3", "4"); 
    episodes.forEach(podcastChannel::submit); 
    System.out.println(member.getEpisodes()); 
    Thread.sleep(1000); 
    podcastChannel.close(); 
    System.out.println(member.getEpisodes()); 
    /* output: 
         [] 
    onNext:apple : 1 
    onNext:apple : 2 
    onNext:apple : 3 
    onNext:apple : 4 
    [apple : 1, apple : 2, apple : 3, apple : 4] 
    onComplete 
             */ 
  }

結語

雖然java9推出了符合Reactive Steam的Flow Api,但可以看出如果要在公司的專案中使用還是要自己打造不少輪子,所以接下來就來介紹其他可以幫助我們Reactive Programming in Java的好工具。

資料來源


上一篇
[Day 5] Reactive Programming - Java 9(Publisher、Subscribers)
下一篇
[Day 7] Reactive Programming - Reactor(FLUX & MONO) Part 1
系列文
從零開始Reactive Programming- Spring32
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

1 則留言

0
chichi
iT邦新手 2 級 ‧ 2021-09-25 12:08:09

Subscription 和 processor 會有關連嗎

我認為是有的,因為subscriptionpublishersubscriber溝通的橋樑,processor介於publishersubscriber之間,當processor面向publisher的時候就相對是subscriber,所以一樣是透過subscription來溝通的,上面applePodcastProcessor裡面實作也是有用到subscription
希望有回答到你的問題。

我要留言

立即登入留言