Last active
December 28, 2023 14:27
-
-
Save PlugFox/d8e7a422f516d74b406062de5b679fef to your computer and use it in GitHub Desktop.
Queue EventQueue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import 'dart:async'; | |
import 'dart:collection'; | |
import 'dart:developer' as developer; | |
/// Signature of a unit of work that can be scheduled on an [EventQueue].
///
/// The callback takes no arguments and may either finish synchronously or
/// return a [Future] that completes when the work is done.
typedef EventCallback = FutureOr<void> Function();
/// {@template event_queue}
/// An event queue is a queue of [EventCallback]s that are executed in order.
///
/// Callbacks are run one at a time: the next callback is only started after
/// the previous one has completed (successfully or with an error).
/// {@endtemplate}
class EventQueue implements Sink<EventCallback> {
  /// {@macro event_queue}
  ///
  /// [debugLabel] is used as the prefix of the `dart:developer` timeline
  /// events emitted by this queue.
  EventQueue({String debugLabel = 'EventQueue'}) : _debugLabel = debugLabel;

  /// Tasks waiting to be executed, oldest first.
  final Queue<_EventQueueTask> _queue = Queue<_EventQueueTask>();

  /// Prefix for timeline event names and log records.
  final String _debugLabel;

  /// Future of the active processing loop, or `null` while the queue is idle.
  Future<void>? _processing;

  /// Whether [close] has been called on this queue.
  bool get isClosed => _closed;
  bool _closed = false;

  /// Schedules [event] to run after all previously added events.
  ///
  /// Returns a future that completes when [event] has finished, or completes
  /// with an error if [event] throws or the queue is force-closed before the
  /// event is started.
  ///
  /// Throws a [StateError] if the queue is already closed.
  @override
  Future<void> add(EventCallback event) {
    if (_closed) {
      throw StateError('EventQueue is closed');
    }
    final task = _EventQueueTask(event);
    _queue.add(task);
    _start();
    developer.Timeline.instantSync('$_debugLabel:add');
    return task.future;
  }

  /// Closes the queue so that no further events can be added.
  ///
  /// When [force] is `false` (the default) the returned future completes once
  /// the processing loop has drained every pending event.
  ///
  /// When [force] is `true` every event that has not been started yet is
  /// rejected with a [StateError] and removed from the queue; an event that
  /// is already running is not interrupted and is not awaited.
  @override
  Future<void> close({bool force = false}) async {
    _closed = true;
    if (force) {
      for (final task in _queue) {
        task.reject(
          StateError('EventQueue is closed'),
          StackTrace.current,
        );
      }
      _queue.clear();
    } else {
      await _processing;
    }
  }

  /// Starts the processing loop if it is not already running and returns the
  /// future that completes when the loop has drained the queue.
  Future<void> _start() {
    final processing = _processing;
    if (processing != null) {
      return processing;
    }
    // Publish the loop's future *before* starting the loop. `Future.doWhile`
    // runs its first iteration synchronously, so the first callback begins
    // executing before `doWhile` returns; if that callback re-entrantly calls
    // [add], it must observe a non-null `_processing` or a second, overlapping
    // loop would be started and the ordering guarantee would be lost.
    final done = Completer<void>();
    _processing = done.future;
    final flow = developer.Flow.begin();
    developer.Timeline.instantSync('$_debugLabel:begin');
    Future.doWhile(() async {
      if (_queue.isEmpty) {
        // Mark the queue idle so that the next [add] starts a fresh loop.
        _processing = null;
        developer.Timeline.instantSync('$_debugLabel:end');
        developer.Flow.end(flow.id);
        return false;
      }
      try {
        await developer.Timeline.timeSync(
          '$_debugLabel:task',
          _queue.removeFirst(),
          flow: developer.Flow.step(flow.id),
        );
      } on Object catch (error, stackTrace) {
        // Failures of the callback itself are delivered through the task's
        // own future (see `_EventQueueTask.call`), so anything caught here is
        // unexpected. Report it instead of dropping it silently, then keep
        // draining the queue.
        developer.log(
          '$_debugLabel:exception',
          name: _debugLabel,
          level: 900,
          error: error,
          stackTrace: stackTrace,
        );
      }
      return true;
    }).then<void>((_) => done.complete(), onError: done.completeError);
    return done.future;
  }
}
/// A single queued [EventCallback] together with the [Completer] that
/// reports its outcome through [future].
class _EventQueueTask {
  _EventQueueTask(EventCallback event)
      : _fn = event,
        _completer = Completer<void>();

  /// The callback to execute.
  final EventCallback _fn;

  /// Completed exactly once: by [call] or by [reject], whichever comes first.
  final Completer<void> _completer;

  /// Completes when the callback has finished, or completes with an error if
  /// the callback throws or the task is rejected.
  Future<void> get future => _completer.future;

  /// Runs the callback and forwards its result or error to [future].
  ///
  /// Does nothing if the task has already been completed (for example by
  /// [reject]). The returned future completes once the callback is done.
  Future<void> call() async {
    if (_completer.isCompleted) return;
    try {
      await _fn();
      // The task may have been rejected while the callback was running.
      if (_completer.isCompleted) return;
      _completer.complete();
    } on Object catch (error, stackTrace) {
      // Go through the guarded [reject]: calling `completeError` on an
      // already completed completer would throw a `StateError`.
      reject(error, stackTrace);
    }
  }

  /// Completes [future] with [error] unless the task is already completed.
  void reject(Object error, [StackTrace? stackTrace]) {
    if (_completer.isCompleted) return; // coverage:ignore-line
    _completer.completeError(error, stackTrace);
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment