Skip to content

Instantly share code, notes, and snippets.

@a10y
Created March 25, 2024 02:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save a10y/7d40f3511a7ea37b10c31b7b2200a089 to your computer and use it in GitHub Desktop.
Save a10y/7d40f3511a7ea37b10c31b7b2200a089 to your computer and use it in GitHub Desktop.
Simple current-thread async executor in Rust
#![feature(noop_waker)]
#![feature(local_waker)]
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::fmt::Formatter;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::ptr::null_mut;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicU64, Ordering};
use std::task::{ContextBuilder, Poll, Waker};
fn main() {
// Create our own executor that can execute futures.
let executor = SimpleExecutor {
finished: Arc::new(Mutex::new(HashMap::new())),
runnable: Arc::new(Mutex::new(VecDeque::new())),
};
println!("SUBMIT");
let handle = executor.spawn(async move {
// What to do in here
println!("this is my future");
"success".to_string()
});
println!("{executor:?}");
println!("NEXT_TASK");
executor.next_task();
println!("{executor:?}");
println!("SHUTDOWN");
}
/// Numeric identifier of a spawned task.
///
/// A plain newtype over `u64`; it is `Copy`, comparable and hashable so it can
/// be used as a map key. `TaskId::default()` is `TaskId(0)`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
struct TaskId(u64);
/// Bookkeeping record for one spawned task.
///
/// `F` is the concrete future type and `T` is the value it resolves to.
struct Slot<T, F>
where
    F: Future<Output = T>,
{
    /// Id assigned to the task when it was spawned.
    task_id: TaskId,
    /// The task's future, boxed and pinned so it can be polled in place.
    future: Pin<Box<F>>,
    /// Raw pointer slot for the future's output. `spawn` initialises it to
    /// null and `next_task` assigns it once the future reports `Poll::Ready`.
    data: *mut T,
}
/// A minimal executor that stores futures and polls them on request.
///
/// The future type `F` is a single type parameter of the executor, so every
/// task stored in one executor must have the same concrete future type.
// NOTE: open design question from the original author — futures often have
// types that cannot be named ahead of time, so how should they be stored?
struct SimpleExecutor<T, F>
where
    F: Future<Output = T>,
{
    /// Queue of tasks waiting to be polled.
    runnable: Arc<Mutex<VecDeque<Slot<T, F>>>>,
    /// Tasks whose future has completed, keyed by task id.
    finished: Arc<Mutex<HashMap<TaskId, Slot<T, F>>>>,
}
/// Debug output reports only how many tasks sit in each collection, never the
/// tasks themselves.
impl<T, F: Future<Output = T>> fmt::Debug for SimpleExecutor<T, F> {
    /// Writes `SimpleExecutor { runnable_count: .., finished_count: .. }`.
    ///
    /// Panics if either mutex is poisoned.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Take the run queue first, then the finished map; both guards stay
        // alive until the formatted value has been written.
        let queued = self.runnable.lock().unwrap();
        let done = self.finished.lock().unwrap();

        f.debug_struct("SimpleExecutor")
            .field("runnable_count", &queued.len())
            .field("finished_count", &done.len())
            .finish()
    }
}
static TASK_IDS: AtomicU64 = AtomicU64::new(1);
impl<T, F: Future<Output=T>> SimpleExecutor<T, F> {
pub fn new() -> Self {
Self {
runnable: Arc::new(Mutex::new(VecDeque::new())),
finished: Arc::new(Mutex::new(HashMap::new())),
}
}
pub fn next_task(&self) {
// loop {
println!("task loop");
let mut runnable = self.runnable.lock().unwrap();
if let Some(mut task) = runnable.pop_front() {
println!("next task found");
let fut = task.future.as_mut();
// let ctx: Context::from_waker(&mut Waker::noop());
let mut cx = ContextBuilder::from_waker(&Waker::noop()).build();
match fut.poll(&mut cx) {
Poll::Ready(result) => {
unsafe {
if !task.data.is_null() {
panic!("something went terribly wrong: future is completing twice");
}
// Box the result and save into a pointer
let mut result = Box::new(result);
task.data = result.as_mut();
}
self.finished.lock().unwrap().insert(
task.task_id,
task,
);
}
Poll::Pending => {
return;
// continue
}
}
}
// }
}
pub fn spawn(&self, fut: F) -> Handle<T>
{
let fut = Box::pin(fut);
let task_id = TaskId(TASK_IDS.fetch_add(1, Ordering::AcqRel));
let slot = Slot {
task_id,
data: null_mut(),
future: fut,
};
// Store the slot inside ourselves
let mut locked = self.runnable.lock().unwrap();
locked.push_back(
slot,
);
Handle {
task_id,
__phantom: PhantomData,
}
}
}
/// Join handle for a spawned task.
///
/// It records the id of the task and, through `PhantomData`, the type the
/// task resolves to, without storing a value of that type.
// TODO: keep a pointer to the executor that spawned the task, so that a
// single-threaded executor can be reached through the handle.
struct Handle<T> {
    /// Id of the task this handle refers to.
    task_id: TaskId,
    /// Marker tying the handle to the task's output type.
    __phantom: PhantomData<T>,
}

impl<T> Handle<T> {
    /// Consumes the handle and is meant to return the task's output.
    ///
    /// # Panics
    ///
    /// Always: joining is not implemented yet. A notice is printed to stdout
    /// and the call then panics through `todo!()`.
    pub fn join(self) -> T {
        println!("not implemented yet");
        todo!()
    }
}
@a10y
Copy link
Author

a10y commented Mar 25, 2024

Execution result:

SUBMIT
SimpleExecutor { runnable_count: 1, finished_count: 0 }
NEXT_TASK
task loop
next task found
this is my future
SimpleExecutor { runnable_count: 0, finished_count: 1 }
SHUTDOWN

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment