Skip to content

Instantly share code, notes, and snippets.

@gladimdim
Created February 9, 2022 12:09
Show Gist options
  • Save gladimdim/4c46555f62e6c5b76f41561a97a35849 to your computer and use it in GitHub Desktop.
Save gladimdim/4c46555f62e6c5b76f41561a97a35849 to your computer and use it in GitHub Desktop.
Isolates emiting numbers and another stream sums them
import 'dart:async';
import 'dart:collection';
import 'dart:isolate';
import 'package:rxdart/rxdart.dart';
void main() async {
await sumTwoLists();
print("done");
}
///
/// Create two isolates and passes to them a list of integers
/// Then it listens to the Isolate streams and prints the sum of numbers from first and second lists
/// Once both lists are finished the function finishes.
///
Future sumTwoLists() async {
final p = ReceivePort();
final e = ReceivePort();
await Isolate.spawn(_emitNumber, [
p.sendPort,
[0, 1, 2, 3]
]);
await Isolate.spawn(_emitNumber, [
e.sendPort,
[10, 11, 12, 13]
]);
await sumListsInIsolates(p, e);
}
Future sumListsInIsolates(ReceivePort a, ReceivePort b) {
// create a promise so we can notify caller that both streams are empty and we
// summed all the numbers in both lists.
var completer = Completer();
// zipper with combine numbers from both streams into a List with two values
// It will combine only values with the same index in stream
// null means the Isolate was killed
a.zipWith(b, (f, s) {
if (f == null || s == null) {
// !!! don't forget to close the ports as the application will never Exit with Code 0 !!!
// otherwise it will be forever running
a.close();
b.close();
completer.complete(null);
return;
}
return f + s;
}).listen(print);
// return defer object
return completer.future;
}
// emits given numbers one by one each second. Isolate is exit when no numbers left
void _emitNumber(List<dynamic> args) {
SendPort p = args[0];
List<int> numbers = args[1];
// use queue to easier empty the list
var queue = Queue.from(numbers);
var stream = Stream.periodic(const Duration(seconds: 1), (count) {
if (queue.isNotEmpty) {
return queue.removeFirst();
} else {
Isolate.exit(p);
}
});
stream.listen((event) {
p.send(event);
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment