Skip to content

Instantly share code, notes, and snippets.

@ubnt-intrepid
Last active September 13, 2018 19:43
Show Gist options
  • Save ubnt-intrepid/5dfd11753806a180ebe117482f620619 to your computer and use it in GitHub Desktop.
Save ubnt-intrepid/5dfd11753806a180ebe117482f620619 to your computer and use it in GitHub Desktop.
async/await with Tokio runtime (0.1)
cargo-features = ["edition"]
[package]
name = "async-await-with-tokio"
version = "0.0.0"
publish = false
edition = "2018"
[[bin]]
name = "async-await"
path = "main.rs"
doc = false
[dependencies]
hyper = "0.12.5"
tokio = "0.1.7"
tokio-executor = "0.1.2"
futures = "0.1.21"
#![feature(async_await, await_macro, arbitrary_self_types, futures_api, pin)]
#![feature(rust_2018_preview)]
extern crate futures; // 0.1.21
extern crate hyper; // 0.12.5
extern crate tokio; // 0.1.7
extern crate tokio_executor; // 0.1.2
use std::boxed::PinBox;
use std::future::Future;
use std::mem::PinMut;
use std::sync::Arc;
use std::task::{
self as stdtask,
Context,
Executor,
Poll,
SpawnErrorKind,
SpawnObjError,
TaskObj,
Waker,
};
use futures::Stream as _Stream;
use tokio::runtime::{Runtime, TaskExecutor};
// ==== current_as_local_waker ====
/// Newtype around a futures 0.1 task handle (`futures::task::Task`).
///
/// It is the type that implements `stdtask::Wake` in this file, so that a
/// futures 0.1 task can be woken through a `std::task` waker.
struct Current(futures::task::Task);
impl stdtask::Wake for Current {
    /// Wakes the wrapped futures 0.1 task by calling `notify()` on it.
    fn wake(arc_self: &Arc<Self>) {
        let task = &arc_self.0;
        task.notify();
    }
}
/// Builds a `LocalWaker` that notifies the futures 0.1 task returned by
/// `futures::task::current()`.
///
/// NOTE(review): presumably this must be called while a futures 0.1 task is
/// being polled — confirm against the `futures::task::current` documentation.
fn current_as_local_waker() -> stdtask::LocalWaker {
    let task = futures::task::current();
    let shared = Arc::new(Current(task));
    stdtask::local_waker_from_nonlocal(shared)
}
// ==== Exec ====
/// A spawned `TaskObj` bundled with the executor it is polled with, so that it
/// can be driven as a futures 0.1 future (see the `futures::Future` impl).
struct Task {
// Pinned, heap-allocated task. It is created as `Some(..)` in `Exec::spawn_obj`;
// the `Option` allows that function to `take()` the task back out when spawning fails.
task: PinBox<Option<TaskObj>>,
// Executor handed to the task through `Context` on every poll.
exec: Exec,
}
impl futures::Future for Task {
    type Item = ();
    type Error = ();

    /// Polls the wrapped `TaskObj` once, using a waker derived from the
    /// current futures 0.1 task and this task's own executor.
    ///
    /// Never returns `Err`: `Poll::Ready(())` maps to `Async::Ready(())` and
    /// `Poll::Pending` maps to `Async::NotReady`.
    fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
        let waker = current_as_local_waker();
        let mut cx = Context::new(&waker, &mut self.exec);

        // SAFETY: the closure only projects from the pinned `Option<TaskObj>` to the
        // `TaskObj` stored inside it; the value itself is not moved.
        // The `unwrap` relies on the slot being `Some`: it is created as `Some` and is
        // only emptied in `Exec::spawn_obj`, which consumes the whole `Task` to do so.
        let task_obj = unsafe {
            PinMut::map_unchecked(self.task.as_pin_mut(), |slot| slot.as_mut().unwrap())
        };

        let state = match Future::poll(task_obj, &mut cx) {
            Poll::Pending => futures::Async::NotReady,
            Poll::Ready(()) => futures::Async::Ready(()),
        };
        Ok(state)
    }
}
/// Newtype around tokio's `TaskExecutor`; it is the type that implements
/// `std::task::Executor` in this file.
#[derive(Clone)]
struct Exec(TaskExecutor);
impl Executor for Exec {
    /// Wraps `task` in a futures 0.1 compatible `Task` and submits it to the
    /// underlying tokio `TaskExecutor`.
    ///
    /// On failure the original `TaskObj` is recovered from the rejected future
    /// and handed back inside `SpawnObjError`. The futures 0.1 error kind is not
    /// inspected: every failure is reported as `SpawnErrorKind::shutdown()`.
    fn spawn_obj(&mut self, task: TaskObj) -> Result<(), SpawnObjError> {
        let wrapped = Task {
            task: PinBox::new(Some(task)),
            exec: Exec(self.0.clone()),
        };

        futures::future::Executor::execute(&mut self.0, wrapped).map_err(|err| {
            let rejected = err.into_future();
            SpawnObjError {
                // NOTE(review): `PinBox::unpin` is unsafe; this presumably relies on the
                // rejected task never having been polled — confirm.
                task: unsafe { PinBox::unpin(rejected.task).take().unwrap() },
                kind: SpawnErrorKind::shutdown(),
            }
        })
    }
}
// ==== FuturesToStd ====
/// Wrapper that adapts a futures 0.1 future `F` to `std::future::Future`
/// (see the `Future` impl for `FuturesToStd<F>` in this file).
struct FuturesToStd<F>(F);
impl<F: futures::Future> From<F> for FuturesToStd<F> {
    /// Wraps a futures 0.1 future without otherwise touching it.
    fn from(future: F) -> Self {
        FuturesToStd(future)
    }
}
impl<F> Future for FuturesToStd<F>
where
    F: futures::Future,
{
    type Output = Result<F::Item, F::Error>;

    /// Polls the wrapped futures 0.1 future with the `std` waker from `cx`
    /// installed as its notify handle (via `with_cx`).
    ///
    /// `Async::Ready(x)` becomes `Poll::Ready(Ok(x))`, `Async::NotReady` becomes
    /// `Poll::Pending`, and an error becomes `Poll::Ready(Err(e))`.
    fn poll(self: PinMut<Self>, cx: &mut Context) -> Poll<Self::Output> {
        // TODO: set executor to TLS
        // * https://docs.rs/tokio-executor/0.1.2/tokio_executor/fn.with_default.html

        // SAFETY: the inner future is only reached through `&mut` in order to call
        // `futures::Future::poll`; it is never moved out of `self`.
        let this = unsafe { PinMut::get_mut_unchecked(self) };

        with_cx(cx, || match futures::Future::poll(&mut this.0) {
            Err(e) => Poll::Ready(Err(e)),
            Ok(futures::Async::NotReady) => Poll::Pending,
            Ok(futures::Async::Ready(x)) => Poll::Ready(Ok(x)),
        })
    }
}
/// Runs `f` with the waker from `cx` installed as the futures 0.1 notify
/// handle (id `0`) and returns whatever `f` returns.
fn with_cx<R>(cx: &mut Context, f: impl FnOnce() -> R) -> R {
    // TODO: set executor to TLS
    // * https://docs.rs/tokio-executor/0.1.2/tokio_executor/fn.with_default.html
    let handle = WakerToHandle(cx.waker());
    futures::executor::with_notify(&handle, 0, f)
}
// imported from: https://github.com/seanmonstar/futures-compat/blob/master/src/futures_01.rs
/// Owns a `std::task::Waker`; implements the futures 0.1 `Notify` and
/// `UnsafeNotify` traits in this file by forwarding to that waker.
struct NotifyWaker(Waker);
/// Borrowed `Waker` that can be converted into a futures 0.1 `NotifyHandle`
/// through the `From<WakerToHandle>` impl in this file.
#[derive(Clone)]
struct WakerToHandle<'a>(&'a Waker);
impl<'a> From<WakerToHandle<'a>> for futures::executor::NotifyHandle {
fn from(handle: WakerToHandle<'a>) -> futures::executor::NotifyHandle {
let ptr = Box::new(NotifyWaker(handle.0.clone()));
unsafe { futures::executor::NotifyHandle::new(Box::into_raw(ptr)) }
}
}
impl futures::executor::Notify for NotifyWaker {
    /// Wakes the wrapped `Waker`; the notification id is ignored.
    fn notify(&self, _id: usize) {
        let waker = &self.0;
        waker.wake();
    }
}
unsafe impl futures::executor::UnsafeNotify for NotifyWaker {
    /// Produces an independent handle by cloning the wrapped `Waker` into a
    /// fresh boxed `NotifyWaker` (through the `From<WakerToHandle>` impl).
    unsafe fn clone_raw(&self) -> futures::executor::NotifyHandle {
        WakerToHandle(&self.0).into()
    }

    /// Frees the heap allocation that backs this `NotifyWaker`.
    ///
    /// # Safety
    ///
    /// `self` must point to a `NotifyWaker` that was leaked with `Box::into_raw`.
    /// In this file that is done only by the `From<WakerToHandle>` impl.
    /// The value must not be used after this call.
    unsafe fn drop_raw(&self) {
        // Explicit `dyn` replaces the deprecated bare trait-object syntax.
        let ptr: *const dyn futures::executor::UnsafeNotify = self;
        drop(Box::from_raw(ptr as *mut dyn futures::executor::UnsafeNotify));
    }
}
// ==== END of code imported from futures-compat ====
/// Awaits a futures 0.1 future from inside an `async` block.
///
/// The expression is wrapped with `FuturesToStd::from` and passed to `await!`,
/// so the macro evaluates to `Result<Item, Error>` of the wrapped future.
macro_rules! await_01 {
    ($future:expr) => {{
        await!(FuturesToStd::from($future))
    }};
}
// ==== AsyncRuntime ====
/// Adapter that exposes a `std::future::Future` as a futures 0.1 future
/// (see the `futures::Future` impl for `StdToFutures01<F>` in this file).
struct StdToFutures01<F> {
// Pinned, heap-allocated `std` future being driven.
future: PinBox<F>,
// Executor handed to the future through `Context` on every poll.
exec: Exec,
}
impl<F, T, E> futures::Future for StdToFutures01<F>
where
    F: Future<Output = Result<T, E>>,
{
    type Item = T;
    type Error = E;

    /// Polls the wrapped `std` future once, using a waker derived from the
    /// current futures 0.1 task and this adapter's executor.
    ///
    /// `Poll::Ready(Ok(v))` becomes `Ok(Async::Ready(v))`, `Poll::Ready(Err(e))`
    /// becomes `Err(e)`, and `Poll::Pending` becomes `Ok(Async::NotReady)`.
    fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
        let waker = current_as_local_waker();
        let mut cx = Context::new(&waker, &mut self.exec);

        match Future::poll(self.future.as_pin_mut(), &mut cx) {
            Poll::Pending => Ok(futures::Async::NotReady),
            Poll::Ready(outcome) => outcome.map(futures::Async::Ready),
        }
    }
}
/// Wrapper around a tokio `Runtime` whose `block_on` accepts a
/// `std::future::Future` instead of a futures 0.1 future.
struct AsyncRuntime {
// The wrapped tokio runtime; it supplies the executor and runs `block_on`.
inner: Runtime,
}
impl AsyncRuntime {
    /// Takes ownership of an existing tokio `Runtime`.
    fn new(runtime: Runtime) -> AsyncRuntime {
        Self { inner: runtime }
    }

    /// Runs a `std` future to completion on the wrapped runtime.
    ///
    /// The future is boxed and pinned, paired with an `Exec` built from the
    /// runtime's executor, and driven through `StdToFutures01` by the
    /// runtime's own `block_on`. Returns the future's `Result` unchanged.
    fn block_on<F, T, E>(&mut self, future: F) -> Result<T, E>
    where
        F: Future<Output = Result<T, E>> + Send + 'static,
        T: Send + 'static,
        E: Send + 'static,
    {
        let pinned = PinBox::new(future);
        let exec = Exec(self.inner.executor());
        let bridged = StdToFutures01 {
            future: pinned,
            exec,
        };
        self.inner.block_on(bridged)
    }
}
fn main() -> Result<(), Box<std::error::Error>> {
let rt = Runtime::new()?;
let mut rt_async = AsyncRuntime::new(rt);
let future = async {
let url = "http://example.com".parse().unwrap();
let response = await_01!(hyper::Client::new().get(url))?;
let (parts, body) = response.into_parts();
let body = await_01!(body.concat2())?;
let body = String::from_utf8_lossy(&*body).into_owned();
let response = hyper::Response::from_parts(parts, body);
Ok(response) as Result<hyper::Response<String>, hyper::Error>
};
let response = rt_async.block_on(future)?;
println!("response = {:#?}", response);
Ok(())
}
nightly-2018-06-28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment