/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at */
use std::{
sync::atomic::{AtomicUsize, Ordering::SeqCst},
use {
task::{Spawn, SpawnError},
nserror::{nsresult, NS_ERROR_UNEXPECTED, NS_OK},
sync::{Arc, Condvar, Mutex},
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
interfaces::{nsIEventTarget, nsIRunnable, nsISerialEventTarget},
xpcom, xpcom_method, RefPtr,
impl<T: ?Sized> FutureExt for T where T: Future {}
trait FutureExt: Future {
fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Self::Output>
Self: Unpin,
/// Demo `Future` to demonstrate executing futures to completion via `Task`.
struct MyFuture {
poll_count: u32,
ready: bool,
waker: Option<Waker>,
impl Default for MyFuture {
fn default() -> Self {
Self {
poll_count: 0,
ready: false,
waker: None,
impl Future for MyFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
self.poll_count += 1;
if let Some(waker) = &mut self.waker {
if !waker.will_wake(cx.waker()) {
*waker = cx.waker().clone();
} else {
let waker = cx.waker().clone();
self.waker = Some(waker);
println!("Poll count = {}", self.poll_count);
if self.ready {
} else {
self.ready = true;
// Just notify the task that we need to re-polled.
if let Some(waker) = &self.waker {
// helper for dispatching an `nsIRunnable` to `nsIEventTarget`, converting the
// result into `Result<(), SpawnError>`.
fn dispatch_with_options(
event_target: &nsIEventTarget,
runnable: &nsIRunnable,
options: u32,
) -> Result<(), SpawnError> {
// DispatchFromScript does an AddRef on runnable for us.
match unsafe { event_target.DispatchFromScript(runnable, options) } {
NS_OK => Ok(()),
// From nsIEventTarget.idl:
// NS_ERROR_UNEXPECTED - Indicates that the thread is shutting down and
// has finished processing events, so this event would never run and
// has not been dispatched.
NS_ERROR_UNEXPECTED => Err(SpawnError::shutdown()),
_ => unreachable!(),
// helper for dispatching an `nsIRunnable` to `nsIEventTarget`, converting the
// result into `Result<(), SpawnError>`.
fn dispatch(event_target: &nsIEventTarget, runnable: &nsIRunnable) -> Result<(), SpawnError> {
nsIEventTarget::DISPATCH_NORMAL as u32,
// Define a type which implements nsIRunnable in rust.
#[xpimplements(nsIRunnable, nsISupports)]
#[refcnt = "atomic"]
struct InitTask {
future: UnsafeCell<FutureObj<'static, ()>>,
event_target: RefPtr<nsIEventTarget>,
state: AtomicUsize,
impl Task {
pub fn new(future: FutureObj<'static, ()>, event_target: &nsIEventTarget) -> RefPtr<Task> {
Task::allocate(InitTask {
future: UnsafeCell::new(future),
event_target: RefPtr::new(event_target),
state: AtomicUsize::new(IDLE),
/// Runs a closure from the context of the task.
/// Any wake notifications resulting from the execution of the closure are
/// tracked.
fn enter<F, R>(&self, f: F) -> R
F: FnOnce(&mut Context<'_>) -> R,
let waker = self.waker();
let mut cx = Context::from_waker(&waker);
f(&mut cx)
/// Obtain a `Waker` that can be used to wake the `Task`.
fn waker(&self) -> Waker {
unsafe {
let raw = to_raw(RefPtr::new(self));
// nsIRunnable implementation
impl Task {
xpcom_method!(run => Run());
fn run(&self) -> Result<(), nsresult> {
self.enter(|cx| {
// Safety: The ownership of this `Task` object is evidence that
// we are in the `POLL`/`REPOLL` state.
unsafe {
loop {
let fut = &mut (*self.future.get());
let res = fut.poll_unpin(cx);
match res {
Poll::Pending => {}
Poll::Ready(()) => return self.complete(),
if !self.wait() {
break; // we've waited
// Task State Machine - This was heavily cribbed from futures-executor::ThreadPool
// There are four possible task states, listed below with their possible
// transitions:
// The task is blocked, waiting on an event
const IDLE: usize = 0; // --> POLL
// The task is actively being polled by a thread; arrival of additional events
// of interest should move it to the REPOLL state
const POLL: usize = 1; // --> IDLE, REPOLL, or COMPLETE
// The task is actively being polled, but will need to be re-polled upon
// completion to ensure that all events were observed.
const REPOLL: usize = 2; // --> POLL
// The task has finished executing (either successfully or with an error/panic)
const COMPLETE: usize = 3; // No transitions out
impl Task {
/// Attempt to "wake up" the task and poll the future.
/// A `true` result indicates that the `POLL` state has been entered, and
/// the caller can proceed to poll the future. An `false` result indicates
/// that polling is not necessary (because the task is finished or the
/// polling has been delegated).
pub(crate) fn wake_up(&self) -> bool {
let mut state = self.state.load(SeqCst);
loop {
match state {
// The task is idle, so try to run it immediately.
IDLE => match self.state.compare_exchange(IDLE, POLL, SeqCst, SeqCst) {
Ok(_) => {
return true;
Err(cur) => state = cur,
// The task is being polled, so we need to record that it should
// be *repolled* when complete.
POLL => match self.state.compare_exchange(POLL, REPOLL, SeqCst, SeqCst) {
Ok(_) => return false,
Err(cur) => state = cur,
// The task is already scheduled for polling, or is complete, so
// we've got nothing to do.
_ => return false,
/// Alert the Task that polling is about to begin, clearing any accumulated
/// re-poll requests.
/// # Safety
/// Callable only from the `POLL`/`REPOLL` states, i.e. between
/// successful calls to `wakeup` and `wait`/`complete`.
pub(crate) unsafe fn start_poll(&self) {, SeqCst);
/// Alert the Task that polling completed with `Pending`.
/// Returns true if a `REPOLL` is pending.
/// # Safety
/// Callable only from the `POLL`/`REPOLL` states, i.e. between
/// successful calls to `notify` and `wait`/`complete`.
pub(crate) unsafe fn wait(&self) -> bool {
match self.state.compare_exchange(POLL, IDLE, SeqCst, SeqCst) {
// no wakeups came in while we were running
Ok(_) => false,
// guaranteed to be in REPOLL state; just clobber the
// state and run again.
Err(state) => {
assert_eq!(state, REPOLL);, SeqCst);
/// Alert the mutex that the task has completed execution and should not be
/// notified again.
/// # Safety
/// Callable only from the `POLL`/`REPOLL` states, i.e. between
/// successful calls to `wakeup` and `wait`/`complete`.
pub(crate) unsafe fn complete(&self) {, SeqCst);
// Waker interface - Implementation for RawWaker so Task nsIRunnable can be
// used to wake itself.
impl Task {
fn wake(&self) {
if self.wake_up() {
// TODO: how should we handle error?
dispatch(self.event_target.coerce(), self.coerce()).unwrap()
static TASK_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
unsafe fn to_raw(waker: RefPtr<Task>) -> RawWaker {
let mut raw = std::ptr::null();
waker.forget(&mut raw);
RawWaker::new(raw as *const (), &TASK_WAKER_VTABLE)
unsafe fn from_raw(raw: *const ()) -> RefPtr<Task> {
RefPtr::from_raw_dont_addref(raw as *const Task).expect("Received null ptr")
unsafe fn from_raw_clone(raw: *const ()) -> RefPtr<Task> {
RefPtr::from_raw(raw as *const Task).expect("Received null ptr")
unsafe fn task_waker_clone(raw: *const ()) -> RawWaker {
unsafe fn task_waker_wake(raw: *const ()) {
let task = from_raw(raw);
unsafe fn task_waker_wake_by_ref(raw: *const ()) {
let task = from_raw(raw);
// We don't actually own a reference to the waker
unsafe fn task_waker_drop(raw: *const ()) {
let _ = from_raw(raw);
/// A general purpose interface to for scheduling tasks that poll futures to
/// completion on a BackgroundTaskQueue.
/// ```
/// let tq = BackgroundTaskQueue::new(cstr!("TQ")).unwrap();
/// let future = async { /* ... */ };
/// task_queue.spawn(future);
/// ```
struct BackgroundTaskQueue {
task_queue: RefPtr<nsISerialEventTarget>,
impl BackgroundTaskQueue {
pub fn new(name: &'static CStr) -> Result<Self, nsresult> {
let task_queue = moz_task::create_background_task_queue(name)?;
Ok(BackgroundTaskQueue { task_queue })
/// Dispatches a task to the task queue that will be run the future to completion.
pub fn dispatch_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
let task = Task::new(future, self.task_queue.coerce());
dispatch(self.task_queue.coerce(), task.coerce())
impl Spawn for BackgroundTaskQueue {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
struct WaitBool {
inner: Arc<(Mutex<bool>, Condvar)>,
impl Default for WaitBool {
fn default() -> Self {
Self {
inner: Arc::new((Mutex::new(false), Condvar::new())),
impl WaitBool {
pub fn notify(&self) {
let (lock, cvar) = &*self.inner;
let mut signal = lock.lock().unwrap();
*signal = true;
// We notify the condvar that the value has changed.
pub fn wait(&self) {
let (lock, cvar) = &*self.inner;
let mut signal = lock.lock().unwrap();
while !*signal {
signal = cvar.wait(signal).unwrap();
pub unsafe extern "C" fn Rust_Future(it_worked: *mut bool) {
let wb1 = WaitBool::default();
let wb2 = wb1.clone();
let task_queue =
BackgroundTaskQueue::new(cstr!("Rust_Future")).expect("Failed to create task queue");
if let Ok(..) = task_queue.spawn(async move {
}) {
// Wait for the future to complete
*it_worked = true;
