Skip to content

Instantly share code, notes, and snippets.

@aloucks

aloucks/main.rs Secret

Created March 23, 2020 21:45
Show Gist options
  • Save aloucks/b83f2101f530471e1d802305d5265264 to your computer and use it in GitHub Desktop.
Save aloucks/b83f2101f530471e1d802305d5265264 to your computer and use it in GitHub Desktop.
future
// Cargo.toml:
//
// [package]
// name = "test"
// version = "0.1.0"
// edition = "2018"
//
// [dependencies]
// futures = "0.3.4"
// once_cell = "1.3.1"
// parking_lot = "0.10.0"
// rand = "0.7.3"
// device_query = "0.2"
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::thread;
use device_query::DeviceQuery;
use futures::task::{Context, Poll, Waker};
use once_cell::sync::OnceCell;
use parking_lot::Mutex;
use rand::Rng;
use std::time::Duration;
// Process-wide driver instance. It is installed once in `main` via
// `get_or_init` and looked up with `DRIVER.get()` everywhere else.
static DRIVER: OnceCell<Driver> = OnceCell::new();
// Key under which a future's waker is stored in the driver.
type Token = u64;
// Atomic counterpart of `Token`, used by the driver to hand out fresh tokens.
type AtomicToken = AtomicU64;
/// A demo future that resolves once its simulated workload reaches zero and
/// then runs a one-shot callback.
struct MyFuture<F>
where
    F: FnOnce(),
{
    /// Key identifying this future's waker inside the driver.
    token: Token,
    /// Units of simulated work still outstanding; the future is ready at zero.
    work_remaining: u32,
    /// Callback invoked once on completion; `None` after it has been taken.
    callback: Option<Box<F>>,
}
/// Shared state of the notification driver: hands out tokens, stores the
/// wakers of pending futures, and carries the driver thread's run flag.
#[derive(Default)]
struct Driver {
    /// Counter from which `get_next_token` draws tokens (starts at 0 via `Default`).
    next_token: AtomicToken,
    /// Wakers of currently registered futures, keyed by their token.
    wakers: Mutex<HashMap<Token, Waker>>,
    /// Run flag polled by the driver thread's loop; `main` clears it to stop the thread.
    running: AtomicBool,
}
/// Demo entry point.
///
/// Installs the global [`Driver`], starts a driver thread that converts mouse
/// movement into wake-ups for all registered futures, blocks on six
/// `MyFuture`s until they have all resolved, and finally shuts the driver
/// thread down and joins it.
fn main() {
    // Install the driver. Perhaps one per Queue.
    let driver = DRIVER.get_or_init(Driver::default);

    // Raise the run flag *before* spawning the thread. If the thread set it
    // itself, a fast `store(false)` from this thread could be overwritten by
    // the thread's late `store(true)` and the driver would never stop.
    driver.running.store(true, Ordering::Relaxed);

    // Start the driver thread
    let driver_thread = std::thread::spawn(move || {
        // Use mouse movement to simulate IO notification.
        let device_state = device_query::DeviceState::new();
        let mut old_coords = None;

        // In the GPU context we could wait on a Fence, timeline semaphore(s), or some
        // other synchronization primitive(s). It could even be an mpsc::channel that
        // receives notification after every queue submission, present, or frame acquire.
        //
        // The run flag is re-checked on every iteration, including while idle,
        // so shutdown does not have to wait for the next mouse movement.
        while driver.running.load(Ordering::Relaxed) {
            let mouse = device_state.get_mouse();
            if old_coords == Some(mouse.coords) {
                // Nothing happened yet: keep waiting for "IO".
                std::thread::sleep(Duration::from_millis(1));
                continue;
            }
            old_coords = Some(mouse.coords);

            // Ideally we would have fine grained notification, but for demo purposes
            // we'll just notify all tasks.
            driver.notify_all();
        }
        println!("{:?} == Driver", thread::current().id());
    });

    fn callback(name: &str) {
        println!(" {} callback, thread: {:?}", name, thread::current().id());
    }

    futures::executor::block_on(async {
        let my0 = MyFuture::new(100, || callback("my0"));
        let my1 = MyFuture::new(100, || callback("my1"));
        let my2 = MyFuture::new(100, || callback("my2"));
        let my3 = MyFuture::new(100, || callback("my3"));
        let my4 = MyFuture::new(100, || callback("my4"));
        let my5 = MyFuture::new(0, || callback("my5")); // will resolve immediately
        futures::join!(my0, my1, my2, my3, my4, my5)
    });
    println!("{:?} == Main", thread::current().id());

    // Ask the driver thread to stop and wait for it to exit.
    driver.running.store(false, Ordering::Relaxed);
    driver_thread.join().expect("driver thread panicked");
}
impl Driver {
fn get_next_token(&self) -> Token {
self.next_token
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
}
fn register(&self, token: Token, waker: Waker) {
self.wakers.lock().insert(token, waker);
}
fn unregister(&self, token: Token) {
self.wakers.lock().remove(&token);
}
fn notify_all(&self) {
let wakers = self.wakers.lock();
for waker in wakers.values() {
waker.wake_by_ref();
}
}
}
impl<F: FnOnce()> Future for MyFuture<F> {
    type Output = ();

    /// Performs a random amount of simulated work and resolves once none is left.
    ///
    /// On completion the callback is invoked (at most once) and this future's
    /// waker is removed from the driver. While work remains, the current
    /// waker is (re)registered with the driver and `Poll::Pending` is returned.
    ///
    /// # Panics
    ///
    /// Panics if work remains and the global driver has not been installed.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!(
            "MyFuture::poll; token = {}, work_remaining = {}",
            self.token, self.work_remaining
        );
        // Simulate some variable amount of work that could be executed
        // in this invocation of poll. We use this in the example to
        // allow for the futures to resolve at somewhat random times.
        let work = rand::thread_rng().gen_range(0, 10);
        self.work_remaining = self.work_remaining.saturating_sub(work);

        // In the GPU context we would need to query for the buffer mapping
        // or frame acquisition state. For example, if the buffer is unmapped
        // execute the callback. Otherwise, register with the driver that
        // we should be polled again when the buffer state may have changed.
        if self.work_remaining == 0 {
            println!("MyFuture::poll; token = {} (resolved)", self.token);
            // `take` guarantees the callback runs at most once even if the
            // future is polled again after completion.
            if let Some(callback) = self.callback.take() {
                callback();
            }
            // We no longer need wake-ups; drop the stale waker right away
            // instead of leaving it registered until this future is dropped.
            if let Some(driver) = DRIVER.get() {
                driver.unregister(self.token);
            }
            Poll::Ready(())
        } else {
            // Otherwise, (re)register our interest with the driver so we're
            // polled again when something may have happened
            let driver = DRIVER.get().expect("Driver not installed");
            driver.register(self.token, cx.waker().clone());
            Poll::Pending
        }
    }
}
impl<F: FnOnce()> Drop for MyFuture<F> {
    /// Removes this future's waker from the driver so it is not woken after
    /// it has been dropped.
    fn drop(&mut self) {
        // Never panic in `drop`: a panic here while already unwinding aborts
        // the process. If no driver is installed there is nothing registered
        // to clean up, so silently doing nothing is correct.
        if let Some(driver) = DRIVER.get() {
            driver.unregister(self.token);
        }
    }
}
impl<F: FnOnce()> MyFuture<F> {
    /// Creates a future with `work_remaining` units of simulated work that
    /// will invoke `f` when it resolves.
    ///
    /// A fresh token is drawn from the global driver at construction time.
    ///
    /// # Panics
    ///
    /// Panics if the global driver has not been installed.
    fn new(work_remaining: u32, f: F) -> MyFuture<F> {
        let token = DRIVER
            .get()
            .expect("Driver not installed")
            .get_next_token();
        let callback = Some(Box::new(f));
        Self {
            work_remaining,
            token,
            callback,
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment