簡單來說就是一群iterable的非同步事件。
像是每秒輸出一個數字,但是你可能會想說就算我不用 Stream
我也有辦法辦到:
Timer.periodic(Duration(seconds: 1), (timer) {
print(timer.tick);
});
沒錯其實 Timer
就有辦法達成這個需求,但如果今天突然想要數到10就好,或者某幾個數要跳過,甚至想要旁邊按個按鈕可以暫停想要繼續數時再繼續數,那我們使用Stream
就會比較簡單且優雅的達成這些需求。
final myStream = NumberCreator().stream; //這是一個會每秒輸出遞增數字的Stream
final subscription = myStream.listen((event) {
print('event $event');
}, onError: (err) {
print(err);
}, onDone: () {
print('subscription done!!');
},cancelOnError: false);
NumberCreator
是我自己建立的一個每一秒會輸出一個數字的Stream
類似於上面的 Timer.periodic
的效果,如果我們想要取得Stream
的值那我們必須要對這個「監聽」這個Stream
,在Dart中就是利用 Stream.listen
來達成。
而輸出會是:
event 1
event 2
ERROR!!!
//....
event 19
subscription done!!
Stream.listen
會回傳一個StreamSubscription
讓我們可以管理所訂閱的Stream
,Stream.listen
有提供三個可以管理Stream
狀態的參數分別是 onData
(positional)、 onError
、 onDone
(named)
其實從名字就可以看出功用,onData
就是當Stream
有了結果回傳後就執行的callback、onError
及onDone
就是當有error時及stream完成時。
而且還有提供一個參數cancelOnError
讓我們決定發生錯誤時要不要繼續監聽 。
那我們該如何建立自己的Stream
呢?
以Stream
來說有幾個比較重要的介面(或者該說是abstract class
)要了解:
StreamController
StreamSink
Stream
StreamSubscription
扣掉已經講過的Stream
及StreamSubscription
StreamController
通常是拿來控制及檢查這個Stream
的各種狀態,像是完成、錯誤、暫停、有無訂閱者等等。
StreamSink
就是Stream
來接收「事件」的地方,所謂事件就像前幾篇文章提到的,可能是同步的可能是非同步的,可能是隨著時間來的可能是使用者操作而產生的。
所以當我們想要自己建立一個 Stream
來進行操作的話:
class NumberCreator {
NumberCreator() {
Timer.periodic(Duration(seconds: 1), (timer) {
if (timer.tick == 3) {
_controller.addError('ERROR!!!');
} else if (timer.tick == 20) {
timer.cancel();
_controller.close();
} else {
_controller.sink.add(timer.tick);
}
});
}
final _controller = StreamController<int>();
Stream<int> get stream => _controller.stream;
}
我們先建立一個StreamController
,然後利用StreamSink
接收「事件」,這邊為了測試各種狀態所以自己實作了錯誤及完成狀態,最後宣告一個 get
讓外部獲取這個 StreamController
的 Stream
。
但如果只是單純要一個隨著時間輸出的 Stream
,也是有較為簡單的方式:
Stream<int> counterStream() {
return Stream<int>.periodic(const Duration(seconds: 1), (x) => x);
}
既然 Stream
是 iterable 的,那也就代表我們是可以對各別「事件」進行一些操作的:
final myStream2 =
counterStream().map((event) => event * 2).take(3).listen(print);
final myStream3 = counterStream()
.map((event) => event +3)
.takeWhile((element) => element <= 15)
.listen(print);
(是不是有點像RxDart/RxJS的感覺)
像是我們可以利用 map
讓Stream
中每個值都乘2,或者要拿幾個值就好之類的操作。
當然還有很多方法可以使用這裡就不一一說明了
前面說到StreamSubscription
可以拿來控制Stream
相關的狀態,當然也包含要不要繼續監聽或者暫停等等之類的操作:
void pauseStream5sAndResume(StreamSubscription subscription) {
Future.delayed(Duration(seconds: 5), () {
subscription.pause();
}).then((_) {
subscription.resume();
});
}
像是可以利用 pause
或者 resume
進行更多細節操作。
坦白說可能是因為做過專案規模的問題,我很少操作Stream
相關的語法,但大多數時候就是需要管理到複雜的非同步操作時,又或者是用於狀態管理像是BloC。
今天的程式碼:
https://github.com/zxc469469/dart-playground/tree/Day11/stream
講完Stream
就算把Dart中的非同步的大多數概念都提到了,雖然還有 generator
等語法但因為沒有實際使用過的經驗就不寫相關的內容。
至此Dart語法相關的內容也講得差不多了,剩下最後最後的null safety及我自己額外想提到的Dart中的常用的一些FP概念就要開始進入到Flutter章節。
參考資料