Skip to content

Instantly share code, notes, and snippets.

@kikuchy
Last active November 15, 2021 17:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kikuchy/d96a45d0b22b0baa437ccca69ad6679b to your computer and use it in GitHub Desktop.
Save kikuchy/d96a45d0b22b0baa437ccca69ad6679b to your computer and use it in GitHub Desktop.
A small study in implementing a function like RxDart's `combineLatest2`.
// https://gist.github.com/kikuchy/d96a45d0b22b0baa437ccca69ad6679b
import 'dart:async';
/// Signature of a function that merges one value of type [T1] and one value
/// of type [T2] into a single result of type [R].
typedef Combinator<T1, T2, R> = R Function(T1 first, T2 second);
/// Merges [streams] into one stream by applying [combinator] to the most
/// recent value of every source.
///
/// Nothing is emitted until each source has produced at least one value
/// (`null` counts as a value). After that, every new source event triggers a
/// call to [combinator] with a fresh fixed-length snapshot list whose entry
/// `i` is the latest value of `streams[i]`.
///
/// Errors from the sources, and exceptions thrown by [combinator], are
/// forwarded as error events on the returned stream. The returned stream
/// closes once all sources are done, or immediately after being listened to
/// when [streams] is empty.
///
/// The sources are subscribed to only when the returned (single-subscription)
/// stream is listened to; pausing, resuming and cancelling that subscription
/// is propagated to every source subscription.
Stream<R> combineLatestN<R>(
    List<Stream<dynamic>> streams, R Function(List<dynamic>) combinator) {
  // Copy the input so later mutation of the caller's list cannot desync the
  // per-index bookkeeping below.
  final sources = List<Stream<dynamic>>.of(streams, growable: false);
  final total = sources.length;
  final combined = StreamController<R>();
  final subscriptions = <StreamSubscription<dynamic>>[];
  final values = List<dynamic>.filled(total, null);
  // State is tracked per position rather than per stream object, so the same
  // stream may legitimately appear more than once in [streams].
  final hasValue = List<bool>.filled(total, false);
  var missing = total; // Sources that have not produced a value yet.
  var open = total; // Sources that have not completed yet.

  void handleData(int index, dynamic value) {
    values[index] = value;
    if (!hasValue[index]) {
      hasValue[index] = true;
      missing--;
    }
    if (missing > 0) return;

    final R result;
    try {
      // Hand out a snapshot so results that keep the list are not mutated by
      // subsequent events.
      result = combinator(List<dynamic>.of(values, growable: false));
    } catch (error, stackTrace) {
      // Surface combinator failures to the listener instead of letting them
      // escape as uncaught errors of the current zone.
      combined.addError(error, stackTrace);
      return;
    }
    combined.add(result);
  }

  void handleDone() {
    open--;
    if (open == 0) combined.close();
  }

  combined.onListen = () {
    if (total == 0) {
      // No source will ever report "done", so close right away.
      combined.close();
      return;
    }
    for (var i = 0; i < total; i++) {
      final index = i;
      subscriptions.add(sources[index].listen(
        (value) => handleData(index, value),
        onError: combined.addError,
        onDone: handleDone,
      ));
    }
  };
  combined.onPause = () {
    for (final subscription in subscriptions) {
      subscription.pause();
    }
  };
  combined.onResume = () {
    for (final subscription in subscriptions) {
      subscription.resume();
    }
  };
  combined.onCancel =
      () => Future.wait(subscriptions.map((s) => s.cancel()));
  return combined.stream;
}
/// Typed two-stream convenience wrapper around [combineLatestN].
///
/// Passes [s1] and [s2] (in that order) to [combineLatestN] and, for every
/// value list it provides, calls [combinator] with the entry at index 0 cast
/// to [T1] and the entry at index 1 cast to [T2].
Stream<R> combineLatest2<T1, T2, R>(
        Stream<T1> s1, Stream<T2> s2, Combinator<T1, T2, R> combinator) =>
    combineLatestN<R>(
      [s1, s2],
      // The list always holds exactly two entries, so index 0 / 1 are the
      // first / last element respectively.
      (latest) => combinator(latest[0] as T1, latest[1] as T2),
    );
/// Demo: combines several kinds of streams with a shared broadcast stream
/// via [combineLatest2] and prints every data, error and done event.
void main() {
  // Subscribes to [results] and prints each event; [label] prefixes the
  // "Done!" line so the four demos can be told apart in the output.
  void report(Stream<String> results, String label) {
    results.listen(
      print,
      onError: (Object e, StackTrace s) {
        print(e);
        print(s);
      },
      onDone: () => print('$label Done!'),
    );
  }

  // Broadcast so that the same stream can be listened to by every demo.
  final Stream<int> two = Stream.value(2).asBroadcastStream();

  // A single value.
  final Stream<int> a = Stream.value(1);
  report(combineLatest2<int, int, String>(a, two, (x, y) => '${x + y}'), 'a');

  // Several values.
  final Stream<int> b = Stream.fromIterable([10, 20, 30, 40]);
  report(combineLatest2<int, int, String>(b, two, (x, y) => '${x + y}'), 'b');

  // An error event only.
  final Stream<int> c = Stream.error(Error(), StackTrace.current);
  report(combineLatest2<int, int, String>(c, two, (x, y) => '${x + y}'), 'c');

  // A null value.
  final Stream<int?> d = Stream.value(null);
  report(
    combineLatest2<int?, int, String>(
      d,
      two,
      // Parenthesised on purpose: `x ?? 0 + y` parses as `x ?? (0 + y)`
      // because `+` binds tighter than `??`.
      (x, y) => '${(x ?? 0) + y}',
    ),
    'd',
  );
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment