Created
May 20, 2020 02:56
-
-
Save ear7h/143282aa8451d1b32875ee286ad592f4 to your computer and use it in GitHub Desktop.
rust async/await toy executor and http handler trait
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// The executor part was directly copied from the async book | |
// the http handler trait was written through trial and error | |
// and reading documentation | |
// | |
// There are two different handlers that I wrote, which behave the
// same way. The difference is that one is written with the
// async_trait macro and the other is hand-written.
// | |
// The handlers pretty much look like the example code in async_trait,
// but the error message shown when the lifetime parameters on w and r
// were left out was buggy: it mentioned an "'async" lifetime, but the
// name was actually "'async_trait". Anyway, at this point I realized
// I didn't understand what async_trait does, or what an async fn
// trait method is or looks like, so I wrote the desugared version following
// the explanation code in the async_trait docs. It's not exactly what's
// happening and I still had to figure out the lifetime thing (not the
// first time dyn defaulting to 'static has shot me in the foot).
// | |
// I think I get it now. | |
use { | |
async_trait::async_trait, | |
futures::{ | |
future::{BoxFuture, FutureExt}, | |
join, | |
task::{waker_ref, ArcWake}, | |
}, | |
http, | |
std::{ | |
future::Future, | |
io, | |
io::Cursor, | |
pin::Pin, | |
sync::mpsc::{sync_channel, Receiver, SyncSender}, | |
sync::{Arc, Mutex}, | |
task::{Context, Poll}, | |
time::Duration, | |
}, | |
}; | |
mod sleeper; | |
/// Task executor that receives tasks off of a channel and runs them. | |
struct Executor { | |
ready_queue: Receiver<Arc<Task>>, | |
} | |
/// `Spawner` spawns new futures onto the task channel. | |
#[derive(Clone)] | |
struct Spawner { | |
task_sender: SyncSender<Arc<Task>>, | |
} | |
/// A future that can reschedule itself to be polled by an `Executor`. | |
struct Task { | |
/// In-progress future that should be pushed to completion. | |
/// | |
/// The `Mutex` is not necessary for correctness, since we only have | |
/// one thread executing tasks at once. However, Rust isn't smart | |
/// enough to know that `future` is only mutated from one thread, | |
/// so we need use the `Mutex` to prove thread-safety. A production | |
/// executor would not need this, and could use `UnsafeCell` instead. | |
future: Mutex<Option<BoxFuture<'static, ()>>>, | |
/// Handle to place the task itself back onto the task queue. | |
task_sender: SyncSender<Arc<Task>>, | |
} | |
fn new_executor_and_spawner() -> (Executor, Spawner) { | |
// Maximum number of tasks to allow queueing in the channel at once. | |
// This is just to make `sync_channel` happy, and wouldn't be present in | |
// a real executor. | |
const MAX_QUEUED_TASKS: usize = 10_000; | |
let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS); | |
(Executor { ready_queue }, Spawner { task_sender }) | |
} | |
impl Spawner { | |
fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) { | |
let future = future.boxed(); | |
let task = Arc::new(Task { | |
future: Mutex::new(Some(future)), | |
task_sender: self.task_sender.clone(), | |
}); | |
self.task_sender.send(task).expect("too many tasks queued"); | |
} | |
} | |
impl ArcWake for Task { | |
fn wake_by_ref(arc_self: &Arc<Self>) { | |
// Implement `wake` by sending this task back onto the task channel | |
// so that it will be polled again by the executor. | |
let cloned = arc_self.clone(); | |
arc_self | |
.task_sender | |
.send(cloned) | |
.expect("too many tasks queued"); | |
} | |
} | |
impl Executor { | |
fn run(&self) { | |
while let Ok(task) = self.ready_queue.recv() { | |
// Take the future, and if it has not yet completed (is still Some), | |
// poll it in an attempt to complete it. | |
let mut future_slot = task.future.lock().unwrap(); | |
if let Some(mut future) = future_slot.take() { | |
// Create a `LocalWaker` from the task itself | |
let waker = waker_ref(&task); | |
let context = &mut Context::from_waker(&*waker); | |
// `BoxFuture<T>` is a type alias for | |
// `Pin<Box<dyn Future<Output = T> + Send + 'static>>`. | |
// We can get a `Pin<&mut dyn Future + Send + 'static>` | |
// from it by calling the `Pin::as_mut` method. | |
if let Poll::Pending = future.as_mut().poll(context) { | |
// We're not done processing the future, so put it | |
// back in its task to be run again in the future. | |
*future_slot = Some(future); | |
} | |
} | |
} | |
} | |
} | |
async fn learn_song() { | |
println!("learning song"); | |
sleeper::Sleeper::new(Duration::new(2, 0)).await; | |
println!("learned song"); | |
} | |
/// Prints the song; completes without ever suspending.
async fn sing_song() {
    println!("la di da");
}
async fn dance() { | |
for _ in 0..4 { | |
println!("swish"); | |
sleeper::Sleeper::new(Duration::new(2, 0)).await; | |
println!("swosh"); | |
} | |
} | |
async fn async_main() { | |
let dance_fut = dance(); | |
let song_fut = async { | |
learn_song().await; | |
sing_song().await; | |
}; | |
join!(dance_fut, song_fut); | |
println!("ta da"); | |
println!("trying the server thing"); | |
{ | |
let h = MyHandler {}; | |
let mut ww = Vec::<u8>::new(); | |
let mut w = http::Response::new(Cursor::new(&mut ww)); | |
let r = http::Request::new(Cursor::new(Vec::<u8>::new())); | |
h.handle(&mut w, &r).await; | |
let s = std::str::from_utf8(ww.as_slice()).unwrap(); | |
println!("{:?}", s) | |
} | |
{ | |
let h = MyAsyncHandler {}; | |
let mut ww = Vec::<u8>::new(); | |
let mut w = http::Response::new(Cursor::new(&mut ww)); | |
let r = http::Request::new(Cursor::new(Vec::<u8>::new())); | |
h.handle(&mut w, &r).await; | |
let s = std::str::from_utf8(ww.as_slice()).unwrap(); | |
println!("{:?}", s) | |
} | |
} | |
#[async_trait] | |
trait HttpAsyncHandler { | |
async fn handle<W, R>( | |
&self, | |
w: &'async_trait mut http::Response<W>, | |
r: &'async_trait http::Request<R>, | |
) where | |
W: io::Write + Send, | |
R: io::Read + Sync; | |
} | |
#[allow(dead_code)] | |
struct MyAsyncHandler; | |
#[async_trait] | |
impl HttpAsyncHandler for MyAsyncHandler { | |
async fn handle<W, R>( | |
&self, | |
w: &'async_trait mut http::Response<W>, | |
r: &'async_trait http::Request<R>, | |
) where | |
W: io::Write + Send, | |
R: io::Read + Sync, | |
{ | |
if r.uri().path() == "/" { | |
*w.status_mut() = http::StatusCode::OK; | |
} else { | |
*w.status_mut() = http::StatusCode::BAD_REQUEST; | |
} | |
w.body_mut().write(b"hello"); | |
println!("{:?}", r.uri().path()); | |
} | |
} | |
trait HttpHandler { | |
fn handle<'a, W, R>( | |
&self, | |
w: &'a mut http::Response<W>, | |
r: &'a http::Request<R>, | |
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>> | |
where | |
W: io::Write + Send, | |
R: io::Read + Sync; | |
} | |
#[allow(dead_code)] | |
struct MyHandler; | |
impl MyHandler { | |
fn handle<'a, W, R>( | |
&self, | |
w: &'a mut http::Response<W>, | |
r: &'a http::Request<R>, | |
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>> | |
where | |
W: io::Write + Send, | |
R: io::Read + Sync, | |
{ | |
async fn f<'a, W, R>(w: &'a mut http::Response<W>, r: &'a http::Request<R>) | |
where | |
W: io::Write + Send, | |
R: io::Read + Sync, | |
{ | |
if r.uri().path() == "/" { | |
*w.status_mut() = http::StatusCode::OK; | |
} else { | |
*w.status_mut() = http::StatusCode::BAD_REQUEST; | |
} | |
w.body_mut().write(b"hello"); | |
println!("{:?}", r.uri().path()); | |
} | |
Box::pin(f::<W, R>(w, r)) | |
} | |
} | |
fn main() { | |
let (executor, spawner) = new_executor_and_spawner(); | |
// Spawn a task to print before and after waiting on a timer. | |
spawner.spawn(async_main()); | |
// Drop the spawner so that our executor knows it is finished and won't | |
// receive more incoming tasks to run. | |
drop(spawner); | |
// Run the executor until the task queue is empty. | |
// This will print "howdy!", pause, and then print "done!". | |
executor.run(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment