Skip to content

Instantly share code, notes, and snippets.

@beeth0ven
Last active Nov 13, 2021
Embed
What would you like to do?
NaiveStream for dart
import 'dart:collection';
/// Runs every demo in sequence, awaiting each one before starting the next.
void main() async {
  final demos = <Future<void> Function()>[
    streamMultiSync,
    streamMultiAsync,
    streamMultiSyncCancel,
    streamMultiAsyncCancel,
    streamMultiOn,
    streamMultiMap,
    streamMultiWhere,
    streamMultiAsyncBroadcast,
    streamMultiAsyncOnPause,
  ];
  for (final demo in demos) {
    await demo();
  }
}
/// Demonstrates a synchronous `streamMulti` source.
///
/// Values emitted inside the listen callback reach the handler before
/// `listen` returns. After the header line this prints:
///
/// ```
/// next: 1
/// next: 2
/// next: 3
/// listened
/// next: 4
/// ```
Future<void> streamMultiSync() async {
  print('-- stream.multi.sync --');
  final numbers = streamMulti<int>((sink) {
    for (final value in const [1, 2, 3]) {
      sink.onData(value);
    }
    Future<void>.delayed(
      const Duration(milliseconds: 100),
      () => sink.onData(4),
    );
  });
  numbers.listen((value) => print('next: $value'));
  print('listened');
  await Future<void>.delayed(const Duration(seconds: 1));
}
/// Demonstrates the same source as the sync demo, wrapped with `.async()`.
///
/// Delivery is deferred, so `listened` is printed first. After the header
/// line this prints:
///
/// ```
/// listened
/// next: 1
/// next: 2
/// next: 3
/// next: 4
/// ```
Future<void> streamMultiAsync() async {
  print('-- stream.multi.async --');
  final source = streamMulti<int>((sink) {
    for (final value in const [1, 2, 3]) {
      sink.onData(value);
    }
    Future<void>.delayed(
      const Duration(milliseconds: 100),
      () => sink.onData(4),
    );
  });
  // The only difference from the sync demo: defer delivery.
  final numbers = source.async();
  numbers.listen((value) => print('next: $value'));
  print('listened');
  await Future<void>.delayed(const Duration(seconds: 1));
}
/// Builds the stream shared by the cancel/operator demos.
///
/// On listen it emits 1, 2 and 3 synchronously and schedules 4 after 100 ms.
/// Its cancel hook prints `cancel`.
NaiveStream<int> _exampleStream() {
  return streamMulti<int>((sink) {
    for (final value in const [1, 2, 3]) {
      sink.onData(value);
    }
    Future<void>.delayed(
      const Duration(milliseconds: 100),
      () => sink.onData(4),
    );

    // Extra cancel logic attached to the subscription.
    Future<void> onCancel() async => print('cancel');
    return SubscriptionAddon(cancel: onCancel);
  });
}
/// Demonstrates cancelling a synchronous stream right after listening.
///
/// After the header line this prints:
///
/// ```
/// next: 1
/// next: 2
/// next: 3
/// listened
/// cancel
/// ```
Future<void> streamMultiSyncCancel() async {
  print('-- stream.multi.sync.cancel --');
  // Example stream that carries its own cancel logic.
  final numbers = _exampleStream();
  final sub = numbers.listen((value) => print('next: $value'));
  print('listened');
  await sub.cancel();
  await Future<void>.delayed(const Duration(seconds: 1));
}
/// Demonstrates cancelling an async stream before any value is delivered.
///
/// After the header line this prints:
///
/// ```
/// listened
/// cancel
/// ```
Future<void> streamMultiAsyncCancel() async {
  print('-- stream.multi.async.cancel --');
  final source = _exampleStream();
  final numbers = source.async(); // to async stream
  final sub = numbers.listen((value) => print('next: $value'));
  print('listened');
  await sub.cancel();
  await Future<void>.delayed(const Duration(seconds: 1));
}
/// Demonstrates registering extra handlers with `.on(...)`.
///
/// After the header line this prints:
///
/// ```
/// next: 1
/// on data 1
/// next: 2
/// on data 2
/// next: 3
/// on data 3
/// listened
/// on cancel
/// cancel
/// ```
Future<void> streamMultiOn() async {
  print('-- stream.multi.on --');
  final source = _exampleStream();
  // Register additional handlers on top of the source.
  final observed = source.on(
    data: (value) => print('on data $value'),
    cancel: () async => print('on cancel'),
  );
  final sub = observed.listen((value) => print('next: $value'));
  print('listened');
  await sub.cancel();
  await Future<void>.delayed(const Duration(seconds: 1));
}
/// Demonstrates `.map(...)` by adding 10 to every value.
///
/// After the header line this prints:
///
/// ```
/// next: 11
/// next: 12
/// next: 13
/// listened
/// cancel
/// ```
Future<void> streamMultiMap() async {
  print('-- stream.multi.map --');
  final source = _exampleStream();
  final shifted = source.map((number) => number + 10);
  final sub = shifted.listen((value) => print('next: $value'));
  print('listened');
  await sub.cancel();
  await Future<void>.delayed(const Duration(seconds: 1));
}
/// Demonstrates `.where(...)` by keeping only values greater than 0.
///
/// After the header line this prints:
///
/// ```
/// next: 1
/// next: 2
/// next: 3
/// listened
/// cancel
/// ```
Future<void> streamMultiWhere() async {
  print('-- stream.multi.where --');
  final source = _exampleStream();
  final positives = source.where((number) => number > 0);
  final sub = positives.listen((value) => print('next: $value'));
  print('listened');
  await sub.cancel();
  await Future<void>.delayed(const Duration(seconds: 1));
}
/// Demonstrates sharing one async source between two listeners.
///
/// After the header line this prints:
///
/// ```
/// listened
/// next1: 1
/// next2: 1
/// next1: 2
/// next2: 2
/// next1: 3
/// next2: 3
/// cancel
/// ```
Future<void> streamMultiAsyncBroadcast() async {
  print('-- stream.multi.async.broadcast --');
  // Async first, then broadcast.
  final shared = _exampleStream().async().asBroadcast();
  final first = shared.listen((value) => print('next1: $value'));
  final second = shared.listen((value) => print('next2: $value'));
  print('listened');
  await Future<void>.delayed(const Duration(milliseconds: 50));
  await first.cancel();
  // The source subscription is cancelled once every child subscription has
  // been cancelled.
  await second.cancel();
  await Future<void>.delayed(const Duration(seconds: 1));
}
/// Demonstrates pausing and resuming an async stream with `.on(...)` hooks.
///
/// Timeline: 1, 2, 3, {pause}, 4, {resume}, {cancel}. After the header line
/// this prints:
///
/// ```
/// listened
/// next: 1
/// next: 2
/// next: 3
/// on pause
/// on resume
/// next: 4
/// cancel
/// ```
Future<void> streamMultiAsyncOnPause() async {
  print('-- stream.multi.async.on.pause --');
  final observed = _exampleStream().async().on(
        pause: () => print('on pause'),
        resume: () => print('on resume'),
      );
  final sub = observed.listen((value) => print('next: $value'));
  print('listened');

  // Small helper so the timeline below reads as a list of steps.
  Future<void> wait(int milliseconds) =>
      Future<void>.delayed(Duration(milliseconds: milliseconds));

  await wait(50);
  sub.pause(); // after 1, 2, 3 and before 4
  await wait(100);
  sub.resume(); // after 4 was emitted by the source
  await wait(100);
  await sub.cancel();
  await wait(1000);
}
// Below are the implementation details.

/// Receives the values pushed by a stream.
///
/// Error and completion callbacks are deliberately left out for simplicity:
/// cancel-on-error and cancel-on-completed are too complex for this demo.
class Listener<T> {
  /// Called once for every value delivered to this listener.
  final void Function(T value) onData;

  /// Creates a listener that hands each value to [onData].
  Listener(this.onData);
}
/// Asynchronous callback used to cancel a subscription.
typedef Cancel = Future<void> Function();
/// Callback that takes no arguments and returns nothing.
typedef VoidCallback = void Function();
/// Callback that produces a value of type [T] when invoked.
typedef Getter<T> = T Function();
/// Handle for an active listen: a bundle of cancel/pause/resume callbacks
/// plus a getter reporting the paused state.
///
/// There are intentionally no mutable data/error/done handlers. The author
/// prefers not to mutate handlers after listening; to register another
/// handler later, turn the source into a broadcast stream and listen to that
/// again.
class Subscription {
  /// Cancels the subscription.
  final Cancel cancel;

  /// Pauses delivery.
  final VoidCallback pause;

  /// Resumes delivery.
  final VoidCallback resume;

  /// Reports whether the subscription is currently paused.
  final Getter<bool> isPaused;

  Subscription({
    required this.cancel,
    required this.pause,
    required this.resume,
    required this.isPaused,
  });

  /// Returns a copy in which every non-null argument replaces the matching
  /// callback of this subscription.
  Subscription copyWith({
    Cancel? cancel,
    VoidCallback? pause,
    VoidCallback? resume,
    Getter<bool>? isPaused,
  }) {
    return Subscription(
      cancel: cancel ?? this.cancel,
      pause: pause ?? this.pause,
      resume: resume ?? this.resume,
      isPaused: isPaused ?? this.isPaused,
    );
  }
}
/// Function that attaches [listener] to a source and returns the
/// [Subscription] controlling that attachment.
typedef Listen<T> = Subscription Function(Listener<T> listener);
/// This is our `Stream`: a thin wrapper around a [Listen] function.
class NaiveStream<T> {
  NaiveStream(this._listen);

  /// Attaches a listener to the underlying source.
  final Listen<T> _listen;

  /// Starts listening and forwards every value to [onData].
  ///
  /// The returned subscription's `cancel` runs the underlying cancel at most
  /// once. After it has been called, values still arriving from the source
  /// are dropped.
  Subscription listen(void Function(T value) onData) {
    var cancelled = false;

    void forward(T value) {
      if (!cancelled) onData(value);
    }

    final inner = _listen(Listener<T>(forward));

    Future<void> cancelOnce() async {
      if (cancelled) return;
      cancelled = true;
      await inner.cancel();
    }

    return inner.copyWith(cancel: cancelOnce);
  }
}
/// A listener wrapper that can be paused.
///
/// While paused, incoming values are queued; [resume] replays them in arrival
/// order. After [dispose], every operation is a silent no-op (a stricter
/// implementation could throw a `StateError` instead).
class PauseableListener<T> {
  PauseableListener(this._listener);

  final Listener<T> _listener;
  final _buffer = Queue<T>();
  bool _isPaused = false;
  bool _isDisposed = false;

  /// Whether values are currently being buffered instead of delivered.
  bool get isPaused => _isPaused;

  /// Starts buffering incoming values.
  void pause() {
    if (_isDisposed || _isPaused) return;
    _isPaused = true;
  }

  /// Stops buffering and delivers the queued values.
  ///
  /// Draining stops early if a delivery pauses or disposes this listener.
  void resume() {
    if (_isDisposed || !_isPaused) return;
    _isPaused = false;
    while (_buffer.isNotEmpty) {
      if (_isPaused) break;
      if (_isDisposed) return;
      _listener.onData(_buffer.removeFirst());
    }
  }

  /// Delivers [value] immediately, or queues it while paused.
  void onData(T value) {
    if (_isDisposed) return;
    if (!_isPaused) {
      _listener.onData(value);
      return;
    }
    _buffer.add(value);
  }

  /// Marks this listener as disposed and drops any queued values.
  void dispose() {
    if (_isDisposed) return;
    _isDisposed = true;
    _buffer.clear();
  }

  /// Turns this pauseable listener back into a normal listener.
  Listener<T> asListener() {
    return Listener<T>(onData);
  }
}
extension ListenerAsPauseableX<T> on Listener<T> {
  /// Wraps this listener in a [PauseableListener].
  PauseableListener<T> asPauseable() {
    return PauseableListener<T>(this);
  }
}
/// Optional callbacks that can be layered onto a subscription.
///
/// Every field may be left null, meaning nothing is added for that action.
class SubscriptionAddon {
  /// Extra cancel logic, if any.
  final Cancel? cancel;

  /// Extra pause logic, if any.
  final VoidCallback? pause;

  /// Extra resume logic, if any.
  final VoidCallback? resume;

  /// Replacement paused-state getter, if any.
  final Getter<bool>? isPaused;

  SubscriptionAddon({
    this.cancel,
    this.pause,
    this.resume,
    this.isPaused,
  });
}
extension SubscriptionAddWithX on Subscription {
  /// Returns this subscription extended with the callbacks of [addon].
  ///
  /// * `cancel` runs the addon's cancel and this subscription's cancel
  ///   together and completes when both have completed.
  /// * `pause` runs the addon's hook first, then this subscription's pause.
  /// * `resume` runs this subscription's resume first, then the addon's hook.
  /// * `isPaused` is replaced by the addon's getter when one is given.
  ///
  /// A null [addon], or a null callback inside it, leaves the matching part
  /// unchanged.
  Subscription addWith(SubscriptionAddon? addon) {
    if (addon == null) return this;

    // Copy to locals so the null checks promote and no `!` is needed.
    final extraCancel = addon.cancel;
    final extraPause = addon.pause;
    final extraResume = addon.resume;

    return copyWith(
      cancel: extraCancel == null
          ? cancel
          : () async {
              await Future.wait([extraCancel(), cancel()]);
            },
      pause: extraPause == null
          ? pause
          : () {
              extraPause();
              pause();
            },
      resume: extraResume == null
          ? resume
          : () {
              resume();
              extraResume();
            },
      isPaused: addon.isPaused ?? isPaused,
    );
  }
}
/// This implements `Stream.multi`.
///
/// [listen] is invoked once per subscription with a fresh listener; it may
/// emit values through it synchronously or later. It may return a
/// [SubscriptionAddon] to attach extra cancel/pause/resume logic, or null.
///
/// The listener handed to [listen] is typed `Listener<T>` rather than a raw
/// `Listener`, so emitting a value of the wrong type is a compile-time error
/// instead of a runtime `TypeError`.
///
/// The returned subscription supports pausing: values emitted while paused
/// are buffered and replayed on resume. Cancelling disposes that buffer.
NaiveStream<T> streamMulti<T>(
  SubscriptionAddon? Function(Listener<T> listener) listen,
) {
  return NaiveStream<T>((listener) {
    final pauseableListener = listener.asPauseable();
    // Any synchronous emission inside [listen] is delivered right here,
    // before the subscription below exists.
    final subscriptionAddon = listen(pauseableListener.asListener());
    final subscription = Subscription(
      cancel: () async => pauseableListener.dispose(),
      pause: pauseableListener.pause,
      resume: pauseableListener.resume,
      isPaused: () => pauseableListener.isPaused,
    );
    return subscription.addWith(subscriptionAddon);
  });
}
extension StreamAsyncAddX<T> on NaiveStream<T> {
  /// Turns a sync stream into an async one.
  ///
  /// Each value from the source is re-delivered to the listener from a newly
  /// created [Future] instead of synchronously.
  NaiveStream<T> async() {
    final upstream = _listen;
    return NaiveStream<T>((downstream) {
      void deliverLater(T value) {
        Future<void>(() => downstream.onData(value));
      }

      return upstream(Listener<T>(deliverLater));
    });
  }
}
extension StreamAsBroadcastX<T> on NaiveStream<T> {
  /// Shares a single source subscription between multiple listeners.
  ///
  /// The source is listened to when the first listener arrives and cancelled
  /// when the last one cancels. A later listener starts a new source
  /// subscription. Each listener can be paused independently; values arriving
  /// while it is paused are buffered for that listener only.
  NaiveStream<T> asBroadcast() {
    final sourceListen = _listen;
    final listeners = <Listener<T>>[];
    Subscription? sourceSubscription;

    final outerListener = Listener<T>((value) {
      // Iterate over a snapshot: a handler may cancel its own subscription or
      // add a new listener while being notified, and mutating the list during
      // iteration would throw a ConcurrentModificationError. A listener
      // cancelled mid-dispatch is already disposed, so it ignores the value.
      for (final listener in List.of(listeners)) {
        listener.onData(value);
      }
    });

    return NaiveStream<T>((listener) {
      final pauseableListener = listener.asPauseable();
      final registered = pauseableListener.asListener();
      // Register before subscribing so synchronous emissions from the source
      // reach the first listener.
      listeners.add(registered);
      if (listeners.length == 1) {
        sourceSubscription = sourceListen(outerListener);
      }

      Future<void> cancel() async {
        // `remove` returns false if this listener was already cancelled.
        if (!listeners.remove(registered)) return;
        pauseableListener.dispose();
        if (listeners.isEmpty) {
          // Detach the handle before awaiting. A listener arriving during the
          // await creates a new source subscription, which must not be
          // overwritten with null afterwards.
          final source = sourceSubscription;
          sourceSubscription = null;
          await source?.cancel();
        }
      }

      return Subscription(
        cancel: cancel,
        pause: pauseableListener.pause,
        resume: pauseableListener.resume,
        isPaused: () => pauseableListener.isPaused,
      );
    });
  }
}
extension StreamOnX<T> on NaiveStream<T> {
  /// Registers event handlers on top of this stream.
  ///
  /// [data] is called with each value after it has been forwarded to the
  /// listener. [cancel], [pause] and [resume] are layered onto the source
  /// subscription with `addWith`.
  NaiveStream<T> on({
    void Function(T value)? data,
    Cancel? cancel,
    VoidCallback? pause,
    VoidCallback? resume,
  }) {
    final upstream = _listen;
    final onValue = data;
    return NaiveStream<T>((downstream) {
      var tapped = downstream;
      if (onValue != null) {
        tapped = Listener<T>((value) {
          downstream.onData(value);
          onValue(value);
        });
      }
      final inner = upstream(tapped);
      final hooks = SubscriptionAddon(
        cancel: cancel,
        pause: pause,
        resume: resume,
      );
      return inner.addWith(hooks);
    });
  }
}
extension StreamBasicOperatorX<T> on NaiveStream<T> {
  /// Known as `stream.map`: forwards `convert(value)` for every source value.
  NaiveStream<R> map<R>(R Function(T) convert) {
    final upstream = _listen;
    return NaiveStream<R>((downstream) {
      void forwardConverted(T value) {
        downstream.onData(convert(value));
      }

      return upstream(Listener<T>(forwardConverted));
    });
  }

  /// Known as `stream.where`: forwards only the values for which [test]
  /// returns true.
  NaiveStream<T> where(bool Function(T) test) {
    final upstream = _listen;
    return NaiveStream<T>((downstream) {
      void forwardIfAccepted(T value) {
        if (test(value)) downstream.onData(value);
      }

      return upstream(Listener<T>(forwardIfAccepted));
    });
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment