Skip to content

Instantly share code, notes, and snippets.

@robert-snakard
Created August 21, 2018 01:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save robert-snakard/34b32e8df85a1c8607c53a1de44c91b8 to your computer and use it in GitHub Desktop.
Save robert-snakard/34b32e8df85a1c8607c53a1de44c91b8 to your computer and use it in GitHub Desktop.
![feature(async_await, await_macro, futures_api, pin, arbitrary_self_types)]
use std::future::{Future, FutureObj};
use std::mem::PinMut;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{sync_channel, SyncSender, SendError, Receiver};
use std::task::{
self,
local_waker_from_nonlocal,
Context,
Poll,
Spawn,
SpawnErrorKind,
SpawnObjError,
Wake,
};
mod secret;
use self::secret::almost_ready;
/// Task executor that receives tasks off of a channel and runs them.
///
/// Created together with its matching `Spawner` by `new_executor_and_spawner`;
/// the two share a single task channel.
struct Executor {
// Receiving end of the task channel. Tasks arrive here when they are first
// spawned (`Spawn::spawn_obj`) and whenever they are woken (`Wake::wake`).
task_receiver: Receiver<Arc<Task>>,
}
impl Executor {
    /// Runs tasks from the queue until the task channel disconnects.
    ///
    /// Every task received from the channel is polled once:
    ///
    /// * `Poll::Ready` — the task is finished; its future slot stays `None`.
    /// * `Poll::Pending` — the future is stored back into the task so that a
    ///   later `Wake::wake` (which re-sends the task on the channel) can
    ///   resume it. The future itself is responsible for arranging that
    ///   wake-up; the executor does not busy-poll.
    ///
    /// A task whose slot is already empty (a wake-up that arrives after the
    /// task completed) is skipped rather than stopping the executor.
    ///
    /// This method returns only once `recv` reports an error, i.e. when every
    /// `SyncSender` for the channel (all `Spawner` handles, including the
    /// clones held by live tasks) has been dropped and the queue is empty.
    ///
    /// # Panics
    ///
    /// Panics if a task's future mutex has been poisoned.
    fn run(&self) {
        while let Ok(task) = self.task_receiver.recv() {
            // Keep the slot locked across the poll so the future can be put
            // straight back if it is not finished yet.
            let mut slot = task
                .future
                .lock()
                .expect("task future mutex poisoned");

            let mut future = match slot.take() {
                Some(future) => future,
                // Stale wake-up for a task that already ran to completion.
                None => continue,
            };

            // `Context` needs a `LocalWaker` (built from the task itself, so
            // waking re-enqueues it) and a `Spawn` (the task's own spawner).
            let mut local_spawner = &task.spawner;
            let local_waker = local_waker_from_nonlocal(task.clone());
            let mut cx = Context::new(&local_waker, &mut local_spawner);

            match PinMut::new(&mut future).poll(&mut cx) {
                Poll::Ready(ret) => println!("ready future {:?}", ret),
                Poll::Pending => {
                    println!("pending future");
                    // Not done yet: hand the future back to the task so the
                    // next wake-up can continue where this poll left off.
                    *slot = Some(future);
                }
            }
        }
    }
}
/// Handle that spawns tasks by sending them onto the executor's task channel.
///
/// The type is `Clone`; every `Task` created by `spawn_obj` stores its own
/// clone so that it can re-enqueue itself when woken.
#[derive(Clone)]
struct Spawner {
// Sending end of the task channel read by `Executor::run`.
task_sender: SyncSender<Arc<Task>>,
}
impl Spawner {
    /// Boxes `future` and queues it as a new top-level task.
    ///
    /// # Panics
    ///
    /// Panics with "unable to spawn" if `spawn_obj` reports an error.
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        // `Spawn` is implemented for `&Spawner`, and `spawn_obj` takes
        // `&mut self`, so we need a mutable binding holding the shared ref.
        let mut handle: &Spawner = self;
        let boxed = FutureObj::new(Box::new(future));
        let outcome = handle.spawn_obj(boxed);
        outcome.expect("unable to spawn");
    }
}
// Implement the `Spawn` trait for `&Spawner` rather than `Spawner` since
// we don't require a mutable reference to `Spawner`.
impl<'a> Spawn for &'a Spawner {
fn spawn_obj(&mut self, future: FutureObj<'static, ()>)
-> Result<(), SpawnObjError>
{
let task = Arc::new(Task {
future: Mutex::new(Some(future)),
spawner: self.clone(),
});
self.task_sender.send(task).map_err(|SendError(task)| {
SpawnObjError {
kind: SpawnErrorKind::shutdown(),
future: task.future.lock().unwrap().take().unwrap(),
}
})
}
}
/// A unit of work on the executor's queue: a future plus the handle needed to
/// put the task back on the queue.
///
/// Tasks are shared as `Arc<Task>` between the channel and wakers.
struct Task {
// In-progress future that should be pushed to completion.
// Kept in a `Mutex<Option<..>>` so it can be taken out (`take()`) through a
// shared `Arc<Task>`; the slot is `None` while the future is checked out.
future: Mutex<Option<FutureObj<'static, ()>>>,
// Handle to spawn tasks onto the task queue; also used by `Wake::wake` to
// re-send this task.
spawner: Spawner,
}
impl Wake for Task {
    /// Wakes the task by sending another handle to it onto the task queue,
    /// so the executor will receive and poll it again.
    ///
    /// # Panics
    ///
    /// Panics if the send fails (the receiving side of the channel is gone).
    fn wake(arc_self: &Arc<Self>) {
        let queue = &arc_self.spawner.task_sender;
        let handle = arc_self.clone();
        queue.send(handle).unwrap();
    }
}
/// Creates an `Executor` and a `Spawner` connected by one bounded channel.
///
/// The spawner owns the sending half and the executor the receiving half.
fn new_executor_and_spawner() -> (Executor, Spawner) {
    // Upper bound on tasks waiting in the channel at any one time.
    // It exists only because `sync_channel` requires a capacity; a real
    // executor would not have this limit.
    const MAX_QUEUED_TASKS: usize = 10_000;

    let (tx, rx) = sync_channel(MAX_QUEUED_TASKS);
    let executor = Executor { task_receiver: rx };
    let spawner = Spawner { task_sender: tx };
    (executor, spawner)
}
/// Spawns one demo task that awaits `almost_ready(5)` and then runs the
/// executor.
fn main() {
    let (executor, spawner) = new_executor_and_spawner();
    spawner.spawn(async {
        println!("howdy!");
        let x = await!(almost_ready(5));
        println!("done: {:?}", x);
    });
    // Release our sending handle before running. The executor's receive loop
    // can only observe a disconnected channel once every sender is gone, so
    // keeping `spawner` alive here would prevent the channel from ever
    // closing. Live tasks still hold their own clones and can re-enqueue
    // themselves when woken.
    drop(spawner);
    executor.run();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment