Skip to content

Instantly share code, notes, and snippets.

@tawashichan
Last active December 18, 2019 06:31
Show Gist options
  • Save tawashichan/d4a43268c647e46208b2ebf153a60b0c to your computer and use it in GitHub Desktop.
Save tawashichan/d4a43268c647e46208b2ebf153a60b0c to your computer and use it in GitHub Desktop.
future_sample
// References:
// https://rust-lang.github.io/async-book/02_execution/04_executor.html
// https://keens.github.io/blog/2019/07/07/rustnofuturetosonorunnerwotsukuttemita/
use std::{
future::{Future},
pin::Pin,
sync::mpsc::{sync_channel, Receiver, SyncSender},
sync::{Arc, Mutex},
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
thread,
time::Duration,
};
/// A future that becomes ready once a background timer thread flags its
/// shared state as completed.
struct TimerFuture {
    /// State shared between this future and the thread spawned by
    /// `TimerFuture::new`.
    shared_state: Arc<Mutex<SharedState>>,
}
/// State shared between a `TimerFuture` and its timer thread.
struct SharedState {
    /// Set to `true` by the timer thread once its sleep has elapsed.
    completed: bool,
    /// Waker recorded by the most recent pending poll; the timer thread
    /// takes it and wakes it when the timer completes.
    waker: Option<Waker>,
}
impl Future for TimerFuture {
    type Output = i32;

    /// Reports `Poll::Ready(1000)` once the shared state is marked completed.
    ///
    /// While the timer is still running, a clone of the current task's waker
    /// is stored in the shared state (replacing any previous one) so the
    /// timer thread can wake the task, and `Poll::Pending` is returned.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.shared_state.lock().unwrap();

        if !state.completed {
            println!("pending...");
            // Always overwrite: the future may have moved to a different
            // task since the previous poll.
            state.waker = Some(cx.waker().clone());
            return Poll::Pending;
        }

        println!("ready");
        Poll::Ready(1000)
    }
}
impl TimerFuture {
fn new(duration: Duration) -> Self {
let shared_state = Arc::new(Mutex::new(SharedState {
completed: false,
waker: None,
}));
let thread_shared_state = shared_state.clone();
thread::spawn(move || {
thread::sleep(duration);
let mut shared_state = thread_shared_state.lock().unwrap();
shared_state.completed = true;
if let Some(waker) = shared_state.waker.take() {
waker.wake()
}
});
TimerFuture { shared_state }
}
}
/// Receiving side of the task channel; polls tasks as they arrive.
struct Executor {
    /// Channel end from which tasks that are ready to be polled are received.
    ready_queue: Receiver<Arc<Task>>,
}
/// Cloneable handle used to submit new futures to the executor's queue.
#[derive(Clone)]
struct Spawner {
    /// Sending side of the task channel.
    task_sender: SyncSender<Arc<Task>>,
}
/// A spawned future together with the channel used to re-queue it.
///
/// The future is bounded by `Send` so that `Task` is `Send + Sync`. This
/// matters because `Arc<Task>` is stored behind a `Waker`, and `Waker` is
/// unconditionally `Send + Sync` and is handed to other threads.
struct Task {
    /// The future being driven. The executor takes it out of the slot to
    /// poll it and only puts it back when the poll returns `Pending`, so the
    /// slot is `None` once the future has completed.
    future: Mutex<Option<Pin<Box<dyn Future<Output = ()> + Send>>>>,
    /// Sender used to push this task back onto the ready queue when woken.
    task_sender: SyncSender<Arc<Task>>,
}
/// Creates a connected `Executor` / `Spawner` pair backed by a bounded
/// channel that holds at most `MAX_QUEUED_TASKS` queued tasks.
fn new_executor_and_spanner() -> (Executor, Spawner) {
    const MAX_QUEUED_TASKS: usize = 10_000;

    let (tx, rx) = sync_channel(MAX_QUEUED_TASKS);
    let executor = Executor { ready_queue: rx };
    let spawner = Spawner { task_sender: tx };
    (executor, spawner)
}
impl Spawner {
    /// Boxes `future` into a new `Task` and queues it for its first poll.
    ///
    /// # Panics
    ///
    /// Panics if the ready queue is full or if the executor's receiving end
    /// of the channel has been dropped.
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = Box::pin(future);
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        // `try_send` never blocks. A blocking `send` on a full queue would
        // stall the caller instead of reporting the condition the panic
        // message describes, and would deadlock if the caller is the thread
        // that is supposed to drain the queue.
        self.task_sender
            .try_send(task)
            .expect("too many tasks queued");
    }
}
/// Virtual function table used by every `RawWaker` built from a
/// `CustomWaker`; entries are, in order: clone, wake, wake_by_ref, drop.
static CUSTOM_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
    CustomWaker::unsafe_clone,
    CustomWaker::unsafe_wake,
    CustomWaker::unsafe_wake_by_ref,
    CustomWaker::unsafe_drop,
);
/// Waker payload: a handle to the task that should be re-queued on wake.
#[derive(Clone)]
struct CustomWaker {
    /// The task this waker sends back to the ready queue.
    arc_task: Arc<Task>,
}
impl CustomWaker {
fn waker(arc_task: Arc<Task>) -> Waker {
unsafe { Waker::from_raw(Self::new(arc_task).into_raw_waker()) }
}
fn new(arc_task: Arc<Task>) -> Self {
Self{
arc_task,
}
}
unsafe fn into_raw_waker(self) -> RawWaker {
let prt = Box::into_raw(Box::new(self)) as *const ();
RawWaker::new(prt, &CUSTOM_WAKER_VTABLE)
}
unsafe fn unsafe_clone(this: *const ()) -> RawWaker {
let ptr = this as *const Self;
Box::new(ptr.as_ref().unwrap().clone()).into_raw_waker()
}
fn wake(self: Self) {
let cloned = self.arc_task.clone();
self.arc_task.task_sender.send(cloned).expect("too many tasks queued");
}
unsafe fn unsafe_wake(this: *const ()) {
let ptr = this as *mut Self;
Box::from_raw(ptr).wake()
}
fn wake_by_ref(&self) {
Box::new(self.clone()).wake()
}
unsafe fn unsafe_wake_by_ref(this: *const ()) {
let ptr = this as *const Self;
ptr.as_ref().unwrap().wake_by_ref()
}
unsafe fn unsafe_drop(this: *const ()) {
let ptr = this as *mut Self;
Box::from_raw(ptr);
}
}
impl Executor {
    /// Polls tasks as they arrive on the ready queue.
    ///
    /// Returns once the channel is disconnected, i.e. when every sender
    /// (the spawners and the handles held inside tasks) has been dropped.
    fn run(&self) {
        for task in self.ready_queue.iter() {
            let mut slot = task.future.lock().unwrap();

            // An empty slot means the future already ran to completion;
            // nothing is left to poll for this wake-up.
            let mut pending_future = match slot.take() {
                Some(fut) => fut,
                None => continue,
            };

            let waker = CustomWaker::waker(Arc::clone(&task));
            let mut cx = Context::from_waker(&waker);

            // Only an unfinished future goes back into the slot.
            if pending_future.as_mut().poll(&mut cx).is_pending() {
                *slot = Some(pending_future)
            }
        }
    }
}
/// Spawns two timer-backed tasks (2000 ms and 1000 ms) and runs the
/// executor until both have finished.
fn main() {
    let (runner, handle) = new_executor_and_spanner();

    handle.spawn(async {
        println!("start");
        let slow_timer = TimerFuture::new(Duration::from_millis(2000));
        slow_timer.await;
        println!("finished: {:?}", 2000);
    });

    handle.spawn(async {
        let fast_timer = TimerFuture::new(Duration::from_millis(1000));
        fast_timer.await;
        println!("finished: {:?}", 1000);
    });

    // Drop our sender so the channel disconnects once every task is done,
    // which lets `run` return.
    drop(handle);
    runner.run();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment