Last active
July 7, 2019 09:48
-
-
Save yuk1ty/c8c2cdadbb842c4b64928e044768f347 to your computer and use it in GitHub Desktop.
Rust 1.36 から入った Future の Runner を作る練習→https://keens.github.io/blog/2019/07/07/rustnofuturetosonorunnerwotsukuttemita/
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::future::Future; | |
use std::ops::{Deref, DerefMut}; | |
use std::pin::Pin; | |
use std::sync::mpsc::{channel, Receiver}; | |
use std::sync::{Arc, Condvar, Mutex}; | |
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; | |
use std::thread; | |
pub struct ReturnFuture<T>(Option<T>); | |
impl<T> ReturnFuture<T> { | |
pub fn new(t: T) -> Self { | |
Self(Some(t)) | |
} | |
} | |
impl<T> Future for ReturnFuture<T> | |
where | |
T: Unpin, | |
{ | |
type Output = T; | |
fn poll(self: Pin<&mut Self>, _: &mut Context) -> Poll<Self::Output> { | |
Poll::Ready( | |
self.get_mut() | |
.0 | |
.take() | |
.expect("A future should never be polled after it returns Ready."), | |
) | |
} | |
} | |
pub struct SpinRunner; | |
impl SpinRunner { | |
pub fn new() -> Self { | |
Self | |
} | |
pub fn run<F>(&mut self, future: F) -> F::Output | |
where | |
F: Future, | |
{ | |
let mut future = Pin::from(Box::new(future)); | |
self.run_pin(future.as_mut()) | |
} | |
pub fn run_pin<F>(&mut self, mut future: Pin<F>) -> <<F as Deref>::Target as Future>::Output | |
where | |
F: DerefMut, | |
<F as Deref>::Target: Future, | |
{ | |
let waker = SpinWaker::waker(); | |
let mut cx = Context::from_waker(&waker); | |
let mut i = 0; | |
loop { | |
i += 1; | |
match future.as_mut().poll(&mut cx) { | |
Poll::Ready(ret) => { | |
println!("{}", i); | |
return ret; | |
} | |
Poll::Pending => continue, | |
} | |
} | |
} | |
} | |
#[derive(Clone, Debug)] | |
pub struct SpinWaker; | |
static SPIN_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( | |
SpinWaker::unsafe_clone, | |
SpinWaker::unsafe_wake, | |
SpinWaker::unsafe_wake_by_ref, | |
SpinWaker::unsafe_drop, | |
); | |
impl SpinWaker { | |
fn waker() -> Waker { | |
unsafe { Waker::from_raw(Self::new().into_raw_waker()) } | |
} | |
fn new() -> Self { | |
Self | |
} | |
unsafe fn into_raw_waker(self) -> RawWaker { | |
let ptr = Box::into_raw(Box::new(self)) as *const (); | |
RawWaker::new(ptr, &SPIN_WAKER_VTABLE) | |
} | |
unsafe fn unsafe_clone(this: *const ()) -> RawWaker { | |
let ptr = this as *const Self; | |
Box::new(ptr.as_ref().unwrap().clone()).into_raw_waker() | |
} | |
fn wake(self: Self) {} | |
unsafe fn unsafe_wake(this: *const ()) { | |
let ptr = this as *mut Self; | |
Box::from_raw(ptr).wake() | |
} | |
fn wake_by_ref(&self) { | |
Box::new(self.clone()).wake() | |
} | |
unsafe fn unsafe_wake_by_ref(this: *const ()) { | |
let ptr = this as *const Self; | |
ptr.as_ref().unwrap().wake_by_ref() | |
} | |
unsafe fn unsafe_drop(this: *const ()) { | |
let ptr = this as *mut Self; | |
Box::from_raw(ptr); | |
} | |
} | |
struct CondRunner(Arc<(Mutex<()>, Condvar)>); | |
impl CondRunner { | |
fn new() -> Self { | |
Self(Arc::new((Mutex::new(()), Condvar::new()))) | |
} | |
pub fn run<F>(&mut self, future: F) -> F::Output | |
where | |
F: Future, | |
{ | |
let mut future = Pin::from(Box::new(future)); | |
self.run_pin(future.as_mut()) | |
} | |
pub fn run_pin<F>(&mut self, mut future: Pin<F>) -> <<F as Deref>::Target as Future>::Output | |
where | |
F: DerefMut, | |
<F as Deref>::Target: Future, | |
{ | |
let waker = CondWaker::waker(self.0.clone()); | |
let mut cx = Context::from_waker(&waker); | |
let mut i = 0; | |
loop { | |
i += 1; | |
match future.as_mut().poll(&mut cx) { | |
Poll::Ready(ret) => { | |
println!("{}", i); | |
return ret; | |
} | |
Poll::Pending => { | |
let lock = (self.0).0.lock().unwrap(); | |
*(self.0).1.wait(lock).unwrap(); | |
} | |
} | |
} | |
} | |
} | |
#[derive(Debug)] | |
struct CondWaker(Arc<(Mutex<()>, Condvar)>); | |
static COND_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( | |
CondWaker::unsafe_clone, | |
CondWaker::unsafe_wake, | |
CondWaker::unsafe_wake_by_ref, | |
CondWaker::unsafe_drop, | |
); | |
impl CondWaker { | |
fn waker(inner: Arc<(Mutex<()>, Condvar)>) -> Waker { | |
unsafe { Waker::from_raw(Box::new(Self::new(inner)).into_raw_waker()) } | |
} | |
fn new(inner: Arc<(Mutex<()>, Condvar)>) -> Self { | |
Self(inner) | |
} | |
unsafe fn into_raw_waker(self) -> RawWaker { | |
let ptr = Arc::into_raw(self.0) as *const (); | |
RawWaker::new(ptr, &COND_WAKER_VTABLE) | |
} | |
unsafe fn unsafe_clone(this: *const ()) -> RawWaker { | |
let ptr = this as *const (Mutex<()>, Condvar); | |
let arc = Arc::from_raw(ptr); | |
let ret = Self::new(arc.clone()).into_raw_waker(); | |
std::mem::forget(arc); | |
ret | |
} | |
fn wake(self) { | |
(self.0).1.notify_all() | |
} | |
unsafe fn unsafe_wake(this: *const ()) { | |
let ptr = this as *const (Mutex<()>, Condvar); | |
Self::new(Arc::from_raw(ptr)).wake() | |
} | |
unsafe fn unsafe_wake_by_ref(this: *const ()) { | |
let ptr = this as *const (Mutex<()>, Condvar); | |
let arc = Arc::from_raw(ptr); | |
let ret = Self::new(arc.clone()); | |
std::mem::forget(arc); | |
ret.wake() | |
} | |
unsafe fn unsafe_drop(this: *const ()) { | |
let ptr = this as *const (Mutex<()>, Condvar); | |
Self::new(Arc::from_raw(ptr)); | |
} | |
} | |
struct ThreadFuture<T> { | |
rx: Receiver<T>, | |
waker: Arc<Mutex<Option<Waker>>>, | |
} | |
impl<T> ThreadFuture<T> | |
where | |
T: Send + 'static, | |
{ | |
pub fn start<F>(f: F) -> Self | |
where | |
F: FnOnce() -> T + Send + 'static, | |
{ | |
let (tx, rx) = channel(); | |
let waker = Arc::new(Mutex::new(None::<Waker>)); | |
let w = waker.clone(); | |
thread::spawn(move || { | |
tx.send(f()).unwrap(); | |
if let Some(waker) = &*w.lock().unwrap() { | |
waker.wake_by_ref() | |
} | |
}); | |
Self { rx, waker } | |
} | |
} | |
impl<T> Future for ThreadFuture<T> | |
where | |
T: Send + Sync + 'static, | |
{ | |
type Output = T; | |
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { | |
*self.waker.lock().unwrap() = Some(cx.waker().clone()); | |
match self.rx.try_recv() { | |
Ok(t) => Poll::Ready(t), | |
Err(_) => Poll::Pending, | |
} | |
} | |
} | |
fn fib(n: u64) -> u64 { | |
if n < 2 { | |
1 | |
} else { | |
fib(n - 1) + fib(n - 2) | |
} | |
} | |
fn main() { | |
{ | |
println!("Cond Runner"); | |
let mut runner = CondRunner::new(); | |
let future = ThreadFuture::start(|| fib(42)); | |
let ret = runner.run(future); | |
println!("answer is {}", ret); | |
} | |
{ | |
println!("Spin Runner"); | |
let mut runner = SpinRunner::new(); | |
let future = ThreadFuture::start(|| fib(42)); | |
let ret = runner.run(future); | |
println!("answer is {}", ret); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment