Last active
April 22, 2023 23:23
-
-
Save eventhelix/1382dfedaabc40e6681c091cf950c453 to your computer and use it in GitHub Desktop.
An executor and test code that can be tested in the Rust Playground. Derived from: https://github.com/enlightware/simple-async-local-executor.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#![warn(missing_docs)] | |
//! Single-threaded polling-based executor suitable for use in games, embedded systems or WASM.
//!
//! This crate provides an executor to run async functions in a single-threaded
//! environment with deterministic execution. To do so, the executor provides an
//! [`Executor::step()`] method that polls all non-blocked tasks exactly once. The executor
//! also provides events that can be waited upon. These events are referred to by the
//! [`EventHandle`] type, which is instantiated by calling [`Executor::create_event_handle()`],
//! and can be waited on by creating an [`EventFuture`] with the [`Executor::event()`] method.
//! They can be activated by calling the [`Executor::notify_event()`] method.
//!
//! # Example
//! ```
//! # use simple_async_local_executor::*;
//! let executor = Executor::default();
//! let events = [executor.create_event_handle(), executor.create_event_handle()];
//!
//! async fn wait_events(events: [EventHandle; 2], executor: Executor) {
//!     executor.event(&events[0]).await;
//!     executor.event(&events[1]).await;
//! }
//!
//! executor.spawn(wait_events(events.clone(), executor.clone()));
//! assert_eq!(executor.step(), true);
//! assert_eq!(executor.step(), true);
//! executor.notify_event(&events[0]);
//! assert_eq!(executor.step(), true);
//! executor.notify_event(&events[1]);
//! assert_eq!(executor.step(), false);
//! ```
use core::fmt; | |
use std::{ | |
cell::{Cell, RefCell}, | |
future::Future, | |
hash::{Hash, Hasher}, | |
pin::Pin, | |
ptr, | |
rc::Rc, | |
task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, | |
}; | |
#[cfg(feature = "futures")] | |
use futures::future::FusedFuture; | |
use slab::Slab; | |
// Useful reading for people interested in writing executors:
// - https://os.phil-opp.com/async-await/
// - https://rust-lang.github.io/async-book/02_execution/01_chapter.html
// - https://github.com/rust-lang/rfcs/blob/master/text/2592-futures.md#rationale-drawbacks-and-alternatives-to-the-wakeup-design-waker
/// Build a [`RawWaker`] whose wake and drop operations do nothing.
///
/// The data pointer is null and is never dereferenced: waking and dropping are
/// no-ops, and cloning simply produces another dummy raw waker.
fn dummy_raw_waker() -> RawWaker {
    /// Shared implementation of `wake`, `wake_by_ref` and `drop`.
    fn ignore(_data: *const ()) {}

    /// `clone` entry of the vtable: hand out a fresh dummy raw waker.
    fn duplicate(_data: *const ()) -> RawWaker {
        dummy_raw_waker()
    }

    static VTABLE: RawWakerVTable = RawWakerVTable::new(duplicate, ignore, ignore, ignore);

    RawWaker::new(std::ptr::null::<()>(), &VTABLE)
}
fn dummy_waker() -> Waker { | |
unsafe { Waker::from_raw(dummy_raw_waker()) } | |
} | |
#[derive(Clone)] | |
struct EventHandleInner { | |
index: usize, | |
executor: Rc<ExecutorInner>, | |
} | |
impl fmt::Debug for EventHandleInner { | |
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | |
self.index.fmt(f) | |
} | |
} | |
impl Eq for EventHandleInner {} | |
impl PartialEq for EventHandleInner { | |
fn eq(&self, other: &Self) -> bool { | |
self.index == other.index && ptr::eq(self.executor.as_ref(), other.executor.as_ref()) | |
} | |
} | |
impl Hash for EventHandleInner { | |
fn hash<H: Hasher>(&self, state: &mut H) { | |
self.index.hash(state); | |
(self.executor.as_ref() as *const ExecutorInner).hash(state); | |
} | |
} | |
impl Drop for EventHandleInner { | |
fn drop(&mut self) { | |
self.executor.release_event_handle(self); | |
} | |
} | |
/// A handle for an event, can be kept and cloned around | |
#[derive(Clone, Debug, PartialEq, Eq, Hash)] | |
pub struct EventHandle(Rc<EventHandleInner>); | |
/// A boolean flag that can be shared and flipped through any of its `Rc` clones.
type SharedBool = Rc<Cell<bool>>;
/// A future to await an event | |
pub struct EventFuture { | |
ready: SharedBool, | |
_handle: EventHandle, | |
done: bool, | |
} | |
impl Future for EventFuture { | |
type Output = (); | |
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> { | |
if self.ready.get() { | |
self.done = true; | |
Poll::Ready(()) | |
} else { | |
Poll::Pending | |
} | |
} | |
} | |
/// Only compiled with the `futures` feature.
#[cfg(feature = "futures")]
impl FusedFuture for EventFuture {
    /// Whether this future has already returned `Poll::Ready`.
    fn is_terminated(&self) -> bool {
        let Self { done, .. } = self;
        *done
    }
}
/// A spawned unit of work: a boxed, pinned future with no output.
struct Task {
    future: Pin<Box<dyn Future<Output = ()>>>,
}

impl Task {
    /// Box and pin `future` so tasks of different concrete types can share one queue.
    pub fn new(future: impl Future<Output = ()> + 'static) -> Task {
        let future = Box::pin(future);
        Self { future }
    }

    /// Poll the wrapped future once with the given context.
    fn poll(&mut self, context: &mut Context) -> Poll<()> {
        let pinned = self.future.as_mut();
        Future::poll(pinned, context)
    }
}
#[derive(Default)] | |
struct ExecutorInner { | |
task_queue: RefCell<Vec<Task>>, | |
new_tasks: RefCell<Vec<Task>>, | |
events: RefCell<Slab<SharedBool>>, | |
} | |
impl ExecutorInner { | |
fn release_event_handle(&self, event: &EventHandleInner) { | |
self.events.borrow_mut().remove(event.index); | |
} | |
} | |
/// Single-threaded polling-based executor | |
/// | |
/// This is a thin-wrapper (using [`Rc`]) around the real executor, so that this struct can be | |
/// cloned and passed around. | |
/// | |
/// See the [module documentation] for more details. | |
/// | |
/// [module documentation]: index.html | |
#[derive(Clone, Default)] | |
pub struct Executor { | |
inner: Rc<ExecutorInner>, | |
} | |
impl Executor { | |
/// Spawn a new task to be run by this executor. | |
/// | |
/// # Example | |
/// ``` | |
/// # use simple_async_local_executor::*; | |
/// async fn nop() {} | |
/// let executor = Executor::default(); | |
/// executor.spawn(nop()); | |
/// assert_eq!(executor.step(), false); | |
/// ``` | |
pub fn spawn(&self, future: impl Future<Output = ()> + 'static) { | |
self.inner.new_tasks.borrow_mut().push(Task::new(future)); | |
} | |
/// Create an event handle, that can be used to [await](Executor::event()) and [notify](Executor::notify_event()) an event. | |
pub fn create_event_handle(&self) -> EventHandle { | |
let mut events = self.inner.events.borrow_mut(); | |
let index = events.insert(Rc::new(Cell::new(false))); | |
EventHandle(Rc::new(EventHandleInner { | |
index, | |
executor: self.inner.clone(), | |
})) | |
} | |
/// Notify an event. | |
/// | |
/// All tasks currently waiting on this event will | |
/// progress at the next call to [`step`](Executor::step()). | |
pub fn notify_event(&self, handle: &EventHandle) { | |
self.inner.events.borrow_mut()[handle.0.index].replace(true); | |
} | |
/// Create an event future. | |
/// | |
/// Once this future is awaited, its task will be blocked until the next [`step`](Executor::step()) | |
/// after [`notify_event`](Executor::notify_event()) is called with this `handle`. | |
pub fn event(&self, handle: &EventHandle) -> EventFuture { | |
let ready = self.inner.events.borrow_mut()[handle.0.index].clone(); | |
EventFuture { | |
ready, | |
_handle: handle.clone(), | |
done: false, | |
} | |
} | |
/// Run each non-blocked task exactly once. | |
/// | |
/// Return whether there are any non-completed tasks. | |
/// | |
/// # Example | |
/// ``` | |
/// # use simple_async_local_executor::*; | |
/// let executor = Executor::default(); | |
/// let event = executor.create_event_handle(); | |
/// async fn wait_event(event: EventHandle, executor: Executor) { | |
/// executor.event(&event).await; | |
/// } | |
/// executor.spawn(wait_event(event.clone(), executor.clone())); | |
/// assert_eq!(executor.step(), true); // still one task in the queue | |
/// executor.notify_event(&event); | |
/// assert_eq!(executor.step(), false); // no more task in the queue | |
/// ``` | |
pub fn step(&self) -> bool { | |
// Dummy waker and context (not used as we poll all tasks) | |
let waker = dummy_waker(); | |
let mut context = Context::from_waker(&waker); | |
// Append new tasks created since the last step into the task queue | |
let mut task_queue = self.inner.task_queue.borrow_mut(); | |
task_queue.append(&mut self.inner.new_tasks.borrow_mut()); | |
// Loop over all tasks, polling them. If a task is not ready, add it to the pending tasks. | |
let mut pending_tasks = Vec::new(); | |
let mut any_left = false; | |
for mut task in task_queue.drain(..) { | |
match task.poll(&mut context) { | |
Poll::Ready(()) => {} // task done | |
Poll::Pending => { | |
pending_tasks.push(task); | |
any_left = true; | |
} | |
} | |
} | |
// Keep pending tasks for the next step | |
*task_queue = pending_tasks; | |
// clear events | |
for (_, event) in self.inner.events.borrow_mut().iter_mut() { | |
event.replace(false); | |
} | |
any_left | |
} | |
} | |
/// A game unit for the demo; starts at position 0 by default.
#[derive(Default)]
struct Unit {
    /// The 1-D position of the unit. In a real game, it would be a 2D or 3D.
    pub pos: i32,
}

/// Shared, mutable reference to a [`Unit`].
type UnitRef = Rc<RefCell<Unit>>;
/// A future that will move the unit towards `target_pos` at each step, | |
/// and complete when the unit has reached that position. | |
struct UnitGotoFuture { | |
unit: UnitRef, | |
target_pos: i32, | |
} | |
impl Future for UnitGotoFuture { | |
type Output = (); | |
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> { | |
let unit_pos = self.unit.borrow().pos; | |
if unit_pos == self.target_pos { | |
Poll::Ready(()) | |
} else { | |
self.unit.borrow_mut().pos += (self.target_pos - unit_pos).signum(); | |
Poll::Pending | |
} | |
} | |
} | |
/// Helper async function to write unit behavior nicely | |
async fn goto(unit: UnitRef, pos: i32) { | |
UnitGotoFuture { | |
unit, | |
target_pos: pos, | |
} | |
.await; | |
} | |
/// Let a unit go back and forth between two positions | |
async fn patrol(unit: UnitRef, poses: [i32; 2]) { | |
loop { | |
goto(unit.clone(), poses[0]).await; | |
goto(unit.clone(), poses[1]).await; | |
} | |
} | |
/// Test program with two units: one patrolling and one going to a position. | |
pub fn main() { | |
let executor = Executor::default(); | |
let units: [UnitRef; 2] = Default::default(); | |
executor.spawn(patrol(units[0].clone(), [-5, 5])); | |
// executor.spawn(patrol(units[1].clone(), [-1,1])); | |
let print_poses = || { | |
println!( | |
"Unit poses: {}", | |
units | |
.iter() | |
.map(|unit| unit.borrow().pos.to_string()) | |
.collect::<Vec<_>>() | |
.join(", ") | |
); | |
}; | |
print_poses(); | |
for _ in 0..30 { | |
executor.step(); | |
print_poses(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment