Skip to content

Instantly share code, notes, and snippets.

@djg
Created August 21, 2020 00:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save djg/2050d09d406084c088b3a586aad2f94d to your computer and use it in GitHub Desktop.
Save djg/2050d09d406084c088b3a586aad2f94d to your computer and use it in GitHub Desktop.
An Abomination Unto Nuggan
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#![allow(non_snake_case)]
use std::{
cell::UnsafeCell,
sync::atomic::{AtomicUsize, Ordering::SeqCst},
};
use {
cstr::*,
futures::{
future::FutureObj,
task::SpawnExt,
task::{Spawn, SpawnError},
},
moz_task,
nserror::{nsresult, NS_ERROR_UNEXPECTED, NS_OK},
std::{
ffi::CStr,
future::Future,
mem,
pin::Pin,
sync::{Arc, Condvar, Mutex},
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
},
xpcom::{
interfaces::{nsIEventTarget, nsIRunnable, nsISerialEventTarget},
xpcom, xpcom_method, RefPtr,
},
};
impl<T: ?Sized> FutureExt for T where T: Future {}
trait FutureExt: Future {
fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Self::Output>
where
Self: Unpin,
{
Pin::new(self).poll(cx)
}
}
/// Demo `Future` to demonstrate executing futures to completion via `Task`.
struct MyFuture {
poll_count: u32,
ready: bool,
waker: Option<Waker>,
}
impl Default for MyFuture {
fn default() -> Self {
Self {
poll_count: 0,
ready: false,
waker: None,
}
}
}
impl Future for MyFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
self.poll_count += 1;
if let Some(waker) = &mut self.waker {
if !waker.will_wake(cx.waker()) {
*waker = cx.waker().clone();
}
} else {
let waker = cx.waker().clone();
self.waker = Some(waker);
}
println!("Poll count = {}", self.poll_count);
if self.ready {
Poll::Ready(())
} else {
self.ready = true;
// Just notify the task that we need to re-polled.
if let Some(waker) = &self.waker {
waker.wake_by_ref();
}
Poll::Pending
}
}
}
// helper for dispatching an `nsIRunnable` to `nsIEventTarget`, converting the
// result into `Result<(), SpawnError>`.
fn dispatch_with_options(
event_target: &nsIEventTarget,
runnable: &nsIRunnable,
options: u32,
) -> Result<(), SpawnError> {
// DispatchFromScript does an AddRef on runnable for us.
match unsafe { event_target.DispatchFromScript(runnable, options) } {
NS_OK => Ok(()),
// From nsIEventTarget.idl:
// NS_ERROR_UNEXPECTED - Indicates that the thread is shutting down and
// has finished processing events, so this event would never run and
// has not been dispatched.
NS_ERROR_UNEXPECTED => Err(SpawnError::shutdown()),
_ => unreachable!(),
}
}
// helper for dispatching an `nsIRunnable` to `nsIEventTarget`, converting the
// result into `Result<(), SpawnError>`.
fn dispatch(event_target: &nsIEventTarget, runnable: &nsIRunnable) -> Result<(), SpawnError> {
dispatch_with_options(
event_target,
runnable,
nsIEventTarget::DISPATCH_NORMAL as u32,
)
}
// Define a type which implements nsIRunnable in rust.
#[derive(xpcom)]
#[xpimplements(nsIRunnable, nsISupports)]
#[refcnt = "atomic"]
struct InitTask {
future: UnsafeCell<FutureObj<'static, ()>>,
event_target: RefPtr<nsIEventTarget>,
state: AtomicUsize,
}
impl Task {
pub fn new(future: FutureObj<'static, ()>, event_target: &nsIEventTarget) -> RefPtr<Task> {
Task::allocate(InitTask {
future: UnsafeCell::new(future),
event_target: RefPtr::new(event_target),
state: AtomicUsize::new(IDLE),
})
}
/// Runs a closure from the context of the task.
///
/// Any wake notifications resulting from the execution of the closure are
/// tracked.
fn enter<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut Context<'_>) -> R,
{
let waker = self.waker();
let mut cx = Context::from_waker(&waker);
f(&mut cx)
}
/// Obtain a `Waker` that can be used to wake the `Task`.
fn waker(&self) -> Waker {
unsafe {
let raw = to_raw(RefPtr::new(self));
Waker::from_raw(raw)
}
}
}
// nsIRunnable implementation
impl Task {
xpcom_method!(run => Run());
fn run(&self) -> Result<(), nsresult> {
self.enter(|cx| {
// Safety: The ownership of this `Task` object is evidence that
// we are in the `POLL`/`REPOLL` state.
unsafe {
self.start_poll();
loop {
let fut = &mut (*self.future.get());
let res = fut.poll_unpin(cx);
match res {
Poll::Pending => {}
Poll::Ready(()) => return self.complete(),
}
if !self.wait() {
break; // we've waited
}
}
}
});
Ok(())
}
}
// Task State Machine - This was heavily cribbed from futures-executor::ThreadPool
// There are four possible task states, listed below with their possible
// transitions:
// The task is blocked, waiting on an event
const IDLE: usize = 0; // --> POLL
// The task is actively being polled by a thread; arrival of additional events
// of interest should move it to the REPOLL state
const POLL: usize = 1; // --> IDLE, REPOLL, or COMPLETE
// The task is actively being polled, but will need to be re-polled upon
// completion to ensure that all events were observed.
const REPOLL: usize = 2; // --> POLL
// The task has finished executing (either successfully or with an error/panic)
const COMPLETE: usize = 3; // No transitions out
impl Task {
/// Attempt to "wake up" the task and poll the future.
///
/// A `true` result indicates that the `POLL` state has been entered, and
/// the caller can proceed to poll the future. An `false` result indicates
/// that polling is not necessary (because the task is finished or the
/// polling has been delegated).
pub(crate) fn wake_up(&self) -> bool {
let mut state = self.state.load(SeqCst);
loop {
match state {
// The task is idle, so try to run it immediately.
IDLE => match self.state.compare_exchange(IDLE, POLL, SeqCst, SeqCst) {
Ok(_) => {
return true;
}
Err(cur) => state = cur,
},
// The task is being polled, so we need to record that it should
// be *repolled* when complete.
POLL => match self.state.compare_exchange(POLL, REPOLL, SeqCst, SeqCst) {
Ok(_) => return false,
Err(cur) => state = cur,
},
// The task is already scheduled for polling, or is complete, so
// we've got nothing to do.
_ => return false,
}
}
}
/// Alert the Task that polling is about to begin, clearing any accumulated
/// re-poll requests.
///
/// # Safety
///
/// Callable only from the `POLL`/`REPOLL` states, i.e. between
/// successful calls to `wakeup` and `wait`/`complete`.
pub(crate) unsafe fn start_poll(&self) {
self.state.store(POLL, SeqCst);
}
/// Alert the Task that polling completed with `Pending`.
///
/// Returns true if a `REPOLL` is pending.
///
/// # Safety
///
/// Callable only from the `POLL`/`REPOLL` states, i.e. between
/// successful calls to `notify` and `wait`/`complete`.
pub(crate) unsafe fn wait(&self) -> bool {
match self.state.compare_exchange(POLL, IDLE, SeqCst, SeqCst) {
// no wakeups came in while we were running
Ok(_) => false,
// guaranteed to be in REPOLL state; just clobber the
// state and run again.
Err(state) => {
assert_eq!(state, REPOLL);
self.state.store(POLL, SeqCst);
true
}
}
}
/// Alert the mutex that the task has completed execution and should not be
/// notified again.
///
/// # Safety
///
/// Callable only from the `POLL`/`REPOLL` states, i.e. between
/// successful calls to `wakeup` and `wait`/`complete`.
pub(crate) unsafe fn complete(&self) {
self.state.store(COMPLETE, SeqCst);
}
}
// Waker interface - Implementation for RawWaker so Task nsIRunnable can be
// used to wake itself.
impl Task {
fn wake(&self) {
if self.wake_up() {
// TODO: how should we handle error?
dispatch(self.event_target.coerce(), self.coerce()).unwrap()
}
}
}
static TASK_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
task_waker_clone,
task_waker_wake,
task_waker_wake_by_ref,
task_waker_drop,
);
unsafe fn to_raw(waker: RefPtr<Task>) -> RawWaker {
let mut raw = std::ptr::null();
waker.forget(&mut raw);
RawWaker::new(raw as *const (), &TASK_WAKER_VTABLE)
}
unsafe fn from_raw(raw: *const ()) -> RefPtr<Task> {
RefPtr::from_raw_dont_addref(raw as *const Task).expect("Received null ptr")
}
unsafe fn from_raw_clone(raw: *const ()) -> RefPtr<Task> {
RefPtr::from_raw(raw as *const Task).expect("Received null ptr")
}
unsafe fn task_waker_clone(raw: *const ()) -> RawWaker {
to_raw(from_raw_clone(raw))
}
unsafe fn task_waker_wake(raw: *const ()) {
let task = from_raw(raw);
task.wake();
}
unsafe fn task_waker_wake_by_ref(raw: *const ()) {
let task = from_raw(raw);
task.wake();
// We don't actually own a reference to the waker
mem::forget(task);
}
unsafe fn task_waker_drop(raw: *const ()) {
let _ = from_raw(raw);
}
/// A general purpose interface to for scheduling tasks that poll futures to
/// completion on a BackgroundTaskQueue.
///
/// ```
/// let tq = BackgroundTaskQueue::new(cstr!("TQ")).unwrap();
/// let future = async { /* ... */ };
/// task_queue.spawn(future);
/// ```
struct BackgroundTaskQueue {
task_queue: RefPtr<nsISerialEventTarget>,
}
impl BackgroundTaskQueue {
pub fn new(name: &'static CStr) -> Result<Self, nsresult> {
let task_queue = moz_task::create_background_task_queue(name)?;
Ok(BackgroundTaskQueue { task_queue })
}
/// Dispatches a task to the task queue that will be run the future to completion.
pub fn dispatch_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
let task = Task::new(future, self.task_queue.coerce());
dispatch(self.task_queue.coerce(), task.coerce())
}
}
impl Spawn for BackgroundTaskQueue {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
self.dispatch_obj(future)
}
}
#[derive(Clone)]
struct WaitBool {
inner: Arc<(Mutex<bool>, Condvar)>,
}
impl Default for WaitBool {
fn default() -> Self {
Self {
inner: Arc::new((Mutex::new(false), Condvar::new())),
}
}
}
impl WaitBool {
pub fn notify(&self) {
let (lock, cvar) = &*self.inner;
let mut signal = lock.lock().unwrap();
*signal = true;
// We notify the condvar that the value has changed.
cvar.notify_one();
}
pub fn wait(&self) {
let (lock, cvar) = &*self.inner;
let mut signal = lock.lock().unwrap();
while !*signal {
signal = cvar.wait(signal).unwrap();
}
}
}
#[no_mangle]
pub unsafe extern "C" fn Rust_Future(it_worked: *mut bool) {
let wb1 = WaitBool::default();
let wb2 = wb1.clone();
let task_queue =
BackgroundTaskQueue::new(cstr!("Rust_Future")).expect("Failed to create task queue");
if let Ok(..) = task_queue.spawn(async move {
MyFuture::default().await;
wb2.notify()
}) {
// Wait for the future to complete
wb1.wait();
*it_worked = true;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment