Skip to content

Instantly share code, notes, and snippets.

@alexcrichton
Created March 7, 2019 16:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alexcrichton/2e793f8fbd99c438467828811149ce26 to your computer and use it in GitHub Desktop.
Save alexcrichton/2e793f8fbd99c438467828811149ce26 to your computer and use it in GitHub Desktop.
use futures::sync::oneshot;
use futures::Future;
use js_sys::{Array, Error, Promise, Uint8ClampedArray, WebAssembly};
use rayon::prelude::*;
use std::cell::{RefCell, UnsafeCell};
use std::cmp;
use std::mem;
use std::rc::{Rc, Weak};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst};
use std::sync::{Arc, Mutex, MutexGuard};
use wasm_bindgen::prelude::*;
use wasm_bindgen::JsCast;
use web_sys::{CanvasRenderingContext2d, ErrorEvent, Event, Worker};
use web_sys::{DedicatedWorkerGlobalScope, MessageEvent};
/// `println!`-style logging to the JavaScript console: formats the arguments
/// into a `String` and forwards it to the `log` binding.
macro_rules! console_log {
    ($($arg:tt)*) => {
        log(&format!($($arg)*))
    };
}
// Imports from the JavaScript `console` namespace.
#[wasm_bindgen]
extern "C" {
// `console.log` taking a Rust string slice; used by the `console_log!` macro.
#[wasm_bindgen(js_namespace = console)]
fn log(s: &str);
// The same JS function (`js_name = log`) bound under the Rust name `logv`,
// accepting an arbitrary JS value so objects such as events can be logged.
#[wasm_bindgen(js_namespace = console, js_name = log)]
fn logv(x: &JsValue);
}
/// JS-visible wrapper around a raytracer scene description.
#[wasm_bindgen]
pub struct Scene {
// The deserialized scene that is handed to the raytracer when rendering.
inner: raytracer::scene::Scene,
}
// Global counter initialised to zero.
// NOTE(review): not referenced anywhere in the visible code; presumably meant
// to hand out unique ids (e.g. for `Shared::id`) — confirm or remove.
static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
#[wasm_bindgen]
impl Scene {
    /// Builds a scene from a JS object by deserializing it with serde.
    ///
    /// Also installs the console panic hook (once) so Rust panics are reported
    /// in the browser console.
    ///
    /// # Errors
    ///
    /// Returns the deserialization error, rendered as a JS string, when
    /// `object` cannot be converted into a raytracer scene.
    #[wasm_bindgen(constructor)]
    pub fn new(object: &JsValue) -> Result<Scene, JsValue> {
        console_error_panic_hook::set_once();
        Ok(Scene {
            inner: object
                .into_serde()
                .map_err(|e| JsValue::from(e.to_string()))?,
        })
    }

    /// Starts rendering this scene on workers taken from `pool`.
    ///
    /// One worker drives the render and `concurrency - 1` further workers back
    /// the rayon thread pool. The returned [`RenderingScene`] exposes a promise
    /// that resolves to the finished `ImageData`, plus the location of the
    /// pixel buffer so partial results can be read while rendering.
    ///
    /// # Errors
    ///
    /// Returns an error if `concurrency` is zero, or if the job could not be
    /// posted to a worker.
    ///
    /// # Panics
    ///
    /// Panics if the rayon thread pool cannot be built or one of its worker
    /// threads cannot be started.
    pub fn render(self, concurrency: usize, pool: &WorkerPool) -> Result<RenderingScene, JsValue> {
        // `concurrency - 1` below would underflow for zero (a panic in debug
        // builds, an absurd thread count in release), so reject it up front.
        if concurrency == 0 {
            return Err(JsValue::from("concurrency must be at least 1"));
        }

        let scene = self.inner;
        let height = scene.height;
        let width = scene.width;

        // Four bytes (RGBA) per pixel.
        let pixels = (width * height) as usize;
        let mut rgb_data = vec![0; 4 * pixels];

        // Remember where the buffer lives in linear memory so that JS can read
        // it directly while the render is still in progress.
        let base = rgb_data.as_ptr() as usize;
        let len = rgb_data.len();

        // Each rayon thread is run on a worker borrowed from `pool`.
        let thread_pool = rayon::ThreadPoolBuilder::new()
            .num_threads(concurrency - 1)
            .spawn(|thread| Ok(pool.run(|| thread.run()).unwrap()))
            .unwrap();

        // Drive the whole render from one more worker so that the calling
        // thread is never blocked.
        let done = pool.run_notify(move || {
            thread_pool.install(|| {
                rgb_data
                    .par_chunks_mut(4)
                    .enumerate()
                    .for_each(|(i, chunk)| {
                        // `i` is the pixel index; recover its coordinates.
                        let i = i as u32;
                        let x = i % width;
                        let y = i / width;
                        let ray = raytracer::Ray::create_prime(x, y, &scene);
                        let result = raytracer::cast_ray(&scene, &ray, 0).to_rgba();
                        chunk[0] = result.data[0];
                        chunk[1] = result.data[1];
                        chunk[2] = result.data[2];
                        chunk[3] = result.data[3];
                    });
            });
            rgb_data
        })?;

        // `_data` owns the pixel buffer; holding it here keeps the memory at
        // `base` alive until the final image has been copied out.
        // NOTE(review): the buffer is dropped once this closure returns, while
        // `RenderingScene` keeps `base`/`len` — `imageSoFar` called after
        // completion would read freed memory; confirm this is acceptable.
        let done = done.map(move |_data| image_data(base, len, width, height).into());

        Ok(RenderingScene {
            promise: wasm_bindgen_futures::future_to_promise(done),
            base,
            len,
            height,
            width,
        })
    }
}
/// JS-visible pool of web workers on which Rust closures can be run.
#[wasm_bindgen]
pub struct WorkerPool {
// Shared pool state; in-flight message handlers hold only weak references
// to it (see `reclaim_on_message`).
state: Rc<PoolState>,
}
/// State shared between a `WorkerPool` and the handlers of its busy workers.
struct PoolState {
// Workers that are currently idle and available for reuse.
workers: RefCell<Vec<Worker>>,
// Fallback event handler installed on idle workers; it only logs the event.
callback: Closure<dyn FnMut(Event)>,
}
struct Work {
func: Box<FnBox + Send>,
}
#[wasm_bindgen]
impl WorkerPool {
    /// Creates a pool and eagerly spawns `initial` workers into it.
    ///
    /// # Errors
    ///
    /// Propagates any JS exception raised while creating a worker or posting
    /// its start-up message.
    #[wasm_bindgen(constructor)]
    pub fn new(initial: usize) -> Result<WorkerPool, JsValue> {
        let pool = WorkerPool {
            state: Rc::new(PoolState {
                workers: RefCell::new(Vec::with_capacity(initial)),
                // Idle workers are not expected to send anything, so the
                // fallback handler just logs whatever arrives.
                callback: Closure::wrap(Box::new(|event: Event| {
                    console_log!("unhandled event: {}", event.type_());
                    logv(&event);
                }) as Box<dyn FnMut(Event)>),
            }),
        };
        for _ in 0..initial {
            let worker = pool.spawn()?;
            pool.state.push(worker);
        }
        Ok(pool)
    }

    /// Starts a brand-new worker and sends it the wasm module and memory.
    ///
    /// The worker is returned to the caller; it is not added to the idle list.
    fn spawn(&self) -> Result<Worker, JsValue> {
        console_log!("spawning new worker");
        // TODO: what to do about `./worker.js`:
        //
        // * the path is only known by the bundler. How can we, as a
        //   library, know what's going on?
        // * How do we not fetch a script N times? It internally then
        //   causes another script to get fetched N times...
        let worker = Worker::new("./worker.js")?;

        // With a worker spun up send it the module/memory so it can start
        // instantiating the wasm module. Later it might receive further
        // messages about code to run on the wasm module.
        let array = js_sys::Array::new();
        array.push(&wasm_bindgen::module());
        array.push(&wasm_bindgen::memory());
        worker.post_message(&array)?;

        Ok(worker)
    }

    /// Takes an idle worker from the pool, spawning a fresh one if none is
    /// available.
    fn worker(&self) -> Result<Worker, JsValue> {
        match self.state.workers.borrow_mut().pop() {
            Some(worker) => Ok(worker),
            None => self.spawn(),
        }
    }

    /// Sends `f` to a worker for execution and returns that worker.
    ///
    /// The closure is boxed into a `Work` whose raw address is posted to the
    /// worker as a number; the worker side is expected to pass it to
    /// `child_entry_point`, which reclaims and runs it.
    fn execute(&self, f: impl FnOnce() + Send + 'static) -> Result<Worker, JsValue> {
        let worker = self.worker()?;
        let work = Box::new(Work { func: Box::new(f) });
        let ptr = Box::into_raw(work);
        // NOTE(review): `ptr as u32` assumes 32-bit pointers (wasm32).
        match worker.post_message(&JsValue::from(ptr as u32)) {
            Ok(()) => Ok(worker),
            Err(e) => {
                // SAFETY: `ptr` came from `Box::into_raw` just above and the
                // post failed, so nothing else has taken ownership of it.
                unsafe {
                    drop(Box::from_raw(ptr));
                }
                Err(e)
            }
        }
    }

    /// Installs a message handler on `worker` that, on the next message, runs
    /// `on_finish` and then hands the worker back to the pool.
    ///
    /// The handler keeps itself alive through `reclaim_slot` and clears that
    /// slot once it has fired. It holds only a weak reference to the pool
    /// state, so the worker is simply not returned if the pool is gone.
    fn reclaim_on_message(&self, worker: Worker, on_finish: impl FnOnce() + 'static) {
        let state = Rc::downgrade(&self.state);
        let worker2 = worker.clone();
        let reclaim_slot = Rc::new(RefCell::new(None));
        let slot2 = reclaim_slot.clone();
        // `FnMut` closures cannot consume captures, so wrap the `FnOnce` in an
        // `Option` and take it out on first use.
        let mut on_finish = Some(on_finish);

        let reclaim = Closure::wrap(Box::new(move |event: Event| {
            if let Some(error) = event.dyn_ref::<ErrorEvent>() {
                console_log!("error in worker: {}", error.message());
                return;
            }

            if let Some(_msg) = event.dyn_ref::<MessageEvent>() {
                on_finish.take().unwrap()();
                if let Some(state) = state.upgrade() {
                    state.push(worker2.clone());
                }
                // Release this closure now that it has done its job.
                *slot2.borrow_mut() = None;
                return;
            }

            console_log!("unhandled event: {}", event.type_());
            logv(&event);
        }) as Box<dyn FnMut(Event)>);
        worker.set_onmessage(Some(reclaim.as_ref().unchecked_ref()));
        *reclaim_slot.borrow_mut() = Some(reclaim);
    }

    /// Runs `f` on a worker, returning the worker to the pool when it reports
    /// completion. No result is communicated back.
    fn run(&self, f: impl FnOnce() + Send + 'static) -> Result<(), JsValue> {
        let worker = self.execute(f)?;
        self.reclaim_on_message(worker, || {});
        Ok(())
    }

    /// Runs `f` on a worker and returns a future resolving to its result.
    ///
    /// The worker stores the result in a shared `AtomicValue`; when the
    /// worker's completion message arrives, the value is taken out and sent
    /// through a oneshot channel. The future fails with `undefined` if the
    /// sending half is dropped without a value.
    fn run_notify<T>(
        &self,
        f: impl FnOnce() -> T + Send + 'static,
    ) -> Result<impl Future<Item = T, Error = JsValue> + 'static, JsValue>
    where
        T: Send + 'static,
    {
        let (tx, rx) = oneshot::channel();
        let storage = Arc::new(AtomicValue::new(None));
        let storage2 = storage.clone();
        let worker = self.execute(move || {
            assert!(storage2.replace(Some(f())).is_ok());
        })?;
        self.reclaim_on_message(worker, move || match storage.replace(None) {
            Ok(Some(val)) => drop(tx.send(val)),
            _ => unreachable!(),
        });

        Ok(rx.map_err(|_| JsValue::undefined()))
    }
}
/// A single value slot that can be swapped through a shared reference.
///
/// `modifying` acts as a try-lock around `slot`: a swap that finds the flag
/// already set gives up instead of waiting.
struct AtomicValue<T> {
    modifying: AtomicBool,
    slot: UnsafeCell<T>,
}

// SAFETY: `slot` is only touched inside `replace`, and only by the caller that
// flipped `modifying` from `false` to `true`, so accesses never overlap.
unsafe impl<T: Send> Send for AtomicValue<T> {}
unsafe impl<T: Send> Sync for AtomicValue<T> {}

impl<T> AtomicValue<T> {
    /// Wraps `val` in an unlocked slot.
    fn new(val: T) -> AtomicValue<T> {
        let modifying = AtomicBool::new(false);
        let slot = UnsafeCell::new(val);
        AtomicValue { modifying, slot }
    }

    /// Stores `val` and returns the previous contents as `Ok`.
    ///
    /// If another `replace` is in progress, nothing is stored and `val` is
    /// handed back as `Err`.
    fn replace(&self, val: T) -> Result<T, T> {
        let already_busy = self.modifying.swap(true, SeqCst);
        if already_busy {
            Err(val)
        } else {
            let cell: *mut T = self.slot.get();
            // SAFETY: the flag was clear and is now set by us, so no other
            // caller is inside this branch until we clear it again below.
            let previous = unsafe { mem::replace(&mut *cell, val) };
            self.modifying.store(false, SeqCst);
            Ok(previous)
        }
    }
}
impl PoolState {
fn push(&self, worker: Worker) {
worker.set_onmessage(Some(self.callback.as_ref().unchecked_ref()));
worker.set_onerror(Some(self.callback.as_ref().unchecked_ref()));
let mut workers = self.workers.borrow_mut();
for prev in workers.iter() {
let prev: &JsValue = prev;
let worker: &JsValue = &worker;
assert!(prev != worker);
}
workers.push(worker);
}
}
/// Object-safe way to invoke a boxed `FnOnce()`: consumes the box and calls
/// the closure inside it.
trait FnBox {
    fn call_box(self: Box<Self>);
}

impl<F> FnBox for F
where
    F: FnOnce(),
{
    fn call_box(self: Box<Self>) {
        // Move the closure out of its box, then run it.
        let callback = *self;
        callback()
    }
}
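// NOTE: disabled teardown sketch, kept for reference. It does not match the
// current types (`WorkerPool` has no `workers` field; idle workers live in
// `self.state.workers` and are plain `Worker` handles), so it needs updating
// before it can be re-enabled.
// impl Drop for WorkerPool {
// fn drop(&mut self) {
// for worker in self.workers.iter() {
// worker.worker.terminate();
// }
// }
// }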
/// Handle to a render in progress, as returned by `Scene::render`.
#[wasm_bindgen]
pub struct RenderingScene {
// Address of the RGBA pixel buffer, taken from the buffer's raw pointer.
base: usize,
// Length of the pixel buffer in bytes.
len: usize,
// Resolves to the final `ImageData` once rendering has finished.
promise: Promise,
// Image dimensions, copied from the scene.
width: u32,
height: u32,
}
// Hand-written binding for the JavaScript `ImageData` type.
#[wasm_bindgen]
extern "C" {
pub type ImageData;
// Binds `new ImageData(data, width, height)`; because of `catch`, a thrown
// JS exception is returned as `Err` instead of aborting.
#[wasm_bindgen(constructor, catch)]
fn new(data: &Uint8ClampedArray, width: f64, height: f64) -> Result<ImageData, JsValue>;
}
#[wasm_bindgen]
impl RenderingScene {
pub fn promise(&self) -> Promise {
self.promise.clone()
}
/// Return a progressive rendering of the image so far
#[wasm_bindgen(js_name = imageSoFar)]
pub fn image_so_far(&self) -> ImageData {
image_data(self.base, self.len, self.width, self.height)
}
}
/// Copies `len` bytes starting at address `base` out of the wasm memory and
/// wraps them in an `ImageData` of the given dimensions.
///
/// Panics if the `ImageData` constructor throws.
fn image_data(base: usize, len: usize, width: u32, height: u32) -> ImageData {
    // Go through `memory.buffer` to reach the raw bytes, and take the range
    // with `slice` rather than `subarray`: `slice` makes a copy that is not
    // backed by a `SharedArrayBuffer`, and `ImageData` currently rejects a
    // `Uint8ClampedArray` view over a shared buffer.
    //
    // Whether reading the buffer this way is UB under Rust's rules is an open
    // question. It works in practice and gives a nice progressive rendering.
    let memory: WebAssembly::Memory = wasm_bindgen::memory().unchecked_into();
    let whole = Uint8ClampedArray::new(&memory.buffer());
    let (start, end) = (base as u32, (base + len) as u32);
    let pixels = whole.slice(start, end);
    ImageData::new(&pixels, width as f64, height as f64).unwrap()
}
// NOTE(review): `Render` is not referenced anywhere in the visible code; it
// looks like a leftover from another rendering approach — confirm before
// relying on it.
struct Render {
// Optional JS event callback that may fail with a JS error.
callback: Option<Closure<dyn FnMut(Event) -> Result<(), JsValue>>>,
// Optional oneshot sender carrying a unit value; presumably a completion signal.
tx: Option<oneshot::Sender<()>>,
// State shared across threads via `Arc`.
shared: Arc<Shared>,
// 2D canvas context; presumably the drawing target.
ctx: CanvasRenderingContext2d,
}
// NOTE(review): `Shared` is only referenced by `Render`, which is itself
// unused in the visible code. The field notes below are inferred from names
// and types alone — confirm against any code that uses them.
struct Shared {
// Presumably a unique identifier for one render.
id: usize,
// Atomic flag; presumably signals that new pixels are ready to be displayed.
need_update: AtomicBool,
// The scene being rendered.
scene: raytracer::scene::Scene,
// Atomic counters; presumably the next pixel to hand out and how many are left.
next_pixel: AtomicUsize,
remaining: AtomicUsize,
// Mutex-protected byte buffer; presumably the output pixels.
rgb_data: Mutex<Vec<u8>>,
}
#[wasm_bindgen]
pub fn child_entry_point(ptr: u32) -> Result<(), JsValue> {
let ptr = unsafe { Box::from_raw(ptr as *mut Work) };
let global = js_sys::global().unchecked_into::<DedicatedWorkerGlobalScope>();
ptr.func.call_box();
global.post_message(&JsValue::undefined())?;
Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment