NaiveStream for Dart
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import 'dart:collection'; | |
/// Runs every demo one after another, waiting for each to finish before
/// starting the next so their console output does not interleave.
void main() async {
  final demos = <Future<void> Function()>[
    streamMultiSync,
    streamMultiAsync,
    streamMultiSyncCancel,
    streamMultiAsyncCancel,
    streamMultiOn,
    streamMultiMap,
    streamMultiWhere,
    streamMultiAsyncBroadcast,
    streamMultiAsyncOnPause,
  ];
  for (final demo in demos) {
    await demo();
  }
}
/// Demo: a synchronous stream delivers values emitted inside the `listen`
/// callback before `listen` returns.
Future<void> streamMultiSync() async {
  print('-- stream.multi.sync --');
  final numbers = streamMulti<int>((listener) {
    for (final value in const [1, 2, 3]) {
      listener.onData(value);
    }
    // The fourth value is emitted later, from a timer.
    Future<void>.delayed(
      const Duration(milliseconds: 100),
      () => listener.onData(4),
    );
  });
  numbers.listen((value) => print('next: $value'));
  print('listened');
  await Future<void>.delayed(const Duration(seconds: 1));
  // Expected output:
  //   next: 1
  //   next: 2
  //   next: 3
  //   listened
  //   next: 4
}
/// Demo: wrapping the same source with `async()` defers every delivery, so
/// `listened` is printed before the first value.
Future<void> streamMultiAsync() async {
  print('-- stream.multi.async --');
  final source = streamMulti<int>((listener) {
    for (final value in const [1, 2, 3]) {
      listener.onData(value);
    }
    Future<void>.delayed(
      const Duration(milliseconds: 100),
      () => listener.onData(4),
    );
  });
  final numbers = source.async(); // <- the only difference from the sync demo
  numbers.listen((value) => print('next: $value'));
  print('listened');
  await Future<void>.delayed(const Duration(seconds: 1));
  // Expected output:
  //   listened
  //   next: 1
  //   next: 2
  //   next: 3
  //   next: 4
}
/// Builds the stream shared by the remaining demos.
///
/// On listen it emits 1, 2 and 3 immediately, schedules 4 after 100 ms, and
/// contributes a cancel hook that prints `cancel`.
NaiveStream<int> _exampleStream() {
  return streamMulti<int>((listener) {
    for (final value in const [1, 2, 3]) {
      listener.onData(value);
    }
    Future<void>.delayed(
      const Duration(milliseconds: 100),
      () => listener.onData(4),
    );
    // Extra cancel logic merged into the subscription by `streamMulti`.
    return SubscriptionAddon(cancel: () async => print('cancel'));
  });
}
/// Demo: cancelling a synchronous stream runs the source's cancel hook and
/// suppresses the value that was still pending on a timer.
Future<void> streamMultiSyncCancel() async {
  print('-- stream.multi.sync.cancel --');
  final numbers = _exampleStream(); // example stream with cancel logic
  final subscription = numbers.listen((value) => print('next: $value'));
  print('listened');
  await subscription.cancel();
  await Future<void>.delayed(const Duration(seconds: 1));
  // Expected output:
  //   next: 1
  //   next: 2
  //   next: 3
  //   listened
  //   cancel
}
/// Demo: cancelling an async stream right after listening means no value is
/// ever printed, because every delivery was still queued.
Future<void> streamMultiAsyncCancel() async {
  print('-- stream.multi.async.cancel --');
  final numbers = _exampleStream().async(); // converted to an async stream
  final subscription = numbers.listen((value) => print('next: $value'));
  print('listened');
  await subscription.cancel();
  await Future<void>.delayed(const Duration(seconds: 1));
  // Expected output:
  //   listened
  //   cancel
}
/// Demo: `on(...)` attaches extra data and cancel handlers to a stream.
Future<void> streamMultiOn() async {
  print('-- stream.multi.on --');
  final numbers = _exampleStream().on(
    // Additional handlers registered on top of the listener's own callback.
    data: (value) => print('on data $value'),
    cancel: () async => print('on cancel'),
  );
  final subscription = numbers.listen((value) => print('next: $value'));
  print('listened');
  await subscription.cancel();
  await Future<void>.delayed(const Duration(seconds: 1));
  // Expected output:
  //   next: 1
  //   on data 1
  //   next: 2
  //   on data 2
  //   next: 3
  //   on data 3
  //   listened
  //   on cancel
  //   cancel
}
/// Demo: `map` transforms each value before it reaches the listener.
Future<void> streamMultiMap() async {
  print('-- stream.multi.map --');
  final shifted = _exampleStream().map((it) => it + 10); // add 10 to each value
  final subscription = shifted.listen((value) => print('next: $value'));
  print('listened');
  await subscription.cancel();
  await Future<void>.delayed(const Duration(seconds: 1));
  // Expected output:
  //   next: 11
  //   next: 12
  //   next: 13
  //   listened
  //   cancel
}
/// Demo: `where` forwards only the values that satisfy the predicate.
Future<void> streamMultiWhere() async {
  print('-- stream.multi.where --');
  final positives = _exampleStream().where((it) => it > 0); // keep values > 0
  final subscription = positives.listen((value) => print('next: $value'));
  print('listened');
  await subscription.cancel();
  await Future<void>.delayed(const Duration(seconds: 1));
  // Expected output:
  //   next: 1
  //   next: 2
  //   next: 3
  //   listened
  //   cancel
}
/// Demo: an async broadcast stream shares one source subscription between
/// two listeners and cancels the source once both have cancelled.
Future<void> streamMultiAsyncBroadcast() async {
  print('-- stream.multi.async.broadcast --');
  final shared = _exampleStream().async().asBroadcast();
  final first = shared.listen((value) => print('next1: $value'));
  final second = shared.listen((value) => print('next2: $value'));
  print('listened');
  await Future<void>.delayed(const Duration(milliseconds: 50));
  await first.cancel();
  // The source subscription is cancelled only when the last child
  // subscription is cancelled.
  await second.cancel();
  await Future<void>.delayed(const Duration(seconds: 1));
  // Expected output:
  //   listened
  //   next1: 1
  //   next2: 1
  //   next1: 2
  //   next2: 2
  //   next1: 3
  //   next2: 3
  //   cancel
}
/// Demo: pausing buffers values emitted by the source and resuming replays
/// them; `on(pause:, resume:)` observes both transitions.
Future<void> streamMultiAsyncOnPause() async {
  print('-- stream.multi.async.on.pause --');
  final numbers = _exampleStream().async().on(
    pause: () => print('on pause'),
    resume: () => print('on resume'),
  );
  final subscription = numbers.listen((value) => print('next: $value'));
  print('listened');

  // Timeline: 1, 2, 3, {pause}, 4 (buffered), {resume}, {cancel}.
  await Future<void>.delayed(const Duration(milliseconds: 50));
  subscription.pause(); // after 1, 2 and 3 were delivered
  await Future<void>.delayed(const Duration(milliseconds: 100));
  subscription.resume(); // 4 was emitted while paused
  await Future<void>.delayed(const Duration(milliseconds: 100));
  await subscription.cancel();
  await Future<void>.delayed(const Duration(seconds: 1));
  // Expected output:
  //   listened
  //   next: 1
  //   next: 2
  //   next: 3
  //   on pause
  //   on resume
  //   next: 4
  //   cancel
}
// Below are the implementation details.

/// Receives the values produced by a stream.
///
/// Only data events are modelled. Error and completion callbacks are left out
/// on purpose, because cancelling on error or completion is too complex for
/// this demo.
class Listener<T> {
  /// Invoked once for every value delivered to this listener.
  final void Function(T value) onData;

  Listener(this.onData);
}
/// Tears a subscription down; the returned future completes when done.
typedef Cancel = Future<void> Function();

/// A callback that takes no arguments and returns nothing.
typedef VoidCallback = void Function();

/// Reads a value of type [T] on demand.
typedef Getter<T> = T Function();

/// A handle for controlling an active listener: cancel, pause, resume and
/// query the paused state.
///
/// Handlers are deliberately not mutable after listening. To register another
/// handler later, turn the source into a broadcast stream and listen to it
/// again.
class Subscription {
  /// Stops the subscription.
  final Cancel cancel;

  /// Requests that delivery be suspended.
  final VoidCallback pause;

  /// Requests that delivery continue.
  final VoidCallback resume;

  /// Returns whether the subscription is currently paused.
  final Getter<bool> isPaused;

  Subscription({
    required this.cancel,
    required this.pause,
    required this.resume,
    required this.isPaused,
  });

  /// Returns a new [Subscription] where every callback that is passed
  /// replaces the current one and every omitted callback is kept.
  Subscription copyWith({
    Cancel? cancel,
    VoidCallback? pause,
    VoidCallback? resume,
    Getter<bool>? isPaused,
  }) {
    return Subscription(
      cancel: cancel ?? this.cancel,
      pause: pause ?? this.pause,
      resume: resume ?? this.resume,
      isPaused: isPaused ?? this.isPaused,
    );
  }
}
/// Connects a [Listener] to a source and returns the controlling
/// [Subscription].
typedef Listen<T> = Subscription Function(Listener<T> listener);

/// This is our `Stream`: a thin wrapper around a [Listen] function.
class NaiveStream<T> {
  NaiveStream(this._listen);

  final Listen<T> _listen;

  /// Starts listening and calls [onData] for each value.
  ///
  /// The returned subscription's `cancel` runs the underlying cancel at most
  /// once, and no value reaches [onData] after it has been called.
  Subscription listen(void Function(T value) onData) {
    var cancelled = false;

    void deliver(T value) {
      if (!cancelled) onData(value);
    }

    final inner = _listen(Listener<T>(deliver));

    Future<void> cancelOnce() async {
      if (cancelled) return;
      cancelled = true;
      await inner.cancel();
    }

    return inner.copyWith(cancel: cancelOnce);
  }
}
/// A listener wrapper that can be paused.
///
/// While paused, incoming values are buffered. On [resume] they are replayed
/// to the wrapped listener in arrival order. After [dispose] every call is a
/// no-op.
class PauseableListener<T> {
  PauseableListener(this._listener);

  final Listener<T> _listener;
  final Queue<T> _buffer = Queue<T>();
  bool _isDisposed = false;
  bool _isPaused = false;

  // True while `resume` is replaying the buffer. Values arriving re-entrantly
  // during the replay are queued behind the older buffered ones instead of
  // overtaking them.
  bool _isDraining = false;

  /// Whether delivery is currently suspended.
  bool get isPaused => _isPaused;

  /// Suspends delivery; later values are buffered until [resume].
  void pause() {
    if (_isDisposed) return; // or throw StateError
    _isPaused = true;
  }

  /// Resumes delivery and replays buffered values in arrival order.
  ///
  /// The replay stops early if the wrapped listener pauses or disposes this
  /// object while handling a value.
  void resume() {
    if (_isDisposed) return; // or throw StateError
    if (!_isPaused) return;
    _isPaused = false;
    // A nested `resume` (pause then resume inside a handler) lets the outer
    // replay loop continue rather than starting a second one.
    if (_isDraining) return;
    _isDraining = true;
    try {
      while (!_isPaused && !_isDisposed && _buffer.isNotEmpty) {
        _listener.onData(_buffer.removeFirst());
      }
    } finally {
      // Reset even if the handler throws, so later values are not buffered
      // forever.
      _isDraining = false;
    }
  }

  /// Delivers [value] immediately, or buffers it while paused or while a
  /// replay is in progress.
  void onData(T value) {
    if (_isDisposed) return; // or throw StateError
    if (_isPaused || _isDraining) {
      _buffer.add(value);
    } else {
      _listener.onData(value);
    }
  }

  /// Permanently stops delivery and drops any buffered values.
  void dispose() {
    if (_isDisposed) return;
    _isDisposed = true;
    _buffer.clear();
  }

  /// Turns this pauseable listener into a normal listener.
  Listener<T> asListener() => Listener(onData);
}
/// Conversion from a plain [Listener] to a [PauseableListener].
extension ListenerAsPauseableX<T> on Listener<T> {
  /// Wraps this listener in a [PauseableListener].
  PauseableListener<T> asPauseable() {
    return PauseableListener<T>(this);
  }
}
/// Optional callbacks that can be layered on top of an existing
/// `Subscription`; every callback may be omitted.
class SubscriptionAddon {
  /// Extra cancel logic, if any.
  final Cancel? cancel;

  /// Extra pause logic, if any.
  final VoidCallback? pause;

  /// Extra resume logic, if any.
  final VoidCallback? resume;

  /// Replacement paused-state getter, if any.
  final Getter<bool>? isPaused;

  SubscriptionAddon({
    this.cancel,
    this.pause,
    this.resume,
    this.isPaused,
  });
}
/// Merging of a [SubscriptionAddon] into a [Subscription].
extension SubscriptionAddWithX on Subscription {
  /// Returns a subscription that combines this one with [addon].
  ///
  /// * cancel: the addon's cancel is started first, then this cancel; both
  ///   are awaited together.
  /// * pause: the addon's pause runs before this pause.
  /// * resume: this resume runs before the addon's resume.
  /// * isPaused: the addon's getter replaces this one.
  ///
  /// Callbacks the addon does not provide are left unchanged, and a `null`
  /// [addon] returns this subscription as is.
  Subscription addWith(SubscriptionAddon? addon) {
    if (addon == null) return this;

    // Copy the nullable fields to locals so they promote to non-null.
    final extraCancel = addon.cancel;
    final extraPause = addon.pause;
    final extraResume = addon.resume;

    Cancel? mergedCancel;
    if (extraCancel != null) {
      mergedCancel = () async {
        await Future.wait([extraCancel(), cancel()]);
      };
    }

    VoidCallback? mergedPause;
    if (extraPause != null) {
      mergedPause = () {
        extraPause();
        pause();
      };
    }

    VoidCallback? mergedResume;
    if (extraResume != null) {
      mergedResume = () {
        resume();
        extraResume();
      };
    }

    // `copyWith` keeps the current callback wherever `null` is passed.
    return copyWith(
      cancel: mergedCancel,
      pause: mergedPause,
      resume: mergedResume,
      isPaused: addon.isPaused,
    );
  }
}
/// This implements `Stream.multi`.
///
/// [listen] is invoked once per subscription with a listener for that
/// subscription, and may return a [SubscriptionAddon] with extra
/// cancel/pause/resume logic (or `null` for none).
///
/// The listener handed to [listen] is pauseable: values emitted while the
/// subscription is paused are buffered, and values emitted after cancel are
/// dropped.
///
/// The callback parameter is typed `Listener<T>` rather than the raw
/// `Listener`, so emitting a value of the wrong type is a compile-time error.
NaiveStream<T> streamMulti<T>(
  SubscriptionAddon? Function(Listener<T> listener) listen,
) {
  return NaiveStream<T>((listener) {
    final pauseableListener = listener.asPauseable();
    // `listen` may emit synchronously here, before the subscription exists.
    final subscriptionAddon = listen(pauseableListener.asListener());
    final subscription = Subscription(
      cancel: () async => pauseableListener.dispose(),
      pause: pauseableListener.pause,
      resume: pauseableListener.resume,
      isPaused: () => pauseableListener.isPaused,
    );
    return subscription.addWith(subscriptionAddon);
  });
}
/// Asynchronous delivery for [NaiveStream].
extension StreamAsyncAddX<T> on NaiveStream<T> {
  /// Turns a sync stream into an async stream.
  ///
  /// Each value from the source is forwarded from a separately scheduled
  /// [Future] instead of synchronously.
  ///
  /// Values that are already scheduled when the subscription is cancelled are
  /// dropped. Without this, downstream operators such as `on(data:)`, `map`
  /// or `where` would still run their callbacks after cancellation.
  NaiveStream<T> async() {
    final sourceListen = _listen;
    return NaiveStream((listener) {
      var isCancelled = false;
      final asyncListener = Listener<T>((value) {
        Future<void>(() {
          if (!isCancelled) listener.onData(value);
        });
      });
      final subscription = sourceListen(asyncListener);
      return subscription.copyWith(
        cancel: () {
          // Set the flag first so pending deliveries see the cancellation.
          isCancelled = true;
          return subscription.cancel();
        },
      );
    });
  }
}
/// Broadcast support for [NaiveStream].
extension StreamAsBroadcastX<T> on NaiveStream<T> {
  /// Shares one source subscription between multiple listeners.
  ///
  /// The source is listened to when the first listener subscribes and
  /// cancelled when the last remaining listener cancels. Each listener can be
  /// paused independently; its values are buffered while it is paused.
  NaiveStream<T> asBroadcast() {
    final sourceListen = _listen;
    final listeners = <Listener<T>>[];
    Subscription? sourceSubscription;

    final fanOut = Listener<T>((value) {
      // Iterate over a snapshot: a handler may cancel or subscribe during
      // dispatch, and mutating the list being iterated would throw.
      for (final listener in List.of(listeners)) {
        listener.onData(value);
      }
    });

    return NaiveStream((listener) {
      final pauseableListener = listener.asPauseable();
      final registered = pauseableListener.asListener();
      listeners.add(registered);
      if (listeners.length == 1) {
        sourceSubscription = sourceListen(fanOut);
      }

      Future<void> cancel() async {
        // `remove` returns false when already cancelled, making this a no-op.
        if (!listeners.remove(registered)) return;
        pauseableListener.dispose();
        if (listeners.isEmpty) {
          // Detach the handle before awaiting, so a listener that subscribes
          // during the await does not have its new source subscription
          // overwritten with null.
          final subscription = sourceSubscription;
          sourceSubscription = null;
          await subscription?.cancel();
        }
      }

      return Subscription(
        cancel: cancel,
        pause: pauseableListener.pause,
        resume: pauseableListener.resume,
        isPaused: () => pauseableListener.isPaused,
      );
    });
  }
}
/// Event-handler registration for [NaiveStream].
extension StreamOnX<T> on NaiveStream<T> {
  /// Returns a stream that additionally invokes the given handlers.
  ///
  /// [data] runs after the downstream listener has received each value.
  /// [cancel], [pause] and [resume] are merged into the subscription via
  /// `addWith`. Every handler is optional.
  NaiveStream<T> on({
    void Function(T value)? data,
    Cancel? cancel,
    VoidCallback? pause,
    VoidCallback? resume,
  }) {
    final sourceListen = _listen;
    return NaiveStream((listener) {
      final Listener<T> tapped;
      if (data == null) {
        // Nothing to observe: pass the downstream listener straight through.
        tapped = listener;
      } else {
        tapped = Listener<T>((value) {
          listener.onData(value);
          data(value);
        });
      }
      final extras = SubscriptionAddon(
        cancel: cancel,
        pause: pause,
        resume: resume,
      );
      return sourceListen(tapped).addWith(extras);
    });
  }
}
/// Basic transformation operators for [NaiveStream].
extension StreamBasicOperatorX<T> on NaiveStream<T> {
  /// Known as `stream.map`: forwards `convert(value)` for every source value.
  NaiveStream<R> map<R>(R Function(T) convert) {
    final sourceListen = _listen;
    return NaiveStream((listener) {
      return sourceListen(
        Listener<T>((value) => listener.onData(convert(value))),
      );
    });
  }

  /// Known as `stream.where`: forwards only the values for which [test]
  /// returns true.
  NaiveStream<T> where(bool Function(T) test) {
    final sourceListen = _listen;
    return NaiveStream((listener) {
      final filtering = Listener<T>((value) {
        if (test(value)) listener.onData(value);
      });
      return sourceListen(filtering);
    });
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment