Created
December 7, 2016 17:13
-
-
Save rxlabz/04e7d62acebb9005fcfc6d8c2c95499b to your computer and use it in GitHub Desktop.
Dart streamTransformer : - fromHandlers
- implementing StreamTransformer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import 'dart:async'; | |
void main() { | |
var st = new StreamTransformer.fromHandlers(); | |
var s = new Stream.fromIterable([1, 2, 3]) | |
..transform(new StreamTransformer.fromHandlers(handleData: doublr)) | |
.listen((v) => print('main v ${v}')); | |
var s2 = new Stream.fromIterable([1, 2, 3]) | |
..transform(new Reducer.broadcast()).listen((v) => print('R ${v}')); | |
} | |
void doublr(data, EventSink sink) { | |
sink.add(data * 2); | |
} | |
class Reducer<S extends num, T extends num> implements StreamTransformer<S, T> { | |
T total; | |
StreamController<T> _controller; | |
bool cancelOnError; | |
StreamSubscription _subscription; | |
Stream<S> _stream; | |
Reducer({bool sync: false, this.cancelOnError}) { | |
_controller = new StreamController<T>( | |
onListen: _onListen, | |
onCancel: _onCancel, | |
onPause: () { | |
_subscription.pause(); | |
}, | |
onResume: () { | |
_subscription.resume(); | |
}, | |
sync: sync); | |
} | |
Reducer.broadcast({bool sync: false, this.cancelOnError}) { | |
_controller = new StreamController<T>.broadcast( | |
onListen: _onListen, onCancel: _onCancel, sync: sync); | |
} | |
void _onListen() { | |
_subscription = _stream.listen(onData, | |
onError: _controller.addError, | |
onDone: _controller.close, | |
cancelOnError: cancelOnError); | |
} | |
void _onCancel() { | |
_subscription.cancel(); | |
_subscription = null; | |
} | |
void onData(S data) { | |
total = (total != null ? total : 0) + (data as int) as T; | |
_controller.add(total); | |
} | |
@override | |
Stream<T> bind(Stream<S> stream) { | |
this._stream = stream; | |
return _controller.stream; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment