Skip to content

Instantly share code, notes, and snippets.

@ear7h
Created May 20, 2020 02:56
Show Gist options
  • Save ear7h/143282aa8451d1b32875ee286ad592f4 to your computer and use it in GitHub Desktop.
Save ear7h/143282aa8451d1b32875ee286ad592f4 to your computer and use it in GitHub Desktop.
rust async/await toy executor and http handler trait
// The executor part was directly copied from the async book
// the http handler trait was written through trial and error
// and reading documentation
//
// There are two different handlers that I wrote, which behave the
// same way. The difference is that one is written with the
// async_trait macro and the other is handwritten.
//
// The handlers pretty much look like the example code in async_trait,
// but the error messages when the lifetime parameter on w and r was
// left out were buggy and mentioned an "'async" lifetime, although the
// name was actually "'async_trait". Anyway, at this point I realized
// I didn't understand what async_trait does, or what an async fn
// trait method is or looks like, so I wrote the desugared version following
// the explanation code in the async_trait docs. It's not exactly what's
// happening, and I still had to figure out the lifetime thing (not the
// first time dyn defaulting to 'static has shot me in the foot).
//
// I think I get it now.
use {
async_trait::async_trait,
futures::{
future::{BoxFuture, FutureExt},
join,
task::{waker_ref, ArcWake},
},
http,
std::{
future::Future,
io,
io::Cursor,
pin::Pin,
sync::mpsc::{sync_channel, Receiver, SyncSender},
sync::{Arc, Mutex},
task::{Context, Poll},
time::Duration,
},
};
mod sleeper;
/// Task executor that receives tasks off of a channel and runs them.
///
/// It owns only the receiving half of the task channel; the sending half
/// lives in `Spawner` and in every `Task`.
struct Executor {
/// Receiving end of the task channel; each item is a task to poll.
ready_queue: Receiver<Arc<Task>>,
}
/// `Spawner` spawns new futures onto the task channel.
///
/// It is `Clone`, so several handles can feed the same channel.
#[derive(Clone)]
struct Spawner {
/// Sending end of the task channel; a clone is stored in each spawned `Task`.
task_sender: SyncSender<Arc<Task>>,
}
/// A future that can reschedule itself to be polled by an `Executor`.
struct Task {
/// In-progress future that should be pushed to completion.
///
/// The slot is an `Option` so the executor can `take` the future out while
/// polling it; it is only put back when the poll returns `Pending`, so a
/// completed task is left holding `None`.
///
/// The `Mutex` is not necessary for correctness, since we only have
/// one thread executing tasks at once. However, Rust isn't smart
/// enough to know that `future` is only mutated from one thread,
/// so we need to use the `Mutex` to prove thread-safety. A production
/// executor would not need this, and could use `UnsafeCell` instead.
future: Mutex<Option<BoxFuture<'static, ()>>>,
/// Handle to place the task itself back onto the task queue.
task_sender: SyncSender<Arc<Task>>,
}
/// Builds an `Executor` / `Spawner` pair wired to the two ends of one
/// bounded task channel.
fn new_executor_and_spawner() -> (Executor, Spawner) {
    // `sync_channel` needs a fixed capacity. The bound exists only to
    // satisfy that API; a real executor would not have such a limit.
    const MAX_QUEUED_TASKS: usize = 10_000;

    let (tx, rx) = sync_channel(MAX_QUEUED_TASKS);
    let executor = Executor { ready_queue: rx };
    let spawner = Spawner { task_sender: tx };
    (executor, spawner)
}
impl Spawner {
    /// Boxes `future`, wraps it in a new reference-counted `Task`, and puts
    /// that task on the channel so the executor will poll it.
    ///
    /// The task gets its own clone of the channel sender so that it can
    /// re-queue itself when woken.
    ///
    /// `SyncSender::send` *blocks* while the queue is full; it does not fail.
    /// The only error it reports is a disconnected receiver.
    ///
    /// # Panics
    ///
    /// Panics if the receiving side of the task channel (the `Executor`)
    /// has been dropped.
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        self.task_sender
            .send(task)
            .expect("executor dropped: task queue receiver is disconnected");
    }
}
impl ArcWake for Task {
    /// Wakes the task by sending another handle to it back onto the task
    /// channel, so that the executor will poll it again.
    ///
    /// `SyncSender::send` blocks while the queue is full rather than
    /// failing. The only error it reports is a disconnected receiver.
    ///
    /// # Panics
    ///
    /// Panics if the receiving side of the task channel (the `Executor`)
    /// has been dropped.
    fn wake_by_ref(arc_self: &Arc<Self>) {
        let cloned = Arc::clone(arc_self);
        arc_self
            .task_sender
            .send(cloned)
            .expect("executor dropped: task queue receiver is disconnected");
    }
}
impl Executor {
    /// Drains the task channel, polling each received task once per receipt.
    ///
    /// The loop ends when `recv` returns an error, which happens once every
    /// sender (the `Spawner` and all outstanding `Task`s) has been dropped.
    ///
    /// # Panics
    ///
    /// Panics if a task's future mutex is poisoned.
    fn run(&self) {
        while let Ok(task) = self.ready_queue.recv() {
            // The guard is held for the whole poll; it is released at the
            // end of each loop iteration.
            let mut slot = task.future.lock().unwrap();

            // An empty slot means this task already ran to completion.
            let mut pending = match slot.take() {
                Some(fut) => fut,
                None => continue,
            };

            // Build a waker that re-queues this very task, and a context
            // around it for the poll call.
            let waker = waker_ref(&task);
            let mut cx = Context::from_waker(&waker);

            // `BoxFuture<'static, ()>` is a pinned box, so `as_mut` yields
            // the `Pin<&mut dyn Future>` that `poll` requires.
            if pending.as_mut().poll(&mut cx).is_pending() {
                // Not finished: store the future back so that a later wake
                // can poll it again.
                *slot = Some(pending);
            }
        }
    }
}
/// Prints a start message, awaits a `sleeper::Sleeper` built with a
/// two-second duration, then prints a completion message.
async fn learn_song() {
    let practice_time = Duration::from_secs(2);
    println!("learning song");
    sleeper::Sleeper::new(practice_time).await;
    println!("learned song");
}
/// Prints the song's single line; completes without awaiting anything.
async fn sing_song() {
    let lyric = "la di da";
    println!("{}", lyric);
}
/// Performs four dance moves. Each move prints "swish", awaits a
/// `sleeper::Sleeper` built with a two-second duration, then prints "swosh".
async fn dance() {
    const MOVES: u32 = 4;
    let mut done = 0;
    while done < MOVES {
        println!("swish");
        sleeper::Sleeper::new(Duration::from_secs(2)).await;
        println!("swosh");
        done += 1;
    }
}
async fn async_main() {
let dance_fut = dance();
let song_fut = async {
learn_song().await;
sing_song().await;
};
join!(dance_fut, song_fut);
println!("ta da");
println!("trying the server thing");
{
let h = MyHandler {};
let mut ww = Vec::<u8>::new();
let mut w = http::Response::new(Cursor::new(&mut ww));
let r = http::Request::new(Cursor::new(Vec::<u8>::new()));
h.handle(&mut w, &r).await;
let s = std::str::from_utf8(ww.as_slice()).unwrap();
println!("{:?}", s)
}
{
let h = MyAsyncHandler {};
let mut ww = Vec::<u8>::new();
let mut w = http::Response::new(Cursor::new(&mut ww));
let r = http::Request::new(Cursor::new(Vec::<u8>::new()));
h.handle(&mut w, &r).await;
let s = std::str::from_utf8(ww.as_slice()).unwrap();
println!("{:?}", s)
}
}
/// HTTP handler trait whose `handle` method is written as an `async fn`,
/// made possible by the `async_trait` attribute macro.
///
/// NOTE(review): the `'async_trait` lifetime used on `w` and `r` is not
/// declared here; per the file's header comment it is a name supplied by the
/// macro expansion. Confirm it is still accepted when upgrading `async_trait`.
#[async_trait]
trait HttpAsyncHandler {
/// Handles request `r`, recording the outcome in response `w`.
///
/// `W` is the response body type (writable, `Send`); `R` is the request
/// body type (readable, `Sync`).
async fn handle<W, R>(
&self,
w: &'async_trait mut http::Response<W>,
r: &'async_trait http::Request<R>,
) where
W: io::Write + Send,
R: io::Read + Sync;
}
/// Stateless handler that implements `HttpAsyncHandler` through the
/// `async_trait` macro.
#[allow(dead_code)]
struct MyAsyncHandler;
#[async_trait]
impl HttpAsyncHandler for MyAsyncHandler {
    /// Answers the request `r` by filling in the response `w`.
    ///
    /// * Sets the status to `200 OK` when the request path is exactly `/`,
    ///   and to `400 Bad Request` for any other path.
    /// * Writes `hello` to the response body in both cases.
    /// * Prints the request path (debug-formatted) to stdout.
    ///
    /// A failure to write the body is reported on stderr instead of being
    /// silently dropped. The method itself has no error channel.
    async fn handle<W, R>(
        &self,
        w: &'async_trait mut http::Response<W>,
        r: &'async_trait http::Request<R>,
    ) where
        W: io::Write + Send,
        R: io::Read + Sync,
    {
        let path = r.uri().path();
        *w.status_mut() = if path == "/" {
            http::StatusCode::OK
        } else {
            http::StatusCode::BAD_REQUEST
        };
        // `write_all` retries until the whole buffer is written; plain
        // `write` may stop after a partial write.
        if let Err(err) = w.body_mut().write_all(b"hello") {
            eprintln!("failed to write response body: {}", err);
        }
        println!("{:?}", path);
    }
}
/// Hand-written equivalent of an async HTTP handler trait: instead of an
/// `async fn`, `handle` returns a boxed, pinned future.
trait HttpHandler {
    /// Handles request `r`, recording the outcome in response `w`.
    ///
    /// The returned future borrows both `w` and `r` for `'a`. The explicit
    /// `+ 'a` on the trait object is required because a boxed `dyn Future`
    /// would otherwise default to `'static`.
    ///
    /// `W` is the response body type (writable, `Send`); `R` is the request
    /// body type (readable, `Sync`). Together these keep the future `Send`.
    fn handle<'a, W, R>(
        &self,
        w: &'a mut http::Response<W>,
        r: &'a http::Request<R>,
    ) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>
    where
        W: io::Write + Send,
        R: io::Read + Sync;
}

/// Stateless handler that implements `HttpHandler` by hand, without the
/// `async_trait` macro.
#[allow(dead_code)]
struct MyHandler;

// Implemented as a trait impl (not an inherent impl) so that `HttpHandler`
// is actually used and its signature is checked against this handler.
impl HttpHandler for MyHandler {
    /// Answers the request `r` by filling in the response `w`.
    ///
    /// * Sets the status to `200 OK` when the request path is exactly `/`,
    ///   and to `400 Bad Request` for any other path.
    /// * Writes `hello` to the response body in both cases.
    /// * Prints the request path (debug-formatted) to stdout.
    ///
    /// A failure to write the body is reported on stderr instead of being
    /// silently dropped. The future's output type is `()`, so there is no
    /// error channel.
    fn handle<'a, W, R>(
        &self,
        w: &'a mut http::Response<W>,
        r: &'a http::Request<R>,
    ) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>
    where
        W: io::Write + Send,
        R: io::Read + Sync,
    {
        // The real work lives in a nested `async fn`; the outer method only
        // boxes and pins the future that function returns.
        async fn f<'a, W, R>(w: &'a mut http::Response<W>, r: &'a http::Request<R>)
        where
            W: io::Write + Send,
            R: io::Read + Sync,
        {
            let path = r.uri().path();
            *w.status_mut() = if path == "/" {
                http::StatusCode::OK
            } else {
                http::StatusCode::BAD_REQUEST
            };
            // `write_all` retries until the whole buffer is written; plain
            // `write` may stop after a partial write.
            if let Err(err) = w.body_mut().write_all(b"hello") {
                eprintln!("failed to write response body: {}", err);
            }
            println!("{:?}", path);
        }
        Box::pin(f::<W, R>(w, r))
    }
}
/// Program entry point: creates the executor, queues `async_main` as the
/// only root task, and runs the executor until the task channel closes.
fn main() {
    let (executor, spawner) = new_executor_and_spawner();

    {
        // Move the spawner into this scope so that it is dropped as soon as
        // the root task has been queued. Once the spawner and every task are
        // gone, the channel closes and `run` can return.
        let spawner = spawner;
        spawner.spawn(async_main());
    }

    // Blocks, polling tasks as they become ready, until no sender remains.
    executor.run();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment