Skip to content

Instantly share code, notes, and snippets.

@yuk1ty
Last active July 7, 2019 09:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yuk1ty/c8c2cdadbb842c4b64928e044768f347 to your computer and use it in GitHub Desktop.
Save yuk1ty/c8c2cdadbb842c4b64928e044768f347 to your computer and use it in GitHub Desktop.
Rust 1.36 から入った Future の Runner を作る練習→https://keens.github.io/blog/2019/07/07/rustnofuturetosonorunnerwotsukuttemita/
use std::future::Future;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Condvar, Mutex};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::thread;
/// A future that is ready immediately with a value supplied up front.
///
/// The value is handed out on the first call to `poll`; polling again
/// afterwards is a contract violation and panics.
pub struct ReturnFuture<T>(Option<T>);

impl<T> ReturnFuture<T> {
    /// Wraps `t` so that it is returned by the first `poll`.
    pub fn new(t: T) -> Self {
        Self(Some(t))
    }
}

// The wrapped value is never pinned in place: it is only ever moved out of
// the `Option`. The wrapper can therefore be `Unpin` no matter what `T` is,
// which lets the `Future` impl below drop its former `T: Unpin` bound.
impl<T> Unpin for ReturnFuture<T> {}

impl<T> Future for ReturnFuture<T> {
    type Output = T;

    /// Returns `Poll::Ready` with the stored value.
    ///
    /// # Panics
    ///
    /// Panics if called again after the value has already been returned.
    fn poll(self: Pin<&mut Self>, _: &mut Context) -> Poll<Self::Output> {
        let value = self
            .get_mut()
            .0
            .take()
            .expect("A future should never be polled after it returns Ready.");
        Poll::Ready(value)
    }
}
/// Executor that drives a single future to completion by polling it in a
/// tight loop on the calling thread.
///
/// It never sleeps: a `Pending` result is followed immediately by another
/// poll, so wake-ups are irrelevant to it.
pub struct SpinRunner;

impl Default for SpinRunner {
    fn default() -> Self {
        Self::new()
    }
}

impl SpinRunner {
    /// Creates a runner. The runner itself holds no state.
    pub fn new() -> Self {
        Self
    }

    /// Pins `future` on the heap and polls it until it completes, returning
    /// its output.
    pub fn run<F>(&mut self, future: F) -> F::Output
    where
        F: Future,
    {
        let mut future = Box::pin(future);
        self.run_pin(future.as_mut())
    }

    /// Polls an already pinned future until it completes, returning its
    /// output.
    ///
    /// The number of polls that were needed is printed to stdout just before
    /// returning.
    pub fn run_pin<F>(&mut self, mut future: Pin<F>) -> <<F as Deref>::Target as Future>::Output
    where
        F: DerefMut,
        <F as Deref>::Target: Future,
    {
        let waker = SpinWaker::waker();
        let mut cx = Context::from_waker(&waker);
        // Explicitly 64-bit: an untyped counter defaults to `i32`, which a
        // long busy loop can overflow (a panic in debug builds).
        let mut polls: u64 = 0;
        loop {
            polls += 1;
            if let Poll::Ready(ret) = future.as_mut().poll(&mut cx) {
                println!("{}", polls);
                return ret;
            }
        }
    }
}
/// Waker used by `SpinRunner`.
///
/// The spin runner re-polls unconditionally, so waking is a no-op. The type
/// only exists to supply a valid `RawWaker` vtable.
#[derive(Clone, Debug)]
pub struct SpinWaker;

/// Vtable wiring the `RawWaker` callbacks to the `SpinWaker` functions below.
static SPIN_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
    SpinWaker::unsafe_clone,
    SpinWaker::unsafe_wake,
    SpinWaker::unsafe_wake_by_ref,
    SpinWaker::unsafe_drop,
);

impl SpinWaker {
    /// Builds a `Waker` backed by a fresh `SpinWaker`.
    fn waker() -> Waker {
        // SAFETY: `into_raw_waker` pairs a pointer obtained from
        // `Box::into_raw` with `SPIN_WAKER_VTABLE`, whose functions treat the
        // data pointer as exactly such a boxed `SpinWaker`.
        unsafe { Waker::from_raw(Self::new().into_raw_waker()) }
    }

    fn new() -> Self {
        Self
    }

    /// Moves `self` into a `Box` and hands ownership of it to the returned
    /// `RawWaker`. The box is reclaimed by `unsafe_wake` or `unsafe_drop`.
    unsafe fn into_raw_waker(self) -> RawWaker {
        let ptr = Box::into_raw(Box::new(self)) as *const ();
        RawWaker::new(ptr, &SPIN_WAKER_VTABLE)
    }

    /// Vtable `clone`: produces an independently owned copy.
    ///
    /// `this` must be a pointer created by `into_raw_waker` that has not been
    /// released yet.
    unsafe fn unsafe_clone(this: *const ()) -> RawWaker {
        // SAFETY (caller contract): `this` points at a live `SpinWaker`.
        let original = &*(this as *const Self);
        original.clone().into_raw_waker()
    }

    /// Waking does nothing: the runner is already polling in a loop.
    fn wake(self) {}

    /// Vtable `wake`: consumes the waker, releasing its box.
    unsafe fn unsafe_wake(this: *const ()) {
        // SAFETY (caller contract): `this` came from `Box::into_raw` and
        // ownership is transferred to us by this call.
        Box::from_raw(this as *mut Self).wake()
    }

    /// Waking by reference is likewise a no-op.
    fn wake_by_ref(&self) {}

    /// Vtable `wake_by_ref`: must leave the waker alive.
    unsafe fn unsafe_wake_by_ref(this: *const ()) {
        // SAFETY (caller contract): `this` points at a live `SpinWaker`.
        (*(this as *const Self)).wake_by_ref()
    }

    /// Vtable `drop`: releases the box created by `into_raw_waker`.
    unsafe fn unsafe_drop(this: *const ()) {
        // SAFETY (caller contract): `this` came from `Box::into_raw` and is
        // not used again after this call.
        drop(Box::from_raw(this as *mut Self));
    }
}
/// State shared between a `CondRunner` and the wakers it hands out.
///
/// The `bool` is `true` while a wake-up has been requested but not yet
/// consumed by the runner; the `Condvar` is signalled whenever it is set.
type WakeSignal = (Mutex<bool>, Condvar);

/// Executor that drives a single future on the calling thread and sleeps on a
/// condition variable while the future is pending.
struct CondRunner(Arc<WakeSignal>);

impl CondRunner {
    fn new() -> Self {
        Self(Arc::new((Mutex::new(false), Condvar::new())))
    }

    /// Pins `future` on the heap and runs it to completion, returning its
    /// output.
    pub fn run<F>(&mut self, future: F) -> F::Output
    where
        F: Future,
    {
        let mut future = Box::pin(future);
        self.run_pin(future.as_mut())
    }

    /// Runs an already pinned future to completion, returning its output.
    ///
    /// After every `Pending` the thread blocks until a waker created by this
    /// runner has been invoked. The number of polls is printed to stdout just
    /// before returning.
    pub fn run_pin<F>(&mut self, mut future: Pin<F>) -> <<F as Deref>::Target as Future>::Output
    where
        F: DerefMut,
        <F as Deref>::Target: Future,
    {
        let waker = CondWaker::waker(self.0.clone());
        let mut cx = Context::from_waker(&waker);
        let mut polls: u64 = 0;
        loop {
            polls += 1;
            match future.as_mut().poll(&mut cx) {
                Poll::Ready(ret) => {
                    println!("{}", polls);
                    return ret;
                }
                Poll::Pending => {
                    let (lock, cvar) = &*self.0;
                    let mut woken = lock.lock().unwrap();
                    // Wait on the flag, not on the bare notification: a wake
                    // that fired before we got here (even from inside `poll`)
                    // has already set the flag, so it cannot be lost. The
                    // loop also absorbs spurious condvar wake-ups.
                    while !*woken {
                        woken = cvar.wait(woken).unwrap();
                    }
                    // Consume the request before polling again.
                    *woken = false;
                }
            }
        }
    }
}

/// Waker that records a wake-up request in the shared `WakeSignal` and
/// notifies the runner blocked on it.
#[derive(Debug)]
struct CondWaker(Arc<WakeSignal>);

/// Vtable wiring the `RawWaker` callbacks to the `CondWaker` functions below.
/// The data pointer is always one produced by `Arc::into_raw`.
static COND_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
    CondWaker::unsafe_clone,
    CondWaker::unsafe_wake,
    CondWaker::unsafe_wake_by_ref,
    CondWaker::unsafe_drop,
);

impl CondWaker {
    /// Builds a `Waker` that signals `inner` when woken.
    fn waker(inner: Arc<WakeSignal>) -> Waker {
        // SAFETY: `into_raw_waker` pairs a pointer from `Arc::into_raw` with
        // `COND_WAKER_VTABLE`, whose functions treat the data pointer as
        // exactly such an `Arc<WakeSignal>` reference.
        unsafe { Waker::from_raw(Self::new(inner).into_raw_waker()) }
    }

    fn new(inner: Arc<WakeSignal>) -> Self {
        Self(inner)
    }

    /// Transfers this waker's strong reference into the returned `RawWaker`.
    /// It is reclaimed by `unsafe_wake` or `unsafe_drop`.
    unsafe fn into_raw_waker(self) -> RawWaker {
        let ptr = Arc::into_raw(self.0) as *const ();
        RawWaker::new(ptr, &COND_WAKER_VTABLE)
    }

    /// Sets the wake flag and notifies the waiting runner.
    fn notify(signal: &WakeSignal) {
        let (lock, cvar) = signal;
        // The guard is released at the end of this statement, before the
        // notification is sent.
        *lock.lock().unwrap() = true;
        cvar.notify_all();
    }

    /// Vtable `clone`: takes an additional strong reference.
    ///
    /// `this` must be a pointer created by `into_raw_waker` that has not been
    /// released yet.
    unsafe fn unsafe_clone(this: *const ()) -> RawWaker {
        // SAFETY (caller contract): `this` came from `Arc::into_raw`.
        // `ManuallyDrop` keeps the reference owned by the existing waker from
        // being released when `borrowed` goes out of scope.
        let borrowed = std::mem::ManuallyDrop::new(Arc::from_raw(this as *const WakeSignal));
        let cloned: Arc<WakeSignal> = Arc::clone(&*borrowed);
        Self::new(cloned).into_raw_waker()
    }

    /// Signals the runner, consuming this waker.
    fn wake(self) {
        Self::notify(&self.0)
    }

    /// Vtable `wake`: signals the runner and releases this waker's reference.
    unsafe fn unsafe_wake(this: *const ()) {
        // SAFETY (caller contract): `this` came from `Arc::into_raw` and
        // ownership of that reference is transferred to us by this call.
        Self::new(Arc::from_raw(this as *const WakeSignal)).wake()
    }

    /// Vtable `wake_by_ref`: signals the runner and leaves the waker alive.
    unsafe fn unsafe_wake_by_ref(this: *const ()) {
        // SAFETY (caller contract): the waker still holds its strong
        // reference, so the pointee is alive for the duration of this call.
        Self::notify(&*(this as *const WakeSignal))
    }

    /// Vtable `drop`: releases this waker's strong reference.
    unsafe fn unsafe_drop(this: *const ()) {
        // SAFETY (caller contract): `this` came from `Arc::into_raw` and is
        // not used again after this call.
        drop(Arc::from_raw(this as *const WakeSignal));
    }
}
/// A future that resolves to the return value of a closure executed on a
/// dedicated OS thread.
struct ThreadFuture<T> {
    /// Receives the closure's result from the worker thread.
    rx: Receiver<T>,
    /// Waker of the task that polled most recently, shared with the worker.
    waker: Arc<Mutex<Option<Waker>>>,
}

/// Wakes the registered task (if any) when dropped.
///
/// The worker thread holds one of these so the waiting task is notified both
/// when the closure returns and when it unwinds from a panic.
struct WakeOnDrop(Arc<Mutex<Option<Waker>>>);

impl Drop for WakeOnDrop {
    fn drop(&mut self) {
        // Tolerate a poisoned lock: the slot only holds an `Option<Waker>`,
        // and panicking here could abort the process during an unwind.
        let mut slot = self.0.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
        let waker = slot.take();
        // Release the lock before waking so a waker that polls right away
        // does not contend with (or deadlock on) the slot.
        drop(slot);
        if let Some(waker) = waker {
            waker.wake();
        }
    }
}

impl<T> ThreadFuture<T>
where
    T: Send + 'static,
{
    /// Spawns a thread that runs `f` and returns a future for its result.
    ///
    /// Dropping the returned future does not stop the thread; its result is
    /// then discarded.
    pub fn start<F>(f: F) -> Self
    where
        F: FnOnce() -> T + Send + 'static,
    {
        let (tx, rx) = channel();
        let waker = Arc::new(Mutex::new(None::<Waker>));
        let wake_guard = WakeOnDrop(Arc::clone(&waker));
        thread::spawn(move || {
            // Locals drop in reverse declaration order: the sender goes away
            // first, so by the time the task is woken the channel already
            // holds the value or is disconnected.
            let _wake_guard = wake_guard;
            let tx = tx;
            // A send error only means the future was dropped; nobody is
            // waiting for the value, so there is nothing to report.
            let _ = tx.send(f());
        });
        Self { rx, waker }
    }
}

impl<T> Future for ThreadFuture<T>
where
    T: Send + 'static,
{
    type Output = T;

    /// Registers the current task's waker, then checks for the result.
    ///
    /// # Panics
    ///
    /// Panics if the worker thread ended without sending a value (the closure
    /// panicked), or if polled again after `Ready` was returned. Returning
    /// `Pending` in those cases would leave the task waiting for a wake-up
    /// that can never come.
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // Store the waker before looking at the channel: the worker sends
        // first and reads this slot second, so a result that arrives after
        // the check below always finds the waker.
        *self.waker.lock().unwrap() = Some(cx.waker().clone());
        match self.rx.try_recv() {
            Ok(t) => Poll::Ready(t),
            Err(std::sync::mpsc::TryRecvError::Empty) => Poll::Pending,
            Err(std::sync::mpsc::TryRecvError::Disconnected) => panic!(
                "ThreadFuture worker ended without a result (it panicked, or the future was polled after completion)"
            ),
        }
    }
}
/// Naive doubly recursive Fibonacci with `fib(0) == fib(1) == 1`.
///
/// The exponential running time is intentional here: `main` uses this as a
/// CPU-bound workload for the worker thread.
fn fib(n: u64) -> u64 {
    match n {
        0 | 1 => 1,
        _ => fib(n - 1) + fib(n - 2),
    }
}
/// Runs the same threaded `fib(42)` computation under each runner in turn,
/// first the condvar-based one and then the spinning one, printing the
/// result of each.
fn main() {
    println!("Cond Runner");
    let cond_answer = CondRunner::new().run(ThreadFuture::start(|| fib(42)));
    println!("answer is {}", cond_answer);

    println!("Spin Runner");
    let spin_answer = SpinRunner::new().run(ThreadFuture::start(|| fib(42)));
    println!("answer is {}", spin_answer);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment