Last active
September 13, 2018 19:43
-
-
Save ubnt-intrepid/5dfd11753806a180ebe117482f620619 to your computer and use it in GitHub Desktop.
async/await with Tokio runtime (0.1)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
cargo-features = ["edition"]

[package]
name = "async-await-with-tokio"
version = "0.0.0"
publish = false
edition = "2018"

[[bin]]
name = "async-await"
path = "main.rs"
doc = false

[dependencies]
hyper = "0.12.5"
tokio = "0.1.7"
tokio-executor = "0.1.2"
futures = "0.1.21"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#![feature(async_await, await_macro, arbitrary_self_types, futures_api, pin)] | |
#![feature(rust_2018_preview)] | |
extern crate futures; // 0.1.21 | |
extern crate hyper; // 0.12.5 | |
extern crate tokio; // 0.1.7 | |
extern crate tokio_executor; // 0.1.2 | |
use std::boxed::PinBox; | |
use std::future::Future; | |
use std::mem::PinMut; | |
use std::sync::Arc; | |
use std::task::{ | |
self as stdtask, | |
Context, | |
Executor, | |
Poll, | |
SpawnErrorKind, | |
SpawnObjError, | |
TaskObj, | |
Waker, | |
}; | |
use futures::Stream as _Stream; | |
use tokio::runtime::{Runtime, TaskExecutor}; | |
// ==== current_as_local_waker ==== | |
struct Current(futures::task::Task); | |
impl stdtask::Wake for Current { | |
fn wake(arc_self: &Arc<Self>) { | |
arc_self.0.notify() | |
} | |
} | |
fn current_as_local_waker() -> stdtask::LocalWaker { | |
stdtask::local_waker_from_nonlocal(Arc::new(Current(futures::task::current()))) | |
} | |
// ==== Exec ==== | |
struct Task { | |
task: PinBox<Option<TaskObj>>, | |
exec: Exec, | |
} | |
impl futures::Future for Task { | |
type Item = (); | |
type Error = (); | |
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> { | |
let local_waker = current_as_local_waker(); | |
let mut cx = Context::new(&local_waker, &mut self.exec); | |
let pinned = | |
unsafe { PinMut::map_unchecked(self.task.as_pin_mut(), |task| task.as_mut().unwrap()) }; | |
match Future::poll(pinned, &mut cx) { | |
Poll::Ready(()) => Ok(futures::Async::Ready(())), | |
Poll::Pending => Ok(futures::Async::NotReady), | |
} | |
} | |
} | |
#[derive(Clone)] | |
struct Exec(TaskExecutor); | |
impl Executor for Exec { | |
fn spawn_obj(&mut self, task: TaskObj) -> Result<(), SpawnObjError> { | |
let task = Task { | |
task: PinBox::new(Some(task)), | |
exec: Exec(self.0.clone()), | |
}; | |
match futures::future::Executor::execute(&mut self.0, task) { | |
Ok(()) => Ok(()), | |
Err(err) => Err(SpawnObjError { | |
task: unsafe { PinBox::unpin(err.into_future().task).take().unwrap() }, | |
kind: SpawnErrorKind::shutdown(), | |
}), | |
} | |
} | |
} | |
// ==== FuturesToStd ==== | |
struct FuturesToStd<F>(F); | |
impl<F> From<F> for FuturesToStd<F> | |
where | |
F: futures::Future, | |
{ | |
fn from(future: F) -> Self { | |
FuturesToStd(future) | |
} | |
} | |
impl<F> Future for FuturesToStd<F> | |
where | |
F: futures::Future, | |
{ | |
type Output = Result<F::Item, F::Error>; | |
fn poll(self: PinMut<Self>, cx: &mut Context) -> Poll<Self::Output> { | |
// TODO: set executor to TLS | |
// * https://docs.rs/tokio-executor/0.1.2/tokio_executor/fn.with_default.html | |
with_cx(cx, || { | |
match unsafe { futures::Future::poll(&mut PinMut::get_mut_unchecked(self).0) } { | |
Ok(futures::Async::Ready(x)) => Poll::Ready(Ok(x)), | |
Ok(futures::Async::NotReady) => Poll::Pending, | |
Err(e) => Poll::Ready(Err(e)), | |
} | |
}) | |
} | |
} | |
fn with_cx<R>(cx: &mut Context, f: impl FnOnce() -> R) -> R { | |
// TODO: set executor to TLS | |
// * https://docs.rs/tokio-executor/0.1.2/tokio_executor/fn.with_default.html | |
futures::executor::with_notify(&WakerToHandle(cx.waker()), 0, f) | |
} | |
// imported from: https://github.com/seanmonstar/futures-compat/blob/master/src/futures_01.rs | |
struct NotifyWaker(Waker); | |
#[derive(Clone)] | |
struct WakerToHandle<'a>(&'a Waker); | |
impl<'a> From<WakerToHandle<'a>> for futures::executor::NotifyHandle { | |
fn from(handle: WakerToHandle<'a>) -> futures::executor::NotifyHandle { | |
let ptr = Box::new(NotifyWaker(handle.0.clone())); | |
unsafe { futures::executor::NotifyHandle::new(Box::into_raw(ptr)) } | |
} | |
} | |
impl futures::executor::Notify for NotifyWaker { | |
fn notify(&self, _: usize) { | |
self.0.wake(); | |
} | |
} | |
unsafe impl futures::executor::UnsafeNotify for NotifyWaker { | |
unsafe fn clone_raw(&self) -> futures::executor::NotifyHandle { | |
WakerToHandle(&self.0).into() | |
} | |
unsafe fn drop_raw(&self) { | |
let ptr: *const futures::executor::UnsafeNotify = self; | |
drop(Box::from_raw(ptr as *mut futures::executor::UnsafeNotify)); | |
} | |
} | |
// === END ==== | |
/// Awaits a futures 0.1 future from inside an `async` block by wrapping it in
/// `FuturesToStd` and delegating to the `await!` macro. The expression
/// evaluates to `Result<Item, Error>` of the wrapped future.
macro_rules! await_01 {
    ($fut01:expr) => {{
        await!(FuturesToStd::from($fut01))
    }};
}
// ==== AsyncRuntime ==== | |
struct StdToFutures01<F> { | |
future: PinBox<F>, | |
exec: Exec, | |
} | |
impl<F, T, E> futures::Future for StdToFutures01<F> | |
where | |
F: Future<Output = Result<T, E>>, | |
{ | |
type Item = T; | |
type Error = E; | |
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> { | |
let local_waker = current_as_local_waker(); | |
let mut cx = Context::new(&local_waker, &mut self.exec); | |
let pinned = self.future.as_pin_mut(); | |
match Future::poll(pinned, &mut cx) { | |
Poll::Ready(Ok(v)) => Ok(futures::Async::Ready(v)), | |
Poll::Ready(Err(e)) => Err(e), | |
Poll::Pending => Ok(futures::Async::NotReady), | |
} | |
} | |
} | |
struct AsyncRuntime { | |
inner: Runtime, | |
} | |
impl AsyncRuntime { | |
fn new(runtime: Runtime) -> AsyncRuntime { | |
AsyncRuntime { | |
inner: runtime, | |
} | |
} | |
fn block_on<F, T, E>(&mut self, future: F) -> Result<T, E> | |
where | |
F: Future<Output = Result<T, E>> + Send + 'static, | |
T: Send + 'static, | |
E: Send + 'static, | |
{ | |
let future = StdToFutures01 { | |
future: PinBox::new(future), | |
exec: Exec(self.inner.executor()), | |
}; | |
self.inner.block_on(future) | |
} | |
} | |
fn main() -> Result<(), Box<std::error::Error>> { | |
let rt = Runtime::new()?; | |
let mut rt_async = AsyncRuntime::new(rt); | |
let future = async { | |
let url = "http://example.com".parse().unwrap(); | |
let response = await_01!(hyper::Client::new().get(url))?; | |
let (parts, body) = response.into_parts(); | |
let body = await_01!(body.concat2())?; | |
let body = String::from_utf8_lossy(&*body).into_owned(); | |
let response = hyper::Response::from_parts(parts, body); | |
Ok(response) as Result<hyper::Response<String>, hyper::Error> | |
}; | |
let response = rt_async.block_on(future)?; | |
println!("response = {:#?}", response); | |
Ok(()) | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
nightly-2018-06-28
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment