An Abomination Unto Nuggan
/* This Source Code Form is subject to the terms of the Mozilla Public | |
* License, v. 2.0. If a copy of the MPL was not distributed with this | |
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */ | |
#![allow(non_snake_case)] | |
use std::{ | |
cell::UnsafeCell, | |
sync::atomic::{AtomicUsize, Ordering::SeqCst}, | |
}; | |
use { | |
cstr::*, | |
futures::{ | |
future::FutureObj, | |
task::SpawnExt, | |
task::{Spawn, SpawnError}, | |
}, | |
moz_task, | |
nserror::{nsresult, NS_ERROR_UNEXPECTED, NS_OK}, | |
std::{ | |
ffi::CStr, | |
future::Future, | |
mem, | |
pin::Pin, | |
sync::{Arc, Condvar, Mutex}, | |
task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, | |
}, | |
xpcom::{ | |
interfaces::{nsIEventTarget, nsIRunnable, nsISerialEventTarget}, | |
xpcom, xpcom_method, RefPtr, | |
}, | |
}; | |
impl<T: ?Sized> FutureExt for T where T: Future {} | |
trait FutureExt: Future { | |
fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Self::Output> | |
where | |
Self: Unpin, | |
{ | |
Pin::new(self).poll(cx) | |
} | |
} | |
/// Demo `Future` to demonstrate executing futures to completion via `Task`. | |
struct MyFuture { | |
poll_count: u32, | |
ready: bool, | |
waker: Option<Waker>, | |
} | |
impl Default for MyFuture { | |
fn default() -> Self { | |
Self { | |
poll_count: 0, | |
ready: false, | |
waker: None, | |
} | |
} | |
} | |
impl Future for MyFuture { | |
type Output = (); | |
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { | |
self.poll_count += 1; | |
if let Some(waker) = &mut self.waker { | |
if !waker.will_wake(cx.waker()) { | |
*waker = cx.waker().clone(); | |
} | |
} else { | |
let waker = cx.waker().clone(); | |
self.waker = Some(waker); | |
} | |
println!("Poll count = {}", self.poll_count); | |
if self.ready { | |
Poll::Ready(()) | |
} else { | |
self.ready = true; | |
// Just notify the task that we need to re-polled. | |
if let Some(waker) = &self.waker { | |
waker.wake_by_ref(); | |
} | |
Poll::Pending | |
} | |
} | |
} | |
// helper for dispatching an `nsIRunnable` to `nsIEventTarget`, converting the | |
// result into `Result<(), SpawnError>`. | |
fn dispatch_with_options( | |
event_target: &nsIEventTarget, | |
runnable: &nsIRunnable, | |
options: u32, | |
) -> Result<(), SpawnError> { | |
// DispatchFromScript does an AddRef on runnable for us. | |
match unsafe { event_target.DispatchFromScript(runnable, options) } { | |
NS_OK => Ok(()), | |
// From nsIEventTarget.idl: | |
// NS_ERROR_UNEXPECTED - Indicates that the thread is shutting down and | |
// has finished processing events, so this event would never run and | |
// has not been dispatched. | |
NS_ERROR_UNEXPECTED => Err(SpawnError::shutdown()), | |
_ => unreachable!(), | |
} | |
} | |
// helper for dispatching an `nsIRunnable` to `nsIEventTarget`, converting the | |
// result into `Result<(), SpawnError>`. | |
fn dispatch(event_target: &nsIEventTarget, runnable: &nsIRunnable) -> Result<(), SpawnError> { | |
dispatch_with_options( | |
event_target, | |
runnable, | |
nsIEventTarget::DISPATCH_NORMAL as u32, | |
) | |
} | |
// Define a type which implements nsIRunnable in rust. | |
#[derive(xpcom)] | |
#[xpimplements(nsIRunnable, nsISupports)] | |
#[refcnt = "atomic"] | |
struct InitTask { | |
future: UnsafeCell<FutureObj<'static, ()>>, | |
event_target: RefPtr<nsIEventTarget>, | |
state: AtomicUsize, | |
} | |
impl Task { | |
pub fn new(future: FutureObj<'static, ()>, event_target: &nsIEventTarget) -> RefPtr<Task> { | |
Task::allocate(InitTask { | |
future: UnsafeCell::new(future), | |
event_target: RefPtr::new(event_target), | |
state: AtomicUsize::new(IDLE), | |
}) | |
} | |
/// Runs a closure from the context of the task. | |
/// | |
/// Any wake notifications resulting from the execution of the closure are | |
/// tracked. | |
fn enter<F, R>(&self, f: F) -> R | |
where | |
F: FnOnce(&mut Context<'_>) -> R, | |
{ | |
let waker = self.waker(); | |
let mut cx = Context::from_waker(&waker); | |
f(&mut cx) | |
} | |
/// Obtain a `Waker` that can be used to wake the `Task`. | |
fn waker(&self) -> Waker { | |
unsafe { | |
let raw = to_raw(RefPtr::new(self)); | |
Waker::from_raw(raw) | |
} | |
} | |
} | |
// nsIRunnable implementation | |
impl Task { | |
xpcom_method!(run => Run()); | |
fn run(&self) -> Result<(), nsresult> { | |
self.enter(|cx| { | |
// Safety: The ownership of this `Task` object is evidence that | |
// we are in the `POLL`/`REPOLL` state. | |
unsafe { | |
self.start_poll(); | |
loop { | |
let fut = &mut (*self.future.get()); | |
let res = fut.poll_unpin(cx); | |
match res { | |
Poll::Pending => {} | |
Poll::Ready(()) => return self.complete(), | |
} | |
if !self.wait() { | |
break; // we've waited | |
} | |
} | |
} | |
}); | |
Ok(()) | |
} | |
} | |
// Task State Machine - This was heavily cribbed from futures-executor::ThreadPool | |
// There are four possible task states, listed below with their possible | |
// transitions: | |
// The task is blocked, waiting on an event | |
const IDLE: usize = 0; // --> POLL | |
// The task is actively being polled by a thread; arrival of additional events | |
// of interest should move it to the REPOLL state | |
const POLL: usize = 1; // --> IDLE, REPOLL, or COMPLETE | |
// The task is actively being polled, but will need to be re-polled upon | |
// completion to ensure that all events were observed. | |
const REPOLL: usize = 2; // --> POLL | |
// The task has finished executing (either successfully or with an error/panic) | |
const COMPLETE: usize = 3; // No transitions out | |
impl Task { | |
/// Attempt to "wake up" the task and poll the future. | |
/// | |
/// A `true` result indicates that the `POLL` state has been entered, and | |
/// the caller can proceed to poll the future. An `false` result indicates | |
/// that polling is not necessary (because the task is finished or the | |
/// polling has been delegated). | |
pub(crate) fn wake_up(&self) -> bool { | |
let mut state = self.state.load(SeqCst); | |
loop { | |
match state { | |
// The task is idle, so try to run it immediately. | |
IDLE => match self.state.compare_exchange(IDLE, POLL, SeqCst, SeqCst) { | |
Ok(_) => { | |
return true; | |
} | |
Err(cur) => state = cur, | |
}, | |
// The task is being polled, so we need to record that it should | |
// be *repolled* when complete. | |
POLL => match self.state.compare_exchange(POLL, REPOLL, SeqCst, SeqCst) { | |
Ok(_) => return false, | |
Err(cur) => state = cur, | |
}, | |
// The task is already scheduled for polling, or is complete, so | |
// we've got nothing to do. | |
_ => return false, | |
} | |
} | |
} | |
/// Alert the Task that polling is about to begin, clearing any accumulated | |
/// re-poll requests. | |
/// | |
/// # Safety | |
/// | |
/// Callable only from the `POLL`/`REPOLL` states, i.e. between | |
/// successful calls to `wakeup` and `wait`/`complete`. | |
pub(crate) unsafe fn start_poll(&self) { | |
self.state.store(POLL, SeqCst); | |
} | |
/// Alert the Task that polling completed with `Pending`. | |
/// | |
/// Returns true if a `REPOLL` is pending. | |
/// | |
/// # Safety | |
/// | |
/// Callable only from the `POLL`/`REPOLL` states, i.e. between | |
/// successful calls to `notify` and `wait`/`complete`. | |
pub(crate) unsafe fn wait(&self) -> bool { | |
match self.state.compare_exchange(POLL, IDLE, SeqCst, SeqCst) { | |
// no wakeups came in while we were running | |
Ok(_) => false, | |
// guaranteed to be in REPOLL state; just clobber the | |
// state and run again. | |
Err(state) => { | |
assert_eq!(state, REPOLL); | |
self.state.store(POLL, SeqCst); | |
true | |
} | |
} | |
} | |
/// Alert the mutex that the task has completed execution and should not be | |
/// notified again. | |
/// | |
/// # Safety | |
/// | |
/// Callable only from the `POLL`/`REPOLL` states, i.e. between | |
/// successful calls to `wakeup` and `wait`/`complete`. | |
pub(crate) unsafe fn complete(&self) { | |
self.state.store(COMPLETE, SeqCst); | |
} | |
} | |
// Waker interface - Implementation for RawWaker so Task nsIRunnable can be | |
// used to wake itself. | |
impl Task { | |
fn wake(&self) { | |
if self.wake_up() { | |
// TODO: how should we handle error? | |
dispatch(self.event_target.coerce(), self.coerce()).unwrap() | |
} | |
} | |
} | |
static TASK_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( | |
task_waker_clone, | |
task_waker_wake, | |
task_waker_wake_by_ref, | |
task_waker_drop, | |
); | |
unsafe fn to_raw(waker: RefPtr<Task>) -> RawWaker { | |
let mut raw = std::ptr::null(); | |
waker.forget(&mut raw); | |
RawWaker::new(raw as *const (), &TASK_WAKER_VTABLE) | |
} | |
unsafe fn from_raw(raw: *const ()) -> RefPtr<Task> { | |
RefPtr::from_raw_dont_addref(raw as *const Task).expect("Received null ptr") | |
} | |
unsafe fn from_raw_clone(raw: *const ()) -> RefPtr<Task> { | |
RefPtr::from_raw(raw as *const Task).expect("Received null ptr") | |
} | |
unsafe fn task_waker_clone(raw: *const ()) -> RawWaker { | |
to_raw(from_raw_clone(raw)) | |
} | |
unsafe fn task_waker_wake(raw: *const ()) { | |
let task = from_raw(raw); | |
task.wake(); | |
} | |
unsafe fn task_waker_wake_by_ref(raw: *const ()) { | |
let task = from_raw(raw); | |
task.wake(); | |
// We don't actually own a reference to the waker | |
mem::forget(task); | |
} | |
unsafe fn task_waker_drop(raw: *const ()) { | |
let _ = from_raw(raw); | |
} | |
/// A general purpose interface to for scheduling tasks that poll futures to | |
/// completion on a BackgroundTaskQueue. | |
/// | |
/// ``` | |
/// let tq = BackgroundTaskQueue::new(cstr!("TQ")).unwrap(); | |
/// let future = async { /* ... */ }; | |
/// task_queue.spawn(future); | |
/// ``` | |
struct BackgroundTaskQueue { | |
task_queue: RefPtr<nsISerialEventTarget>, | |
} | |
impl BackgroundTaskQueue { | |
pub fn new(name: &'static CStr) -> Result<Self, nsresult> { | |
let task_queue = moz_task::create_background_task_queue(name)?; | |
Ok(BackgroundTaskQueue { task_queue }) | |
} | |
/// Dispatches a task to the task queue that will be run the future to completion. | |
pub fn dispatch_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { | |
let task = Task::new(future, self.task_queue.coerce()); | |
dispatch(self.task_queue.coerce(), task.coerce()) | |
} | |
} | |
impl Spawn for BackgroundTaskQueue { | |
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { | |
self.dispatch_obj(future) | |
} | |
} | |
#[derive(Clone)] | |
struct WaitBool { | |
inner: Arc<(Mutex<bool>, Condvar)>, | |
} | |
impl Default for WaitBool { | |
fn default() -> Self { | |
Self { | |
inner: Arc::new((Mutex::new(false), Condvar::new())), | |
} | |
} | |
} | |
impl WaitBool { | |
pub fn notify(&self) { | |
let (lock, cvar) = &*self.inner; | |
let mut signal = lock.lock().unwrap(); | |
*signal = true; | |
// We notify the condvar that the value has changed. | |
cvar.notify_one(); | |
} | |
pub fn wait(&self) { | |
let (lock, cvar) = &*self.inner; | |
let mut signal = lock.lock().unwrap(); | |
while !*signal { | |
signal = cvar.wait(signal).unwrap(); | |
} | |
} | |
} | |
#[no_mangle] | |
pub unsafe extern "C" fn Rust_Future(it_worked: *mut bool) { | |
let wb1 = WaitBool::default(); | |
let wb2 = wb1.clone(); | |
let task_queue = | |
BackgroundTaskQueue::new(cstr!("Rust_Future")).expect("Failed to create task queue"); | |
if let Ok(..) = task_queue.spawn(async move { | |
MyFuture::default().await; | |
wb2.notify() | |
}) { | |
// Wait for the future to complete | |
wb1.wait(); | |
*it_worked = true; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment