Skip to content

Instantly share code, notes, and snippets.

@iota9star
Last active February 8, 2023 05:20
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save iota9star/0a67951ba5652e7fd99f040f42a0131b to your computer and use it in GitHub Desktop.
Save iota9star/0a67951ba5652e7fd99f040f42a0131b to your computer and use it in GitHub Desktop.
Execute a specified number of tasks in parallel.
import 'dart:async';
import 'dart:collection';
import 'package:collection/collection.dart';
import 'package:tuple/tuple.dart';
/// A unit of work held as a three-slot tuple: `item1` is an integer id,
/// `item2` is the callback that produces the value, and `item3` is the
/// [Completer] through which the outcome is delivered.
typedef QueueTask<T> = Tuple3<int, FutureOr<T> Function(), Completer<T>>;
/// Named accessors and a cancellation helper for the positional slots of a
/// [QueueTask] tuple.
extension QueueTaskExtension<T> on QueueTask<T> {
  /// The integer identifier stored in the first tuple slot.
  int get taskId {
    return item1;
  }

  /// The callback stored in the second tuple slot.
  FutureOr<T> Function() get block {
    return item2;
  }

  /// The completer stored in the third tuple slot.
  Completer<T> get completer {
    return item3;
  }

  /// Completes [completer] with [error], or with a fresh
  /// [QueueTaskCancelledError] when [error] is omitted or `null`.
  ///
  /// This only touches the completer; it does not interrupt [block].
  ///
  /// Throws a [StateError] if [completer] has already been completed.
  void cancel([Object? error]) {
    final pending = completer;
    if (!pending.isCompleted) {
      pending.completeError(error ?? QueueTaskCancelledError());
      return;
    }
    throw StateError('The task($taskId) has ended');
  }
}
/// Runs queued callbacks with at most [parallel] of them in flight at once.
///
/// Tasks are started in the order they were enqueued. The outcome of each
/// task (value, error, timeout or cancellation) is delivered through the
/// completer of its [QueueTask]. With `parallel == 1` the queue serializes
/// its tasks.
class MutexQueue {
  /// Creates a queue that starts up to [parallel] tasks at a time and fails
  /// any task that has not finished within [timeout].
  MutexQueue(
    this.parallel, {
    this.timeout = const Duration(seconds: 45),
  }) : assert(parallel > 0, 'parallel must be positive or no task ever runs');

  /// Maximum number of tasks that may be running simultaneously.
  final int parallel;

  /// Time allowed for a single task, measured from when it is started.
  final Duration timeout;

  // Tasks that have been accepted but not started yet, oldest first.
  final _tasks = Queue<QueueTask>();

  // Number of started tasks that have neither finished nor timed out.
  int _running = 0;

  // Next task id to hand out.
  int _id = 0;

  /// Enqueues [block] and returns a future for its outcome.
  Future<T> withLock<T>(FutureOr<T> Function() block) {
    return enqueue(block).completer.future;
  }

  /// Enqueues [block] and returns its task handle.
  ///
  /// The handle exposes the task id (usable with [cancel]) and the completer
  /// whose future reports the outcome.
  QueueTask<T> enqueue<T>(FutureOr<T> Function() block) {
    final task = QueueTask(_id++, block, Completer<T>());
    _tasks.add(task);
    _runTask();
    return task;
  }

  // Starts queued tasks until the queue is empty or [parallel] are running.
  void _runTask() {
    while (_tasks.isNotEmpty && _running < parallel) {
      final task = _tasks.removeFirst();
      final completer = task.completer;
      // The task was cancelled through its handle while still waiting:
      // running it would waste a slot and its result could not be delivered.
      if (completer.isCompleted) {
        continue;
      }
      _running++;
      Future(task.block).timeout(timeout).then((value) {
        // The task may have been cancelled through its handle while it was
        // running; completing twice would throw a StateError.
        if (!completer.isCompleted) {
          completer.complete(value);
        }
      }, onError: (Object e, StackTrace s) {
        if (!completer.isCompleted) {
          completer.completeError(e, s);
        }
      }).whenComplete(() {
        // On timeout the slot is released here even though the callback
        // itself is not interrupted.
        _running--;
        _runTask();
      });
    }
  }

  /// Cancels every task that has not been started yet.
  ///
  /// Each one fails with [error], or with a [QueueTaskCancelledError] when
  /// [error] is omitted. Tasks that are already running are not affected.
  void cancelAll([Object? error]) {
    while (_tasks.isNotEmpty) {
      final task = _tasks.removeFirst();
      // Skip tasks already cancelled through their handle so that a single
      // one cannot abort the loop and leave the rest uncancelled.
      if (!task.completer.isCompleted) {
        task.cancel(error);
      }
    }
  }

  /// Cancels the not-yet-started task with the given [id].
  ///
  /// The task is removed from the queue so that it never runs, and fails
  /// with [error], or with a [QueueTaskCancelledError] when [error] is
  /// omitted.
  ///
  /// Returns `true` if this call cancelled the task. Returns `false` if no
  /// waiting task has that id (unknown id, already started, or finished) or
  /// if the task had already been completed through its handle.
  bool cancel(int id, [Object? error]) {
    final task = _tasks.firstWhereOrNull((e) => e.taskId == id);
    if (task == null) {
      return false;
    }
    // Drop it from the queue; otherwise it would still be started later and
    // try to complete an already-completed completer.
    _tasks.remove(task);
    if (task.completer.isCompleted) {
      return false;
    }
    task.cancel(error);
    return true;
  }
}
/// The error a task's future fails with when the task is cancelled without
/// an explicit error object.
class QueueTaskCancelledError extends Error {
  /// Describes the error so that logs and unhandled-error reports do not
  /// show the default `Instance of ...` text.
  @override
  String toString() => 'QueueTaskCancelledError: the task was cancelled';
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment