Flutter異步編程-Stream
?Stream可以說(shuō)是構(gòu)成Dart響應(yīng)式流編程重要組成部分。還記得之前文章中說(shuō)過(guò)的Future嗎,我們知道每個(gè)Future代表單一的值,可以異步傳送數(shù)據(jù)或異常。而Stream的異步工作方式和Future類似,只是Stream代表的是一系列的事件,那么就可能傳遞任意數(shù)據(jù)值,可能是多個(gè)值也可以是異常。比如從磁盤中讀取一個(gè)文件,那么這里返回的就是一個(gè)Stream。此外Stream是基于事件流訂閱的機(jī)制來(lái)運(yùn)轉(zhuǎn)工作的。
?
1. 為什么需要Stream
首先,在Dart單線程模型中,要實(shí)現(xiàn)異步就需要借助類似Stream、Future之類的API實(shí)現(xiàn)。所以「Stream可以很好地實(shí)現(xiàn)Dart的異步編程」。此外,在Dart中一些異步場(chǎng)景中,比如磁盤文件、數(shù)據(jù)庫(kù)讀取等類似需要讀取一系列的數(shù)據(jù)時(shí),這種場(chǎng)景Future是不太合適的,所以在一些需要實(shí)現(xiàn)一系列異步事件時(shí)Stream就是不錯(cuò)的選擇,「Stream提供一系列異步的數(shù)據(jù)序列」。換個(gè)角度理解「Stream就是一系列的Future組合,F(xiàn)uture只能有一個(gè)異步響應(yīng),而Stream就是一系列的異步響應(yīng)」。
//Futures實(shí)現(xiàn)
void main() {
Future.delayed(Duration(seconds: 1), () => print('future value is: 1'));
}
輸出結(jié)果:
//Stream實(shí)現(xiàn)
void main() async {
Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), (int value) {
return value + 1;
});
await stream.forEach((element) => print('stream value is: $element'));
}
輸出結(jié)果: (輸出結(jié)果是一直在運(yùn)行的)
2. 什么是Stream
用官方的術(shù)語(yǔ)來(lái)說(shuō): 「Stream 是一系列異步事件的序列。其類似于一個(gè)異步的 Iterable,不同的是當(dāng)你向 Iterable 獲取下一個(gè)事件時(shí)它會(huì)立即給你,但是 Stream 則不會(huì)立即給你而是在它準(zhǔn)備好時(shí)告訴你。」 Streams是異步數(shù)據(jù)的源,Stream提供了一種接收事件序列的方式。每個(gè)事件要么是數(shù)據(jù)事件(或稱為流的元素),要么就是用于通知異常信息error事件。當(dāng)Stream所有的事件發(fā)出以后,一個(gè)"done"結(jié)束事件將作為最后一個(gè)事件發(fā)出。實(shí)際上類似RX響應(yīng)式流的概念。
2.1 單一訂閱模型(Single-subscription)

2.2 廣播訂閱模型(Broadcast-subscription)

2.3 模型分析
StreamController是創(chuàng)建Stream對(duì)象主要方式之一每個(gè) StreamController都會(huì)有一個(gè)槽口(Sink), 也就是Stream事件的入口,通過(guò)Sink的add將事件序列加入到StreamController中。StreamController類似一個(gè)生產(chǎn)者和消費(fèi)者模型,它不知道什么時(shí)候會(huì)有事件從Sink槽口加進(jìn)來(lái),而對(duì)于外部訂閱者也不知道何時(shí)有事件出來(lái),所以對(duì)于外部訂閱者只需要添加監(jiān)聽就好了。當(dāng)有事件通過(guò)sink槽口加入到StreamController后,StreamController就開始工作,然后直到它輸出數(shù)據(jù)。 需要注意的是從sink槽口加入的事件序列是有序的,監(jiān)聽器得到序列是和加入序列一致,也就是說(shuō)StreamController處理并不會(huì)打亂事件序列順序。 單一訂閱者顧明思意就是只能有一個(gè)訂閱者監(jiān)聽整個(gè)事件流,而對(duì)于廣播訂閱可以有若干個(gè)訂閱者監(jiān)聽整個(gè)事件流,類似于廣播通知的機(jī)制。
3. 如何使用Stream
3.1 創(chuàng)建Stream的方法
3.1.1 通過(guò)Stream構(gòu)造器創(chuàng)建
「Stream.fromFuture」: 通過(guò)Future創(chuàng)建一個(gè)新的 「single-subscription(單一訂閱)Stream」 , 當(dāng)Future完成時(shí)觸發(fā) then回調(diào),然后就會(huì)把返回的value加入到StreamController中, 并且還會(huì)添加一個(gè)Done事件表示結(jié)束。若Future完成時(shí)觸發(fā)onError回調(diào),則會(huì)把error加入到StreamController中, 并且還會(huì)添加一個(gè)Done事件表示結(jié)束。
factory Stream.fromFuture(Future<T> future) {
_StreamController<T> controller =
new _SyncStreamController<T>(null, null, null, null);
future.then((value) {//future完成時(shí),then回調(diào)
controller._add(value);//將value加入到_StreamController中
controller._closeUnchecked();//最后發(fā)送一個(gè)done事件
}, onError: (error, stackTrace) {//future完成時(shí),error回調(diào)
controller._addError(error, stackTrace);//將error加入到_StreamController中
controller._closeUnchecked();//最后發(fā)送一個(gè)done事件
});
return controller.stream;//最后返回stream
}
void main() {
Stream.fromFuture(Future.delayed(Duration(seconds: 1), () => 100)).listen(
(event) => print(event),
onDone: () => print('is done'),
onError: (error, stacktrace) => print('is error, errMsg: $error'),
cancelOnError: true);//cancelOnError: true(表示出現(xiàn)error就取消訂閱,之后事件將無(wú)法接收;false表示出現(xiàn)error后,后面事件可以繼續(xù)接收)
}
輸出結(jié)果:
「Stream.fromIterable」: 通過(guò)從一個(gè)集合中獲取其數(shù)據(jù)來(lái)創(chuàng)建一個(gè)新的**single-subscription(單一訂閱)Stream**
void main() {
Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8])
.map((event) => "this is $event")//還可以借助map,fold,reduce之類操作符,可以變換事件流
.listen((event) => print(event),
onDone: () => print('is done'),
onError: (error, stacktrace) => print('is error, errMsg: $error'),
cancelOnError: true);
}
輸出結(jié)果:
「Stream.fromFutures」:從一系列的Future中創(chuàng)建一個(gè)新的 「single-subscription(單一訂閱)Stream」,每個(gè)future都有自己的data或者error事件,當(dāng)整個(gè)Futures完成后,流將會(huì)關(guān)閉。如果Futures為空,流將會(huì)立刻關(guān)閉。
void main() {
var future1 = Future.value(100);
var future2 = Future.delayed(Duration(seconds: 1), () => 200);
var future3 = Future.delayed(Duration(seconds: 2), () => 300);
Stream.fromFutures([future1, future2, future3])
.reduce((previous, element) => previous + element)//累加所有future中的值
.asStream()
.listen((event) => print(event),
onDone: () => print('is done'),
onError: (error, stacktrace) => print('is error, errMsg: $error'),
cancelOnError: true);
}
輸出結(jié)果:
「Stream.periodic:」 可以創(chuàng)建一個(gè)新的重復(fù)發(fā)射事件而且可以指定間隔時(shí)間的Stream,通過(guò)再StreamController的onResume方法中創(chuàng)建一個(gè)Timer對(duì)象,最后調(diào)用Timer的 「periodic方法。」
factory Stream.periodic(Duration period,
[T computation(int computationCount)]) {
Timer timer;
int computationCount = 0;
StreamController<T> controller;
// Counts the time that the Stream was running (and not paused).
Stopwatch watch = new Stopwatch();//創(chuàng)建Stopwatch用于計(jì)算Stream運(yùn)行時(shí)間, 會(huì)一直運(yùn)行不會(huì)停止
void sendEvent() {
watch.reset();
T data;
if (computation != null) {
try {
data = computation(computationCount++);
} catch (e, s) {
controller.addError(e, s);
return;
}
}
controller.add(data);
}
void startPeriodicTimer() {
assert(timer == null);
//創(chuàng)建Timer對(duì)象
timer = new Timer.periodic(period, (Timer timer) {
sendEvent();
});
}
controller = new StreamController<T>(
sync: true,
onListen: () {
watch.start();
startPeriodicTimer();
},
onPause: () {
timer.cancel();
timer = null;
watch.stop();
},
onResume: () {
assert(timer == null);
Duration elapsed = watch.elapsed;
watch.start();
timer = new Timer(period - elapsed, () {
timer = null;
startPeriodicTimer();
sendEvent();
});
},
onCancel: () {
if (timer != null) timer.cancel();
timer = null;
return Future._nullFuture;
});
return controller.stream;
}
void main() {
Stream.periodic(Duration(seconds: 1), (value) => value + 100)
.listen((event) => print(event),
onDone: () => print('is done'),
onError: (error, stacktrace) => print('is error, errMsg: $error'),
cancelOnError: true);
}
輸出結(jié)果:
3.1.2 通過(guò)StreamController創(chuàng)建
創(chuàng)建任意類型StremController,也就是sink槽口可以加入任何類型的事件數(shù)據(jù)
import 'dart:async';
void main() {
//1.創(chuàng)建一個(gè)任意類型StreamController對(duì)象
StreamController streamController = StreamController(
onListen: () => print('listen'),
onCancel: () => print('cancel'),
onPause: () => print('pause'),
onResume: () => print('resumr'));
//2.通過(guò)sink槽口添加任意類型事件數(shù)據(jù)
streamController.sink.add(100);
streamController.sink.add(100.121212);
streamController.sink.add('THIS IS STRING');
streamController.sink.close();//只有手動(dòng)調(diào)用close方法發(fā)送一個(gè)done事件,onDone才會(huì)被回調(diào)
//3.注冊(cè)監(jiān)聽
streamController.stream.listen((event) => print(event),
onDone: () => print('is done'),
onError: (error, stacktrace) => print('is error, errMsg: $error'),
cancelOnError: true);
}
輸出結(jié)果:
創(chuàng)建指定類型的StreamController, 也就是sink槽口可以加入對(duì)應(yīng)指定類型的事件數(shù)據(jù)
import 'dart:async';
void main() {
//1.創(chuàng)建一個(gè)int類型StreamController對(duì)象
StreamController<int> streamController = StreamController(
onListen: () => print('listen'),
onCancel: () => print('cancel'),
onPause: () => print('pause'),
onResume: () => print('resumr'));
//2.通過(guò)sink槽口添加int類型事件數(shù)據(jù)
streamController.sink.add(100);
streamController.sink.add(200);
streamController.sink.add(300);
streamController.sink.add(400);
streamController.sink.add(500);
streamController.sink.close(); //只有手動(dòng)調(diào)用close方法發(fā)送一個(gè)done事件,onDone才會(huì)被回調(diào)
//3.注冊(cè)監(jiān)聽
streamController.stream.listen((event) => print(event),
onDone: () => print('is done'),
onError: (error, stacktrace) => print('is error, errMsg: $error'),
cancelOnError: true);
}
輸出結(jié)果:
3.1.3 通過(guò)async*創(chuàng)建
如果有一系列事件需要處理,也許會(huì)需要把它轉(zhuǎn)化為 stream。這時(shí)候可以使用 「async」*** 和 yield** 來(lái)生成一個(gè) Stream。
void main() {
generateStream(10).listen((event) => print(event),
onDone: () => print('is done'),
onError: (error, stacktrace) => print('is error, errMsg: $error'),
cancelOnError: true);
}
Stream<int> generateStream(int dest) async* {
for (int i = 1; i <= dest; i++) {
yield i;
}
}
輸出結(jié)果:
3.2 監(jiān)聽Stream
3.2.1 listen方法監(jiān)聽
監(jiān)聽Stream流主要就是使用 listen 這個(gè)方法,它有 onData(必填參數(shù)) , onError(可選參數(shù)) , onDone(可選參數(shù)) , cancelOnError(可選參數(shù))
onData: 接收到數(shù)據(jù)時(shí)觸發(fā)回調(diào) onError: 接收到異常時(shí)觸發(fā)回調(diào) onDone: 數(shù)據(jù)接收完畢觸發(fā)回調(diào) cancelOnError: 表示true(出現(xiàn)第一個(gè)error就取消訂閱,之后事件將無(wú)法接收;false表示出現(xiàn)error后,后面事件可以繼續(xù)接收)
StreamSubscription<T> listen(void onData(T data),
{Function onError, void onDone(), bool cancelOnError}) {
cancelOnError = identical(true, cancelOnError);
StreamSubscription<T> subscription =
_createSubscription(onData, onError, onDone, cancelOnError);
_onListen(subscription);
return subscription;
}
3.2.2 async-await配合for或forEach循環(huán)處理
通過(guò)async-await配合for或forEach可以實(shí)現(xiàn)當(dāng)Stream中每個(gè)事件到來(lái)的時(shí)候處理它,由于Stream接收事件時(shí)機(jī)是不確定,所以for或forEach循環(huán)退出的時(shí)候一般是Stream關(guān)閉或者完成結(jié)束的時(shí)候
void main() async {
Stream<int> stream = Stream<int>.periodic(Duration(seconds: 1), (int value) {
return value + 1;
});
await stream.forEach((element) => print('stream value is: $element'));
}
輸出結(jié)果:
3.3 Stream流的轉(zhuǎn)換
Stream流的轉(zhuǎn)換實(shí)際上是通過(guò)類似 map 、 take 、 where 、 reduce 、 expand 之類的操作符函數(shù)實(shí)現(xiàn)流的變換。實(shí)際上他們作用和集合中變化操作符意思是類似,所以這里由于篇幅問(wèn)題就不一一展開,有了前面集合操作符函數(shù)基礎(chǔ),這里也是類似,只不過(guò)這邊返回的是 Stream<T> 而已。不過(guò)這里需要特別說(shuō)下 transform 操作符函數(shù).
transform 操作符函數(shù)它能實(shí)現(xiàn)更多自定義的流變化規(guī)則, 它通過(guò)傳入一個(gè) StreamTransformer<T, S> 參數(shù),最后返回一個(gè) Stream<T> . 也就是輸入的流類型是 T . 輸出的是 S ,通過(guò)StreamTransformer 輸出一個(gè)新的Stream流。
Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) {
return streamTransformer.bind(this);
}
import 'dart:async';
void main() {
//1.創(chuàng)建一個(gè)int類型StreamController對(duì)象
StreamController<int> streamController = StreamController();
//2.通過(guò)sink槽口添加int類型事件數(shù)據(jù)
streamController.sink.add(100);
streamController.sink.add(200);
streamController.sink.add(300);
streamController.sink.add(400);
streamController.sink.add(500);
streamController.sink.close(); //只有手動(dòng)調(diào)用close方法發(fā)送一個(gè)done事件,onDone才會(huì)被回調(diào)
//自定義StreamTransformer
final transformer = StreamTransformer<int, String>.fromHandlers(handleData: (value, sink) {
sink.add("this number is: $value");
});
//3.注冊(cè)監(jiān)聽
streamController.stream
.transform(transformer)
.listen((event) => print("second listener: $event"), onDone: () => print('second listener: is done'));
}
輸出結(jié)果:
3.4 Stream流的種類
其實(shí)這里Stream流的種類劃分是基于Stream流的訂閱模型來(lái)劃分,所以那么這里Stream流的種類只有兩種: 「Single-subscription(單一訂閱)Stream、Broadcast-subscription(廣播訂閱)Stream」
3.4.1 Single-subscription(單一訂閱)Stream
單一訂閱Stream在整個(gè)流的生命周期中「只會(huì)有一個(gè)訂閱者監(jiān)聽,「也就是listen方法只能調(diào)用一次,而且」第一次listen取消(cancel)后,不能重復(fù)監(jiān)聽,否則會(huì)拋出異?!?/strong>。
import 'dart:async';
void main() {
//1.創(chuàng)建一個(gè)int類型StreamController對(duì)象
StreamController<int> streamController = StreamController(
onListen: () => print('listen'),
onCancel: () => print('cancel'),
onPause: () => print('pause'),
onResume: () => print('resumr'));
//2.通過(guò)sink槽口添加int類型事件數(shù)據(jù)
streamController.sink.add(100);
streamController.sink.add(200);
streamController.sink.add(300);
streamController.sink.add(400);
streamController.sink.add(500);
streamController.sink.close(); //只有手動(dòng)調(diào)用close方法發(fā)送一個(gè)done事件,onDone才會(huì)被回調(diào)
//3.注冊(cè)監(jiān)聽
streamController.stream.listen((event) => print(event), onDone: () => print('is done'));
streamController.stream.listen((event) => print(event), onDone: () => print('is done')); //不允許兩次監(jiān)聽
}
輸出結(jié)果:
3.4.2 「Broadcast-subscription」(「廣播訂閱」)Stream
Broadcast廣播訂閱模型Stream, 可以同時(shí)存在任意多個(gè)訂閱者監(jiān)聽,無(wú)論是否有訂閱者,它都會(huì)產(chǎn)生事件。所以中途進(jìn)來(lái)的收聽者將「不會(huì)收到」之前的消息。如果多個(gè)收聽者要監(jiān)聽單一訂閱Stream,需要使用 asBroadcastStream 轉(zhuǎn)化成Broadcast廣播訂閱Stream. 或者創(chuàng)建BroadcastStream流可以通過(guò)繼承Stream然后重寫isBroadcast為true即可。
import 'dart:async';
void main() {
//1.創(chuàng)建一個(gè)int類型StreamController對(duì)象
StreamController<int> streamController = StreamController(
onListen: () => print('listen'),
onCancel: () => print('cancel'),
onPause: () => print('pause'),
onResume: () => print('resumr'));
//2.通過(guò)sink槽口添加int類型事件數(shù)據(jù)
streamController.sink.add(100);
streamController.sink.add(200);
streamController.sink.add(300);
streamController.sink.add(400);
streamController.sink.add(500);
streamController.sink.close(); //只有手動(dòng)調(diào)用close方法發(fā)送一個(gè)done事件,onDone才會(huì)被回調(diào)
//3.注冊(cè)監(jiān)聽
Stream stream = streamController.stream.asBroadcastStream();//轉(zhuǎn)換成BroadcastStream
stream.listen((event) => print("first listener: $event"), onDone: () => print('first listener: is done'));
stream.listen((event) => print("second listener: $event"), onDone: () => print('second listener: is done'));
}
輸出結(jié)果:
4. Stream使用的場(chǎng)景
Stream的使用場(chǎng)景有很多比如數(shù)據(jù)庫(kù)的讀寫、文件IO的讀寫、基于多個(gè)網(wǎng)絡(luò)請(qǐng)求轉(zhuǎn)化處理都可以使用流來(lái)處理。下面會(huì)給出一個(gè)具體的文件復(fù)制例子實(shí)現(xiàn)IO文件讀寫使用Stream的場(chǎng)景。
import 'dart:async';
import 'dart:io';
void main() {
copyFile(File('/Users/mikyou/Desktop/gitchat/test.zip'),
File('/Users/mikyou/Desktop/gitchat/copy_dir/test.copy.zip'));
}
void copyFile(File sourceFile, File targetFile) async {
assert(await sourceFile.exists() == true);
print('source file path: ${sourceFile.path}');
print('target file path: ${targetFile.path}');
//以WRITE方式打開文件,創(chuàng)建緩存IOSink
IOSink sink = targetFile.openWrite();
//文件大小
int fileLength = await sourceFile.length();
//已讀取文件大小
int count = 0;
//模擬進(jìn)度條
String progress = "-";
//以只讀方式打開源文件數(shù)據(jù)流
Stream<List<int>> inputStream = sourceFile.openRead();
inputStream.listen((List<int> data) {
count += data.length;
//進(jìn)度百分比
double num = (count * 100) / fileLength;
print("${progress * (num ~/ 2)}[${num.toStringAsFixed(2)}%]");
//將數(shù)據(jù)添加到緩存sink中
sink.add(data);
}, onDone: () {
//數(shù)據(jù)流傳輸結(jié)束時(shí),觸發(fā)onDone事件
print("復(fù)制文件結(jié)束!");
//關(guān)閉緩存釋放系統(tǒng)資源
sink.close();
});
}
輸出結(jié)果:
5. Stream與Future的區(qū)別
實(shí)際上有了上面對(duì)Stream的介紹,相信很多人基本上都能分析出Stream和Future的區(qū)別了。先用官方專業(yè)術(shù)語(yǔ)做下對(duì)比:「Future 表示一個(gè)不會(huì)立即完成的計(jì)算過(guò)程」。與普通函數(shù)直接返回結(jié)果不同的是異步函數(shù)返回一個(gè)將會(huì)包含結(jié)果的 Future。該 Future 會(huì)在結(jié)果準(zhǔn)備好時(shí)通知調(diào)用者
「Stream 是一系列異步事件的序列」。其類似于一個(gè)異步的 Iterable,不同的是當(dāng)你向 Iterable 獲取下一個(gè)事件時(shí)它會(huì)立即給你,但是 Stream 則不會(huì)立即給你而是在它準(zhǔn)備好時(shí)告訴你
可以使用一個(gè)餐廳吃飯場(chǎng)景來(lái)理解Future和Stream的區(qū)別:
「Future」就好比你去一家餐廳吃飯,在前臺(tái)點(diǎn)好你想吃的菜后,付完錢后服務(wù)員會(huì)給你一個(gè)等待的號(hào)碼牌(相當(dāng)于先拿到一個(gè)Future),后廚就開始根據(jù)你下的訂單開始做菜,等到你的菜好了后,就可以通過(guò)號(hào)碼牌拿到指定的菜了(返回的數(shù)據(jù)或異常信息)。「Stream」就好比去一家餐廳吃飯,在前臺(tái)點(diǎn)好A,B,C,D4種你想吃的菜后(訂閱數(shù)據(jù)流過(guò)程),然后你就去桌子等著,至于菜什么時(shí)候好,你也不知道所以就一直等著(類似于一直監(jiān)聽listen著),后廚就開始根據(jù)你下的訂單開始做菜, 等著你的第一盤A種菜好了后,服務(wù)員就會(huì)主動(dòng)傳送A到你的桌子上(基于一種類似訂閱-推送機(jī)制),沒(méi)有特殊意外,服務(wù)員推送菜的順序應(yīng)該也是A,B,C,D。
6. 熊喵先生的小總結(jié)
到這里有關(guān)Dart異步編程中Stream就介紹完畢了,Stream在數(shù)據(jù)庫(kù)讀寫,文件IO讀寫方面是非常實(shí)用。此外它支持響應(yīng)式流式編程,可以利用它一些轉(zhuǎn)換操作符實(shí)現(xiàn)對(duì)流的變換,可以實(shí)現(xiàn)一些類似Rx中的操作。
感謝關(guān)注,熊喵先生愿和你在技術(shù)路上一起成長(zhǎng)!
