Skip to content

Instantly share code, notes, and snippets.

@PlugFox
Last active December 28, 2023 14:27
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save PlugFox/d8e7a422f516d74b406062de5b679fef to your computer and use it in GitHub Desktop.
Save PlugFox/d8e7a422f516d74b406062de5b679fef to your computer and use it in GitHub Desktop.
Queue EventQueue
import 'dart:async';
import 'dart:collection';
import 'dart:developer' as developer;
/// Async callback
typedef EventCallback = FutureOr<void> Function();
/// {@template event_queue}
/// An event queue is a queue of [EventCallback]s that are executed in order.
/// {@endtemplate}
class EventQueue implements Sink<EventCallback> {
/// {@macro event_queue}
EventQueue({String debugLabel = 'EventQueue'}) : _debugLabel = debugLabel;
final Queue<_EventQueueTask> _queue = Queue<_EventQueueTask>();
final String _debugLabel;
Future<void>? _processing;
bool get isClosed => _closed;
bool _closed = false;
@override
Future<void> add(EventCallback event) {
if (_closed) {
throw StateError('EventQueue is closed');
}
final task = _EventQueueTask(event);
_queue.add(task);
_start();
developer.Timeline.instantSync('$_debugLabel:add');
return task.future;
}
@override
Future<void> close({bool force = false}) async {
_closed = true;
if (force) {
for (final task in _queue) {
task.reject(
StateError('OctopusStateQueue is closed'),
StackTrace.current,
);
}
_queue.clear();
} else {
await _processing;
}
}
Future<void> _start() {
final processing = _processing;
if (processing != null) {
return processing;
}
final flow = developer.Flow.begin();
developer.Timeline.instantSync('$_debugLabel:begin');
return _processing = Future.doWhile(() async {
if (_queue.isEmpty) {
_processing = null;
developer.Timeline.instantSync('$_debugLabel:end');
developer.Flow.end(flow.id);
return false;
}
try {
await developer.Timeline.timeSync(
'$_debugLabel:task',
_queue.removeFirst(),
flow: developer.Flow.step(flow.id),
);
} on Object catch (error, stackTrace) {
//warning(error, stackTrace, '$_debugLabel:exception');
}
return true;
});
}
}
class _EventQueueTask {
_EventQueueTask(EventCallback event)
: _fn = event,
_completer = Completer<void>();
final EventCallback _fn;
final Completer<void> _completer;
Future<void> get future => _completer.future;
Future<void> call() async {
try {
if (_completer.isCompleted) return;
await _fn();
if (_completer.isCompleted) return;
_completer.complete();
} on Object catch (error, stackTrace) {
_completer.completeError(error, stackTrace);
}
}
/// {@nodoc}
void reject(Object error, [StackTrace? stackTrace]) {
if (_completer.isCompleted) return; // coverage:ignore-line
_completer.completeError(error, stackTrace);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment