上一篇的範例中有使用到SubmissionPublisher
,才更加地認識到其實Publisher需要做蠻多事情的,這邊先簡單介紹一下SubmissionPublisher
。
根據Java doc內容,SubmissionPublisher
使用在建構時傳入的Executor
來做到持續推送資料給Subscribers
,當Publisher
推送與Subscriber
消費的速度不同時能提供緩衝區(Buffer),若緩衝區到達上限可以有不同的方法來處理,最簡單就是卡住(block)直到資源可用。
在範例中有用到兩個method,看一下原始碼來大概了解一下用途,將subscriber
包裝成subscription
,對應到之前說明subscription
是publisher
與subscriber
溝通的橋樑,透過BufferedSubscription
的next
屬性(field)來達到像是linked list的效果,可以串聯所有的subscriber
並檢核是否有重複。
public void subscribe(Subscriber<? super T> subscriber) {
if (subscriber == null) throw new NullPointerException();
int max = maxBufferCapacity; // allocate initial array
Object[] array = new Object[max < INITIAL_CAPACITY ?
max : INITIAL_CAPACITY];
BufferedSubscription<T> subscription =
new BufferedSubscription<T>(subscriber, executor, onNextHandler,
array, max);
synchronized (this) {
if (!subscribed) {
subscribed = true;
owner = Thread.currentThread();
}
for (BufferedSubscription<T> b = clients, pred = null;;) {
if (b == null) {
Throwable ex;
subscription.onSubscribe();
if ((ex = closedException) != null)
subscription.onError(ex);
else if (closed)
subscription.onComplete();
else if (pred == null)
clients = subscription;
else
pred.next = subscription;
break;
}
BufferedSubscription<T> next = b.next;
if (b.isClosed()) { // remove
b.next = null; // detach
if (pred == null)
clients = next;
else
pred.next = next;
}
else if (subscriber.equals(b.subscriber)) {
b.onError(new IllegalStateException("Duplicate subscribe"));
break;
}
else
pred = b;
b = next;
}
}
}
而我們將資料放到publisher的submit實際上就是去呼叫doOffer()
public int submit(T item) {
return doOffer(item, Long.MAX_VALUE, null);
}
doOffer
會根據目前的大小有重試(retry)的機制,主要推送的邏輯是在(BufferedSubscription
) b.offer(item, unowned);
,會透過publisher的Executor
執行ConsumerTask
,在Task中run (BufferedSubscription)consumer.consume()
,在裡面就會根據條件去呼叫我們熟悉的subscriber的onSubscribe
、onNext
,這邊就先大致了解一下,先不深入探討。
private int doOffer(T item, long nanos,
BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
if (item == null) throw new NullPointerException();
int lag = 0;
boolean complete, unowned;
synchronized (this) {
Thread t = Thread.currentThread(), o;
BufferedSubscription<T> b = clients;
if ((unowned = ((o = owner) != t)) && o != null)
owner = null; // disable bias
if (b == null)
complete = closed;
else {
complete = false;
boolean cleanMe = false;
BufferedSubscription<T> retries = null, rtail = null, next;
do {
next = b.next;
int stat = b.offer(item, unowned);
if (stat == 0) { // saturated; add to retry list
b.nextRetry = null; // avoid garbage on exceptions
if (rtail == null)
retries = b;
else
rtail.nextRetry = b;
rtail = b;
}
else if (stat < 0) // closed
cleanMe = true; // remove later
else if (stat > lag)
lag = stat;
} while ((b = next) != null);
if (retries != null || cleanMe)
lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe);
}
}
if (complete)
throw new IllegalStateException("Closed");
else
return lag;
}
而上一篇還沒介紹到的Processor,doc的說明表示Processor同時是Subscriber 也是Publisher,其實蠻好理解的,因為介於中間,相對於Publisher是Subscriber,相對於Subscriber則是Publisher,主要是用來做資料的轉換。
/**
* A component that acts as both a Subscriber and Publisher.
*
* @param <T> the subscribed item type
* @param <R> the published item type
*/
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
銜接上一篇的實作,我們在中間加上ApplePodcastProcessor
,讓每一集輸出的時候都會加上apple的字樣,在建構時傳入轉換的function,onNext
的時候做轉換。
@EqualsAndHashCode(callSuper = true)
@Data
public class ApplePodcastProcessor<T, R> extends SubmissionPublisher<R> implements Processor<T, R> {
private Subscription subscription;
private Function<T, R> function;
public ApplePodcastProcessor(Function<T, R> function) {
super();
this.function = function;
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
submit(function.apply(item));
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError:" + throwable);
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
}
最終測試成功讓每一集都有加上apple字樣。
@Test
void testJava9Reactive() throws InterruptedException {
Member<String> member = new Member<>();
SubmissionPublisher<String> podcastChannel = new SubmissionPublisher<>();
ApplePodcastProcessor<String, String> processor =
new ApplePodcastProcessor<>(item -> "apple : " + item);
podcastChannel.subscribe(processor);
processor.subscribe(member);
Assertions.assertEquals(1, podcastChannel.getSubscribers().size());
List<String> episodes = List.of("1", "2", "3", "4");
episodes.forEach(podcastChannel::submit);
System.out.println(member.getEpisodes());
Thread.sleep(1000);
podcastChannel.close();
System.out.println(member.getEpisodes());
/* output:
[]
onNext:apple : 1
onNext:apple : 2
onNext:apple : 3
onNext:apple : 4
[apple : 1, apple : 2, apple : 3, apple : 4]
onComplete
*/
}
雖然java9推出了符合Reactive Steam的Flow Api,但可以看出如果要在公司的專案中使用還是要自己打造不少輪子,所以接下來就來介紹其他可以幫助我們Reactive Programming in Java的好工具。
Subscription 和 processor 會有關連嗎
我認為是有的,因為subscription
是publisher
與subscriber
溝通的橋樑,processor
介於publisher
與subscriber
之間,當processor
面向publisher
的時候就相對是subscriber
,所以一樣是透過subscription
來溝通的,上面applePodcastProcessor
裡面實作也是有用到subscription
。
希望有回答到你的問題。