Skip to content

Instantly share code, notes, and snippets.

@PlugFox
Last active August 18, 2022 23:27
Show Gist options
  • Save PlugFox/459637909b221b65ab32f6e01a67b7e7 to your computer and use it in GitHub Desktop.
Save PlugFox/459637909b221b65ab32f6e01a67b7e7 to your computer and use it in GitHub Desktop.
ValueStream
/// A [Stream] that remembers the latest event of a broadcast source and
/// replays it to every new listener before forwarding live events.
///
/// Each call to [listen] first receives the current [value] and then every
/// event the source emits afterwards. When no seed was supplied and the source
/// has not emitted yet, the replayed value is `null`.
class ValueStream<T> extends Stream<T> {
  /// Wraps the broadcast [stream], optionally seeding [value] with [lastValue].
  ///
  /// The source is subscribed to immediately so that [value] is tracked even
  /// before the first listener arrives. In debug mode an assertion fails when
  /// [stream] is not a broadcast stream.
  ValueStream.fromStream(Stream<T> stream, [T lastValue])
      : assert(stream.isBroadcast, 'stream must be broadcast'),
        _upstream = stream,
        _lastValue = lastValue {
    _controller = StreamController<T>.broadcast(
      // Re-attach to the source when a listener shows up after everyone left;
      // otherwise late listeners would only ever see the replayed value.
      onListen: _subscribe,
      // Release the source while nobody is listening.
      onCancel: _unsubscribe,
      // Synchronous delivery keeps [value] and the forwarded event in step.
      sync: true,
    );
    _subscribe();
  }

  /// The wrapped broadcast source.
  final Stream<T> _upstream;

  /// Fan-out point between the single source subscription and the listeners.
  StreamController<T> _controller;

  /// Active subscription to [_upstream], or `null` while detached.
  StreamSubscription<T> _subscription;

  T _lastValue;

  /// The most recent event received from the source, or the seed passed to
  /// the constructor when nothing has been received yet.
  ///
  /// The value is only updated while this stream is attached to the source,
  /// i.e. not between the last listener cancelling and the next one listening.
  T get value => _lastValue;

  /// Always `true`: every [listen] call gets its own replay stream, so any
  /// number of listeners may subscribe.
  @override
  bool get isBroadcast => true;

  /// Per-listener stream: the current [value] followed by the live events.
  Stream<T> get _replay async* {
    yield value;
    yield* _controller.stream;
  }

  /// Attaches to the source unless already attached or already finished.
  void _subscribe() {
    if (_subscription != null || _controller.isClosed) return;
    _subscription = _upstream.listen(
      (event) {
        _lastValue = event;
        _controller.add(event);
      },
      onError: (Object error, StackTrace stackTrace) {
        // `cancelOnError` has already ended the source subscription, so no
        // further events can arrive: forward the error and close, instead of
        // leaving listeners waiting forever.
        _subscription = null;
        _controller.addError(error, stackTrace);
        _controller.close();
      },
      onDone: () {
        _subscription = null;
        _controller.close();
      },
      cancelOnError: true,
    );
  }

  /// Detaches from the source; a later listener re-attaches via [_subscribe].
  void _unsubscribe() {
    final subscription = _subscription;
    _subscription = null;
    subscription?.cancel();
  }

  @override
  StreamSubscription<T> listen(void Function(T event) onData,
          {Function onError, void Function() onDone, bool cancelOnError}) =>
      _replay.listen(onData,
          onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment