Created
August 21, 2018 01:21
-
-
Save robert-snakard/34b32e8df85a1c8607c53a1de44c91b8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
![feature(async_await, await_macro, futures_api, pin, arbitrary_self_types)] | |
use std::future::{Future, FutureObj}; | |
use std::mem::PinMut; | |
use std::sync::{Arc, Mutex}; | |
use std::sync::mpsc::{sync_channel, SyncSender, SendError, Receiver}; | |
use std::task::{ | |
self, | |
local_waker_from_nonlocal, | |
Context, | |
Poll, | |
Spawn, | |
SpawnErrorKind, | |
SpawnObjError, | |
Wake, | |
}; | |
mod secret; | |
use self::secret::almost_ready; | |
/// Task executor that receives tasks off of a channel and runs them. | |
struct Executor { | |
task_receiver: Receiver<Arc<Task>>, | |
} | |
impl Executor { | |
fn run(&self) { | |
// FIXME: implement the running of the executor. | |
// | |
// This method should pull tasks off of the existing task | |
// queue and run them to completion. | |
// | |
// In order to poll futures, you'll need to construct a | |
// `task::Context` from a `LocalWaker` and a `Spawn`. | |
// You can get a value of type `LocalWaker` by calling | |
// `local_waker_from_nonlocal` on an `Arc<W>` where `W: Wake`. | |
// | |
// To poll the future you'll need to do: | |
// `PinMut::new(future).poll(cx)` where cx is `&mut Context` | |
// | |
while let Ok(cur_task) = self.task_receiver.recv().unwrap() { | |
let mut local_spawner = &cur_task.spawner; | |
let local_waker = local_waker_from_nonlocal(cur_task.clone()); | |
let mut cx = Context::new(&local_waker, &mut local_spawner); | |
let mut future = cur_task.future.lock().unwrap().take(); | |
if let None = future { | |
return; | |
} | |
match PinMut::new(&mut future.unwrap()).poll(&mut cx) { | |
Poll::Ready(ret) => println!("ready future {:?}", ret), | |
Poll::Pending => { | |
println!("pending future"); | |
local_waker.wake(); | |
}, | |
} | |
} | |
} | |
} | |
/// Task executor that spawns tasks onto a channel. | |
#[derive(Clone)] | |
struct Spawner { | |
task_sender: SyncSender<Arc<Task>>, | |
} | |
impl Spawner { | |
// Spawn a future as a new top-level task. | |
fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) { | |
let future_obj = FutureObj::new(Box::new(future)); | |
(&mut &*self).spawn_obj(future_obj) | |
.expect("unable to spawn"); | |
} | |
} | |
// Implement the `Spawn` trait for `&Spawner` rather than `Spawner` since | |
// we don't require a mutable reference to `Spawner`. | |
impl<'a> Spawn for &'a Spawner { | |
fn spawn_obj(&mut self, future: FutureObj<'static, ()>) | |
-> Result<(), SpawnObjError> | |
{ | |
let task = Arc::new(Task { | |
future: Mutex::new(Some(future)), | |
spawner: self.clone(), | |
}); | |
self.task_sender.send(task).map_err(|SendError(task)| { | |
SpawnObjError { | |
kind: SpawnErrorKind::shutdown(), | |
future: task.future.lock().unwrap().take().unwrap(), | |
} | |
}) | |
} | |
} | |
struct Task { | |
// In-progress future that should be pushed to completion | |
future: Mutex<Option<FutureObj<'static, ()>>>, | |
// Handle to spawn tasks onto the task queue | |
spawner: Spawner, | |
} | |
impl Wake for Task { | |
fn wake(arc_self: &Arc<Self>) { | |
// FIXME: implement `Wake` by putting the task back onto the task queue | |
arc_self.spawner.task_sender.send(arc_self.clone()).unwrap(); | |
} | |
} | |
fn new_executor_and_spawner() -> (Executor, Spawner) { | |
// Maximum number of tasks to allow queueing in the channel at once. | |
// This is just to make `sync_channel` happy, and wouldn't be present in | |
// a real executor. | |
const MAX_QUEUED_TASKS: usize = 10000; | |
let (task_sender, task_receiver) = sync_channel(MAX_QUEUED_TASKS); | |
(Executor { task_receiver }, Spawner { task_sender }) | |
} | |
fn main() { | |
let (executor, spawner) = new_executor_and_spawner(); | |
spawner.spawn(async { | |
println!("howdy!"); | |
let x = await!(almost_ready(5)); | |
println!("done: {:?}", x); | |
}); | |
executor.run(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment