-
-
Save aloucks/b83f2101f530471e1d802305d5265264 to your computer and use it in GitHub Desktop.
future
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Cargo.toml:
//
// [package]
// name = "test"
// version = "0.1.0"
// edition = "2018"
//
// [dependencies]
// futures = "0.3.4"
// once_cell = "1.3.1"
// parking_lot = "0.10.0"
// rand = "0.7.3"
// device_query = "0.2"
use std::collections::HashMap; | |
use std::future::Future; | |
use std::pin::Pin; | |
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; | |
use std::thread; | |
use device_query::DeviceQuery; | |
use futures::task::{Context, Poll, Waker}; | |
use once_cell::sync::OnceCell; | |
use parking_lot::Mutex; | |
use rand::Rng; | |
use std::time::Duration; | |
static DRIVER: OnceCell<Driver> = OnceCell::new(); | |
type Token = u64; | |
type AtomicToken = AtomicU64; | |
struct MyFuture<F: FnOnce()> { | |
work_remaining: u32, | |
token: Token, | |
callback: Option<Box<F>>, | |
} | |
#[derive(Default)] | |
struct Driver { | |
next_token: AtomicToken, | |
wakers: Mutex<HashMap<Token, Waker>>, | |
running: AtomicBool, | |
} | |
fn main() { | |
// Install the driver. Perhaps one per Queue. | |
DRIVER.get_or_init(Driver::default); | |
// Start the driver thread | |
let driver_thread = std::thread::spawn(move || { | |
let driver = DRIVER.get().unwrap(); | |
driver.running.store(true, Ordering::Relaxed); | |
// Use mouse mouse movement to simulate IO notification. | |
let device_state = device_query::DeviceState::new(); | |
let mut old_coords = None; | |
while driver.running.load(Ordering::Relaxed) { | |
// In the GPU context we could wait on a Fence, timeline semaphore(s), or some | |
// other synchronization primitive(s). It could even be an mpsc::channel that | |
// receives notification after every queue submission, present, or frame acquire. | |
'wait_for_io: loop { | |
let mouse = device_state.get_mouse(); | |
if old_coords == Some(mouse.coords) { | |
std::thread::sleep(Duration::from_millis(1)); | |
} else { | |
old_coords = Some(mouse.coords); | |
break 'wait_for_io; | |
} | |
} | |
// Ideally we would have fine grained notification, but for demo purposes | |
// we'll just notify all tasks. | |
driver.notify_all(); | |
} | |
println!("{:?} == Driver", thread::current().id()); | |
}); | |
fn callback(name: &str) { | |
println!(" {} callback, thread: {:?}", name, thread::current().id()); | |
} | |
futures::executor::block_on(async { | |
let my0 = MyFuture::new(100, || callback("my0")); | |
let my1 = MyFuture::new(100, || callback("my1")); | |
let my2 = MyFuture::new(100, || callback("my2")); | |
let my3 = MyFuture::new(100, || callback("my3")); | |
let my4 = MyFuture::new(100, || callback("my4")); | |
let my5 = MyFuture::new(0, || callback("my5")); // will resolve immediately | |
futures::join!(my0, my1, my2, my3, my4, my5) | |
}); | |
println!("{:?} == Main", thread::current().id()); | |
DRIVER | |
.get() | |
.unwrap() | |
.running | |
.store(false, Ordering::Relaxed); | |
driver_thread.join().unwrap(); | |
} | |
impl Driver { | |
fn get_next_token(&self) -> Token { | |
self.next_token | |
.fetch_add(1, std::sync::atomic::Ordering::Relaxed) | |
} | |
fn register(&self, token: Token, waker: Waker) { | |
self.wakers.lock().insert(token, waker); | |
} | |
fn unregister(&self, token: Token) { | |
self.wakers.lock().remove(&token); | |
} | |
fn notify_all(&self) { | |
let wakers = self.wakers.lock(); | |
for waker in wakers.values() { | |
waker.wake_by_ref(); | |
} | |
} | |
} | |
impl<F: FnOnce()> Future for MyFuture<F> { | |
type Output = (); | |
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | |
println!( | |
"MyFuture::poll; token = {}, work_remaining = {}", | |
self.token, self.work_remaining | |
); | |
// Simulate some variable amount of work that could be executed | |
// in this invocation of poll. We use this in the example to | |
// allow for the futures to resolve at somewhat random times. | |
let work = rand::thread_rng().gen_range(0, 10); | |
let work_remaining = self.work_remaining.saturating_sub(work); | |
self.work_remaining = work_remaining; | |
// In the GPU context we would need to query for the buffer mapping | |
// or frame acquisition state. For example, if the buffer is unmapped | |
// execute the callback. Otherwise, register with the driver that | |
// we should be polled again when the buffer state may have changed. | |
if self.work_remaining == 0 { | |
println!("MyFuture::poll; token = {} (resolved)", self.token); | |
self.callback.take().map(|f| f()); | |
Poll::Ready(()) | |
} else { | |
// Otherwise, (re)register our interest with the driver so we're | |
// polled again when something may have happened | |
let driver = DRIVER.get().expect("Driver not installed"); | |
driver.register(self.token, cx.waker().clone()); | |
Poll::Pending | |
} | |
} | |
} | |
impl<F: FnOnce()> Drop for MyFuture<F> { | |
fn drop(&mut self) { | |
let driver = DRIVER.get().expect("Driver not installed"); | |
driver.unregister(self.token); | |
} | |
} | |
impl<F: FnOnce()> MyFuture<F> { | |
fn new(work_remaining: u32, f: F) -> MyFuture<F> { | |
let driver = DRIVER.get().expect("Driver not installed"); | |
let token = driver.get_next_token(); | |
MyFuture { | |
work_remaining, | |
token, | |
callback: Some(Box::new(f)), | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment