Created
March 7, 2019 16:45
-
-
Save alexcrichton/2e793f8fbd99c438467828811149ce26 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use futures::sync::oneshot; | |
use futures::Future; | |
use js_sys::{Array, Error, Promise, Uint8ClampedArray, WebAssembly}; | |
use rayon::prelude::*; | |
use std::cell::{RefCell, UnsafeCell}; | |
use std::cmp; | |
use std::mem; | |
use std::rc::{Rc, Weak}; | |
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst}; | |
use std::sync::{Arc, Mutex, MutexGuard}; | |
use wasm_bindgen::prelude::*; | |
use wasm_bindgen::JsCast; | |
use web_sys::{CanvasRenderingContext2d, ErrorEvent, Event, Worker}; | |
use web_sys::{DedicatedWorkerGlobalScope, MessageEvent}; | |
macro_rules! console_log { | |
($($t:tt)*) => (log(&format_args!($($t)*).to_string())) | |
} | |
#[wasm_bindgen] | |
extern "C" { | |
#[wasm_bindgen(js_namespace = console)] | |
fn log(s: &str); | |
#[wasm_bindgen(js_namespace = console, js_name = log)] | |
fn logv(x: &JsValue); | |
} | |
#[wasm_bindgen] | |
pub struct Scene { | |
inner: raytracer::scene::Scene, | |
} | |
static NEXT_ID: AtomicUsize = AtomicUsize::new(0); | |
#[wasm_bindgen] | |
impl Scene { | |
#[wasm_bindgen(constructor)] | |
pub fn new(object: &JsValue) -> Result<Scene, JsValue> { | |
console_error_panic_hook::set_once(); | |
Ok(Scene { | |
inner: object | |
.into_serde() | |
.map_err(|e| JsValue::from(e.to_string()))?, | |
}) | |
} | |
pub fn render(self, concurrency: usize, pool: &WorkerPool) -> Result<RenderingScene, JsValue> { | |
let scene = self.inner; | |
let height = scene.height; | |
let width = scene.width; | |
let pixels = (width * height) as usize; | |
let mut rgb_data = vec![0; 4 * pixels]; | |
let base = rgb_data.as_ptr() as usize; | |
let len = rgb_data.len(); | |
let thread_pool = rayon::ThreadPoolBuilder::new() | |
.num_threads(concurrency - 1) | |
.spawn(|thread| Ok(pool.run(|| thread.run()).unwrap())) | |
.unwrap(); | |
let done = pool.run_notify(move || { | |
thread_pool.install(|| { | |
rgb_data | |
.par_chunks_mut(4) | |
.enumerate() | |
.for_each(|(i, chunk)| { | |
let i = i as u32; | |
let x = i % width; | |
let y = i / width; | |
let ray = raytracer::Ray::create_prime(x, y, &scene); | |
let result = raytracer::cast_ray(&scene, &ray, 0).to_rgba(); | |
chunk[0] = result.data[0]; | |
chunk[1] = result.data[1]; | |
chunk[2] = result.data[2]; | |
chunk[3] = result.data[3]; | |
}); | |
}); | |
rgb_data | |
})?; | |
let done = done.map(move |_data| image_data(base, len, width, height).into()); | |
Ok(RenderingScene { | |
promise: wasm_bindgen_futures::future_to_promise(done), | |
base, | |
len, | |
height, | |
width, | |
}) | |
} | |
} | |
#[wasm_bindgen] | |
pub struct WorkerPool { | |
state: Rc<PoolState>, | |
} | |
struct PoolState { | |
workers: RefCell<Vec<Worker>>, | |
callback: Closure<dyn FnMut(Event)>, | |
} | |
struct Work { | |
func: Box<FnBox + Send>, | |
} | |
#[wasm_bindgen] | |
impl WorkerPool { | |
#[wasm_bindgen(constructor)] | |
pub fn new(initial: usize) -> Result<WorkerPool, JsValue> { | |
let pool = WorkerPool { | |
state: Rc::new(PoolState { | |
workers: RefCell::new(Vec::with_capacity(initial)), | |
callback: Closure::wrap(Box::new(|event: Event| { | |
console_log!("unhandled event: {}", event.type_()); | |
logv(&event); | |
}) as Box<FnMut(Event)>), | |
}), | |
}; | |
for _ in 0..initial { | |
let worker = pool.spawn()?; | |
pool.state.push(worker); | |
} | |
Ok(pool) | |
} | |
fn spawn(&self) -> Result<Worker, JsValue> { | |
console_log!("spawning new worker"); | |
// TODO: what do do about `./worker.js`: | |
// | |
// * the path is only known by the bundler. How can we, as a | |
// library, know what's going on? | |
// * How do we not fetch a script N times? It internally then | |
// causes another script to get fetched N times... | |
let worker = Worker::new("./worker.js")?; | |
// With a worker spun up send it the module/memory so it can start | |
// instantiating the wasm module. Later it might receive further | |
// messages about code to run on the wasm module. | |
let array = js_sys::Array::new(); | |
array.push(&wasm_bindgen::module()); | |
array.push(&wasm_bindgen::memory()); | |
worker.post_message(&array)?; | |
Ok(worker) | |
} | |
fn worker(&self) -> Result<Worker, JsValue> { | |
match self.state.workers.borrow_mut().pop() { | |
Some(worker) => Ok(worker), | |
None => self.spawn(), | |
} | |
} | |
fn execute(&self, f: impl FnOnce() + Send + 'static) -> Result<Worker, JsValue> { | |
let worker = self.worker()?; | |
let work = Box::new(Work { func: Box::new(f) }); | |
let ptr = Box::into_raw(work); | |
match worker.post_message(&JsValue::from(ptr as u32)) { | |
Ok(()) => Ok(worker), | |
Err(e) => { | |
unsafe { | |
drop(Box::from_raw(ptr)); | |
} | |
Err(e) | |
} | |
} | |
} | |
fn reclaim_on_message(&self, worker: Worker, on_finish: impl FnOnce() + 'static) { | |
let state = Rc::downgrade(&self.state); | |
let worker2 = worker.clone(); | |
let reclaim_slot = Rc::new(RefCell::new(None)); | |
let slot2 = reclaim_slot.clone(); | |
let mut on_finish = Some(on_finish); | |
let reclaim = Closure::wrap(Box::new(move |event: Event| { | |
if let Some(error) = event.dyn_ref::<ErrorEvent>() { | |
console_log!("error in worker: {}", error.message()); | |
return; | |
} | |
if let Some(_msg) = event.dyn_ref::<MessageEvent>() { | |
on_finish.take().unwrap()(); | |
if let Some(state) = state.upgrade() { | |
state.push(worker2.clone()); | |
} | |
*slot2.borrow_mut() = None; | |
return; | |
} | |
console_log!("unhandled event: {}", event.type_()); | |
logv(&event); | |
}) as Box<FnMut(Event)>); | |
worker.set_onmessage(Some(reclaim.as_ref().unchecked_ref())); | |
*reclaim_slot.borrow_mut() = Some(reclaim); | |
} | |
fn run(&self, f: impl FnOnce() + Send + 'static) -> Result<(), JsValue> { | |
let worker = self.execute(f)?; | |
self.reclaim_on_message(worker, || {}); | |
Ok(()) | |
} | |
fn run_notify<T>( | |
&self, | |
f: impl FnOnce() -> T + Send + 'static, | |
) -> Result<impl Future<Item = T, Error = JsValue> + 'static, JsValue> | |
where | |
T: Send + 'static, | |
{ | |
let (tx, rx) = oneshot::channel(); | |
let storage = Arc::new(AtomicValue::new(None)); | |
let storage2 = storage.clone(); | |
let worker = self.execute(move || { | |
assert!(storage2.replace(Some(f())).is_ok()); | |
})?; | |
self.reclaim_on_message(worker, move || match storage.replace(None) { | |
Ok(Some(val)) => drop(tx.send(val)), | |
_ => unreachable!(), | |
}); | |
Ok(rx.map_err(|_| JsValue::undefined())) | |
} | |
} | |
struct AtomicValue<T> { | |
modifying: AtomicBool, | |
slot: UnsafeCell<T>, | |
} | |
unsafe impl<T: Send> Send for AtomicValue<T> {} | |
unsafe impl<T: Send> Sync for AtomicValue<T> {} | |
impl<T> AtomicValue<T> { | |
fn new(val: T) -> AtomicValue<T> { | |
AtomicValue { | |
modifying: AtomicBool::new(false), | |
slot: UnsafeCell::new(val), | |
} | |
} | |
fn replace(&self, val: T) -> Result<T, T> { | |
if self.modifying.swap(true, SeqCst) { | |
return Err(val); | |
} | |
let ret = unsafe { mem::replace(&mut *self.slot.get(), val) }; | |
self.modifying.store(false, SeqCst); | |
Ok(ret) | |
} | |
} | |
impl PoolState { | |
fn push(&self, worker: Worker) { | |
worker.set_onmessage(Some(self.callback.as_ref().unchecked_ref())); | |
worker.set_onerror(Some(self.callback.as_ref().unchecked_ref())); | |
let mut workers = self.workers.borrow_mut(); | |
for prev in workers.iter() { | |
let prev: &JsValue = prev; | |
let worker: &JsValue = &worker; | |
assert!(prev != worker); | |
} | |
workers.push(worker); | |
} | |
} | |
trait FnBox { | |
fn call_box(self: Box<Self>); | |
} | |
impl<T: FnOnce()> FnBox for T { | |
fn call_box(self: Box<Self>) { | |
(*self)(); | |
} | |
} | |
// impl Drop for WorkerPool { | |
// fn drop(&mut self) { | |
// for worker in self.workers.iter() { | |
// worker.worker.terminate(); | |
// } | |
// } | |
// } | |
#[wasm_bindgen] | |
pub struct RenderingScene { | |
base: usize, | |
len: usize, | |
promise: Promise, | |
width: u32, | |
height: u32, | |
} | |
#[wasm_bindgen] | |
extern "C" { | |
pub type ImageData; | |
#[wasm_bindgen(constructor, catch)] | |
fn new(data: &Uint8ClampedArray, width: f64, height: f64) -> Result<ImageData, JsValue>; | |
} | |
#[wasm_bindgen] | |
impl RenderingScene { | |
pub fn promise(&self) -> Promise { | |
self.promise.clone() | |
} | |
/// Return a progressive rendering of the image so far | |
#[wasm_bindgen(js_name = imageSoFar)] | |
pub fn image_so_far(&self) -> ImageData { | |
image_data(self.base, self.len, self.width, self.height) | |
} | |
} | |
fn image_data(base: usize, len: usize, width: u32, height: u32) -> ImageData { | |
// Use the raw access available through `memory.buffer`, but be sure to | |
// use `slice` instead of `subarray` to create a copy that isn't backed | |
// by `SharedArrayBuffer`. Currently `ImageData` rejects a view of | |
// `Uint8ClampedArray` that's backed by a shared buffer. | |
// | |
// Note that this may or may not be UB based on Rust's rules. For now | |
// though it seems to work and produces a nifty progressive rendering! | |
let mem = wasm_bindgen::memory().unchecked_into::<WebAssembly::Memory>(); | |
let mem = Uint8ClampedArray::new(&mem.buffer()).slice(base as u32, (base + len) as u32); | |
ImageData::new(&mem, width as f64, height as f64).unwrap() | |
} | |
struct Render { | |
callback: Option<Closure<dyn FnMut(Event) -> Result<(), JsValue>>>, | |
tx: Option<oneshot::Sender<()>>, | |
shared: Arc<Shared>, | |
ctx: CanvasRenderingContext2d, | |
} | |
struct Shared { | |
id: usize, | |
need_update: AtomicBool, | |
scene: raytracer::scene::Scene, | |
next_pixel: AtomicUsize, | |
remaining: AtomicUsize, | |
rgb_data: Mutex<Vec<u8>>, | |
} | |
#[wasm_bindgen] | |
pub fn child_entry_point(ptr: u32) -> Result<(), JsValue> { | |
let ptr = unsafe { Box::from_raw(ptr as *mut Work) }; | |
let global = js_sys::global().unchecked_into::<DedicatedWorkerGlobalScope>(); | |
ptr.func.call_box(); | |
global.post_message(&JsValue::undefined())?; | |
Ok(()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment