Created
August 22, 2018 02:18
-
-
Save robert-snakard/60e7c13480fd39174deaee3648553f6b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
![feature(async_await, await_macro, futures_api, pin, arbitrary_self_types)] | |
use std::future::{Future, FutureObj}; | |
use std::mem::PinMut; | |
use std::sync::{Arc, Mutex}; | |
use std::sync::mpsc::{sync_channel, SyncSender, SendError, Receiver}; | |
use std::task::{ | |
self, | |
local_waker_from_nonlocal, | |
Context, | |
Poll, | |
Spawn, | |
SpawnErrorKind, | |
SpawnObjError, | |
Wake, | |
}; | |
mod secret; | |
use self::secret::almost_ready; | |
/// Task executor that receives tasks off of a channel and runs them. | |
struct Executor { | |
task_receiver: Receiver<Arc<Task>>, | |
} | |
#![feature(async_await, await_macro, futures_api, pin, arbitrary_self_types)] | |
use std::future::{Future, FutureObj}; | |
use std::mem::PinMut; | |
use std::sync::{Arc, Mutex}; | |
use std::sync::mpsc::{sync_channel, SyncSender, SendError, Receiver}; | |
use std::task::{ | |
self, | |
local_waker_from_nonlocal, | |
Context, | |
Poll, | |
Spawn, | |
SpawnErrorKind, | |
SpawnObjError, | |
Wake, | |
}; | |
mod secret; | |
use self::secret::almost_ready; | |
/// Task executor that receives tasks off of a channel and runs them. | |
struct Executor { | |
task_receiver: Receiver<Arc<Task>>, | |
} | |
impl Executor { | |
fn run(&self) { | |
// FIXME: implement the running of the executor. | |
// | |
// This method should pull tasks off of the existing task | |
// queue and run them to completion. | |
// | |
// In order to poll futures, you'll need to construct a | |
// `task::Context` from a `LocalWaker` and a `Spawn`. | |
// You can get a value of type `LocalWaker` by calling | |
// `local_waker_from_nonlocal` on an `Arc<W>` where `W: Wake`. | |
// | |
// To poll the future you'll need to do: | |
// `PinMut::new(future).poll(cx)` where cx is `&mut Context` | |
// | |
while let Ok(cur_task) = self.task_receiver.recv() { | |
let mut future_box = cur_task.future.lock().unwrap(); | |
println!("{:?}", future_box); | |
if let Some(mut future) = future_box.take() { | |
println!("SOME HERE"); | |
let local_waker = local_waker_from_nonlocal(cur_task.clone()); | |
let mut local_spawner = &cur_task.spawner; | |
let mut cx = Context::new(&local_waker, &mut local_spawner); | |
match PinMut::new(&mut future).poll(&mut cx) { | |
Poll::Ready(ret) => { | |
println!("ready future {:?}", ret); | |
local_waker.wake(); | |
}, | |
Poll::Pending => { | |
println!("pending future"); | |
*future_box = Some(future); | |
}, | |
} | |
} | |
} | |
} | |
} | |
/// Task executor that spawns tasks onto a channel. | |
#[derive(Clone)] | |
struct Spawner { | |
task_sender: SyncSender<Arc<Task>>, | |
} | |
impl Spawner { | |
// Spawn a future as a new top-level task. | |
fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) { | |
let future_obj = FutureObj::new(Box::new(future)); | |
(&mut &*self).spawn_obj(future_obj) | |
.expect("unable to spawn"); | |
} | |
} | |
// Implement the `Spawn` trait for `&Spawner` rather than `Spawner` since | |
// we don't require a mutable reference to `Spawner`. | |
impl<'a> Spawn for &'a Spawner { | |
fn spawn_obj(&mut self, future: FutureObj<'static, ()>) | |
-> Result<(), SpawnObjError> | |
{ | |
let task = Arc::new(Task { | |
future: Mutex::new(Some(future)), | |
spawner: self.clone(), | |
}); | |
self.task_sender.send(task).map_err(|SendError(task)| { | |
SpawnObjError { | |
kind: SpawnErrorKind::shutdown(), | |
future: task.future.lock().unwrap().take().unwrap(), | |
} | |
}) | |
} | |
} | |
impl Wake for Task { | |
fn wake(arc_self: &Arc<Self>) { | |
arc_self.spawner.task_sender.send(arc_self.clone()).unwrap(); | |
} | |
} | |
fn new_executor_and_spawner() -> (Executor, Spawner) { | |
// Maximum number of tasks to allow queueing in the channel at once. | |
// This is just to make `sync_channel` happy, and wouldn't be present in | |
// a real executor. | |
const MAX_QUEUED_TASKS: usize = 10000; | |
let (task_sender, task_receiver) = sync_channel(MAX_QUEUED_TASKS); | |
(Executor { task_receiver }, Spawner { task_sender }) | |
} | |
fn main() { | |
let (executor, spawner) = new_executor_and_spawner(); | |
spawner.spawn(async { | |
println!("howdy!"); | |
let x = await!(almost_ready(5)); | |
println!("done: {:?}", x); | |
}); | |
let child = std::thread::spawn(move || { | |
println!("TEST"); | |
executor.run(); | |
}); | |
std::mem::drop(spawner); | |
let _ = child.join(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment