Skip to content

Instantly share code, notes, and snippets.

@rxlabz
Created December 7, 2016 17:13
Show Gist options
  • Save rxlabz/04e7d62acebb9005fcfc6d8c2c95499b to your computer and use it in GitHub Desktop.
Save rxlabz/04e7d62acebb9005fcfc6d8c2c95499b to your computer and use it in GitHub Desktop.
Dart StreamTransformer examples: using StreamTransformer.fromHandlers and implementing StreamTransformer
import 'dart:async';
/// Demonstrates two ways of transforming a stream of integers.
///
/// The first pipeline doubles every event with a handler-based transformer
/// built from [doublr]; the second sends the same values through a broadcast
/// [Reducer], which emits the running total. Each result is printed as it
/// arrives.
void main() {
  // Handler-based transformer: prints 'main v 2', 'main v 4', 'main v 6'.
  Stream.fromIterable([1, 2, 3])
      .transform(StreamTransformer.fromHandlers(handleData: doublr))
      .listen((v) => print('main v $v'));

  // Class-based transformer: prints 'R 1', 'R 3', 'R 6'.
  Stream.fromIterable([1, 2, 3])
      .transform(Reducer.broadcast())
      .listen((v) => print('R $v'));
}
/// Stream data handler that emits twice the incoming value.
///
/// Has the shape expected by the `handleData` callback of
/// `StreamTransformer.fromHandlers`: multiplies [data] by 2 and adds the
/// product to [sink]. [data] is untyped, so the multiplication is resolved at
/// run time.
void doublr(data, EventSink sink) {
  final doubled = data * 2;
  sink.add(doubled);
}
/// A [StreamTransformer] that turns a stream of numbers into its running
/// total.
///
/// Every data event from the bound source is added to [total], and the new
/// total is emitted on the output stream. Errors from the source are forwarded
/// unchanged, and the output is closed when the source is done.
///
/// An instance owns exactly one output controller, created by its
/// constructor, so it is meant to be bound to a single source: calling [bind]
/// again replaces the remembered source but returns the same output stream.
///
/// The sum is cast to [T], so [T] must be able to hold the result of adding
/// [S] values.
class Reducer<S extends num, T extends num>
    implements StreamTransformer<S, T> {
  /// The sum of all data events received so far, or `null` before the first.
  T? total;

  /// Forwarded to [Stream.listen] when subscribing to the source stream.
  bool? cancelOnError;

  // Assigned in the constructor body because the controller callbacks are
  // instance-method tear-offs, which are not available in an initializer list.
  late final StreamController<T> _controller;

  /// Subscription to the source; `null` while nobody listens to the output.
  StreamSubscription<S>? _subscription;

  /// The source stream recorded by the most recent [bind] call.
  Stream<S>? _stream;

  /// Creates a reducer whose output is a single-subscription stream.
  ///
  /// Pausing and resuming the output pauses and resumes the subscription to
  /// the source. [sync] is passed on to the underlying [StreamController].
  Reducer({bool sync = false, this.cancelOnError}) {
    _controller = StreamController<T>(
      onListen: _onListen,
      onCancel: _onCancel,
      onPause: () => _subscription?.pause(),
      onResume: () => _subscription?.resume(),
      sync: sync,
    );
  }

  /// Creates a reducer whose output is a broadcast stream.
  ///
  /// [sync] is passed on to the underlying broadcast [StreamController].
  Reducer.broadcast({bool sync = false, this.cancelOnError}) {
    _controller = StreamController<T>.broadcast(
      onListen: _onListen,
      onCancel: _onCancel,
      sync: sync,
    );
  }

  /// Subscribes to the source once the output stream gets a listener.
  void _onListen() {
    final source = _stream;
    if (source == null) {
      // [bind] is the only member of this class that hands out the output
      // stream, and it records the source first.
      throw StateError('Reducer output was listened to before bind().');
    }
    _subscription = source.listen(
      onData,
      onError: _controller.addError,
      onDone: _controller.close,
      cancelOnError: cancelOnError,
    );
  }

  /// Cancels the source subscription when the output is no longer listened to.
  ///
  /// Returns the cancellation future so the controller can wait for the
  /// source to finish cleaning up.
  Future<void>? _onCancel() {
    final subscription = _subscription;
    _subscription = null;
    return subscription?.cancel();
  }

  /// Adds [data] to [total] and emits the updated total on the output stream.
  ///
  /// Throws a [TypeError] if the resulting sum is not a [T].
  void onData(S data) {
    // Add as `num` so that non-integer inputs work; only the result is cast.
    final sum = ((total ?? 0) + data) as T;
    total = sum;
    _controller.add(sum);
  }

  /// Records [stream] as the source and returns the running-total stream.
  ///
  /// The source is not listened to until the returned stream is.
  @override
  Stream<T> bind(Stream<S> stream) {
    _stream = stream;
    return _controller.stream;
  }

  /// Returns a view of this transformer typed as `StreamTransformer<RS, RT>`.
  @override
  StreamTransformer<RS, RT> cast<RS, RT>() =>
      StreamTransformer.castFrom<S, T, RS, RT>(this);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment