Skip to content

Instantly share code, notes, and snippets.

@eventhelix
Last active April 22, 2023 23:23
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save eventhelix/1382dfedaabc40e6681c091cf950c453 to your computer and use it in GitHub Desktop.
Save eventhelix/1382dfedaabc40e6681c091cf950c453 to your computer and use it in GitHub Desktop.
An executor and test code that can be tested in the Rust Playground. Derived from: https://github.com/enlightware/simple-async-local-executor.
#![warn(missing_docs)]
//! Single-threaded polling-based executor suitable for use in games, embedded systems or WASM.
//!
//! This crate provides an executor to run async functions in single-threaded
//! environment with deterministic execution. To do so, the executor provides a [`Executor::step()`]
//! method that polls all non-blocked tasks exactly once. The executor also provides events
//! that can be waited upon. These events are referred to by the [`EventHandle`] type which is
//! instantiated by calling [`Executor::create_event_handle()`], and can be waited on by
//! creating [`EventFuture`] by calling the [`Executor::event()`] method. They can be activated
//! by calling the [`Executor::notify_event()`] method.
//!
//! # Example
//! ```
//! # use simple_async_local_executor::*;
//! let executor = Executor::default();
//! let events = [executor.create_event_handle(), executor.create_event_handle()];
//!
//! async fn wait_events(events: [EventHandle; 2], executor: Executor) {
//! executor.event(&events[0]).await;
//! executor.event(&events[1]).await;
//! }
//!
//! executor.spawn(wait_events(events.clone(), executor.clone()));
//! assert_eq!(executor.step(), true);
//! assert_eq!(executor.step(), true);
//! executor.notify_event(&events[0]);
//! assert_eq!(executor.step(), true);
//! executor.notify_event(&events[1]);
//! assert_eq!(executor.step(), false);
//! ```
use core::fmt;
use std::{
cell::{Cell, RefCell},
future::Future,
hash::{Hash, Hasher},
pin::Pin,
ptr,
rc::Rc,
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
};
#[cfg(feature = "futures")]
use futures::future::FusedFuture;
use slab::Slab;
// Useful reading for people interested in writing executors:
// - https://os.phil-opp.com/async-await/
// - https://rust-lang.github.io/async-book/02_execution/01_chapter.html
// - https://github.com/rust-lang/rfcs/blob/master/text/2592-futures.md#rationale-drawbacks-and-alternatives-to-the-wakeup-design-waker
fn dummy_raw_waker() -> RawWaker {
fn no_op(_: *const ()) {}
fn clone(_: *const ()) -> RawWaker {
dummy_raw_waker()
}
let vtable = &RawWakerVTable::new(clone, no_op, no_op, no_op);
RawWaker::new(std::ptr::null::<()>(), vtable)
}
fn dummy_waker() -> Waker {
unsafe { Waker::from_raw(dummy_raw_waker()) }
}
#[derive(Clone)]
struct EventHandleInner {
index: usize,
executor: Rc<ExecutorInner>,
}
impl fmt::Debug for EventHandleInner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.index.fmt(f)
}
}
impl Eq for EventHandleInner {}
impl PartialEq for EventHandleInner {
fn eq(&self, other: &Self) -> bool {
self.index == other.index && ptr::eq(self.executor.as_ref(), other.executor.as_ref())
}
}
impl Hash for EventHandleInner {
fn hash<H: Hasher>(&self, state: &mut H) {
self.index.hash(state);
(self.executor.as_ref() as *const ExecutorInner).hash(state);
}
}
impl Drop for EventHandleInner {
fn drop(&mut self) {
self.executor.release_event_handle(self);
}
}
/// A handle for an event, can be kept and cloned around
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct EventHandle(Rc<EventHandleInner>);
type SharedBool = Rc<Cell<bool>>;
/// A future to await an event
pub struct EventFuture {
ready: SharedBool,
_handle: EventHandle,
done: bool,
}
impl Future for EventFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
if self.ready.get() {
self.done = true;
Poll::Ready(())
} else {
Poll::Pending
}
}
}
#[cfg(feature = "futures")]
impl FusedFuture for EventFuture {
fn is_terminated(&self) -> bool {
self.done
}
}
struct Task {
future: Pin<Box<dyn Future<Output = ()>>>,
}
impl Task {
pub fn new(future: impl Future<Output = ()> + 'static) -> Task {
Task {
future: Box::pin(future),
}
}
fn poll(&mut self, context: &mut Context) -> Poll<()> {
self.future.as_mut().poll(context)
}
}
#[derive(Default)]
struct ExecutorInner {
task_queue: RefCell<Vec<Task>>,
new_tasks: RefCell<Vec<Task>>,
events: RefCell<Slab<SharedBool>>,
}
impl ExecutorInner {
fn release_event_handle(&self, event: &EventHandleInner) {
self.events.borrow_mut().remove(event.index);
}
}
/// Single-threaded polling-based executor
///
/// This is a thin-wrapper (using [`Rc`]) around the real executor, so that this struct can be
/// cloned and passed around.
///
/// See the [module documentation] for more details.
///
/// [module documentation]: index.html
#[derive(Clone, Default)]
pub struct Executor {
inner: Rc<ExecutorInner>,
}
impl Executor {
/// Spawn a new task to be run by this executor.
///
/// # Example
/// ```
/// # use simple_async_local_executor::*;
/// async fn nop() {}
/// let executor = Executor::default();
/// executor.spawn(nop());
/// assert_eq!(executor.step(), false);
/// ```
pub fn spawn(&self, future: impl Future<Output = ()> + 'static) {
self.inner.new_tasks.borrow_mut().push(Task::new(future));
}
/// Create an event handle, that can be used to [await](Executor::event()) and [notify](Executor::notify_event()) an event.
pub fn create_event_handle(&self) -> EventHandle {
let mut events = self.inner.events.borrow_mut();
let index = events.insert(Rc::new(Cell::new(false)));
EventHandle(Rc::new(EventHandleInner {
index,
executor: self.inner.clone(),
}))
}
/// Notify an event.
///
/// All tasks currently waiting on this event will
/// progress at the next call to [`step`](Executor::step()).
pub fn notify_event(&self, handle: &EventHandle) {
self.inner.events.borrow_mut()[handle.0.index].replace(true);
}
/// Create an event future.
///
/// Once this future is awaited, its task will be blocked until the next [`step`](Executor::step())
/// after [`notify_event`](Executor::notify_event()) is called with this `handle`.
pub fn event(&self, handle: &EventHandle) -> EventFuture {
let ready = self.inner.events.borrow_mut()[handle.0.index].clone();
EventFuture {
ready,
_handle: handle.clone(),
done: false,
}
}
/// Run each non-blocked task exactly once.
///
/// Return whether there are any non-completed tasks.
///
/// # Example
/// ```
/// # use simple_async_local_executor::*;
/// let executor = Executor::default();
/// let event = executor.create_event_handle();
/// async fn wait_event(event: EventHandle, executor: Executor) {
/// executor.event(&event).await;
/// }
/// executor.spawn(wait_event(event.clone(), executor.clone()));
/// assert_eq!(executor.step(), true); // still one task in the queue
/// executor.notify_event(&event);
/// assert_eq!(executor.step(), false); // no more task in the queue
/// ```
pub fn step(&self) -> bool {
// Dummy waker and context (not used as we poll all tasks)
let waker = dummy_waker();
let mut context = Context::from_waker(&waker);
// Append new tasks created since the last step into the task queue
let mut task_queue = self.inner.task_queue.borrow_mut();
task_queue.append(&mut self.inner.new_tasks.borrow_mut());
// Loop over all tasks, polling them. If a task is not ready, add it to the pending tasks.
let mut pending_tasks = Vec::new();
let mut any_left = false;
for mut task in task_queue.drain(..) {
match task.poll(&mut context) {
Poll::Ready(()) => {} // task done
Poll::Pending => {
pending_tasks.push(task);
any_left = true;
}
}
}
// Keep pending tasks for the next step
*task_queue = pending_tasks;
// clear events
for (_, event) in self.inner.events.borrow_mut().iter_mut() {
event.replace(false);
}
any_left
}
}
#[derive(Default)]
struct Unit {
/// The 1-D position of the unit. In a real game, it would be a 2D or 3D.
pub pos: i32,
}
type UnitRef = Rc<RefCell<Unit>>;
/// A future that will move the unit towards `target_pos` at each step,
/// and complete when the unit has reached that position.
struct UnitGotoFuture {
unit: UnitRef,
target_pos: i32,
}
impl Future for UnitGotoFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
let unit_pos = self.unit.borrow().pos;
if unit_pos == self.target_pos {
Poll::Ready(())
} else {
self.unit.borrow_mut().pos += (self.target_pos - unit_pos).signum();
Poll::Pending
}
}
}
/// Helper async function to write unit behavior nicely
async fn goto(unit: UnitRef, pos: i32) {
UnitGotoFuture {
unit,
target_pos: pos,
}
.await;
}
/// Let a unit go back and forth between two positions
async fn patrol(unit: UnitRef, poses: [i32; 2]) {
loop {
goto(unit.clone(), poses[0]).await;
goto(unit.clone(), poses[1]).await;
}
}
/// Test program with two units: one patrolling and one going to a position.
pub fn main() {
let executor = Executor::default();
let units: [UnitRef; 2] = Default::default();
executor.spawn(patrol(units[0].clone(), [-5, 5]));
// executor.spawn(patrol(units[1].clone(), [-1,1]));
let print_poses = || {
println!(
"Unit poses: {}",
units
.iter()
.map(|unit| unit.borrow().pos.to_string())
.collect::<Vec<_>>()
.join(", ")
);
};
print_poses();
for _ in 0..30 {
executor.step();
print_poses();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment