Last active
February 20, 2020 21:00
-
-
Save ChunMinChang/8d13946ebc6c95b2622466c89a0c9bcc to your computer and use it in GitHub Desktop.
Rust wrappers for OSX dispatch_async
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
mod sys; | |
use std::ffi::CString; | |
use std::os::raw::c_void; | |
use std::{thread, time::Duration}; | |
// A macro to print the function name | |
macro_rules! function { | |
() => {{ | |
fn f() {} | |
fn type_name_of<T>(_: T) -> &'static str { | |
std::any::type_name::<T>() | |
} | |
let name = type_name_of(f); | |
&name[..name.len() - 3] | |
}}; | |
} | |
fn run_tasks_in_order_sync() { | |
println!("{}", function!()); | |
let label = "Run tasks in order"; | |
let queue = create_dispatch_queue(label, sys::DISPATCH_QUEUE_SERIAL); | |
#[derive(Debug)] | |
struct Resource { | |
last_touched: Option<u32>, | |
touched_count: u32, | |
} | |
impl Resource { | |
fn new() -> Self { | |
Resource { | |
last_touched: None, | |
touched_count: 0, | |
} | |
} | |
} | |
let mut resource = Resource::new(); | |
println!("resource @ {:p}", &resource); | |
// Rust compilter doesn't allow a pointer to be passed across threads. | |
// A hacky way to do that is to cast the pointer into a value, then | |
// the value, which is actually an address, can be copied into threads. | |
let resource_ptr = &mut resource as *mut Resource as usize; | |
// The program will wait here until finishing running the closure below. | |
dispatch_sync(queue, move || { | |
let res: &mut Resource = unsafe { | |
let ptr = resource_ptr as *mut Resource; | |
&mut (*ptr) | |
}; | |
assert_eq!(res as *mut Resource as usize, resource_ptr); | |
assert_eq!(res.last_touched, None); | |
assert_eq!(res.touched_count, 0); | |
res.last_touched = Some(1); | |
println!( | |
"res @ 0x{:p} was touched by {}", | |
res, | |
res.last_touched.unwrap() | |
); | |
res.touched_count += 1; | |
}); | |
// The program will wait here until finishing running the closure below. | |
dispatch_sync(queue, move || { | |
let res: &mut Resource = unsafe { | |
let ptr = resource_ptr as *mut Resource; | |
&mut (*ptr) | |
}; | |
assert_eq!(res as *mut Resource as usize, resource_ptr); | |
assert!(res.last_touched.is_some()); | |
assert_eq!(res.last_touched.unwrap(), 1); | |
assert_eq!(res.touched_count, 1); | |
res.last_touched = Some(2); | |
println!( | |
"res @ 0x{:p} was touched by {}", | |
res, | |
res.last_touched.unwrap() | |
); | |
res.touched_count += 1; | |
}); | |
println!("{:?} @ {:p}", resource, &resource); | |
assert!(resource.last_touched.is_some()); | |
assert_eq!(resource.last_touched.unwrap(), 2); | |
// This will free the `queue` asynchronously. | |
release_dispatch_queue(queue); | |
} | |
fn run_tasks_in_order_async() { | |
println!("{}", function!()); | |
let label = "Run tasks in order"; | |
let queue = create_dispatch_queue(label, sys::DISPATCH_QUEUE_SERIAL); | |
#[derive(Debug)] | |
struct Resource { | |
last_touched: Option<u32>, | |
touched_count: u32, | |
} | |
impl Resource { | |
fn new() -> Self { | |
Resource { | |
last_touched: None, | |
touched_count: 0, | |
} | |
} | |
} | |
let mut resource = Resource::new(); | |
println!("resource @ {:p}", &resource); | |
// Rust compilter doesn't allow a pointer to be passed across threads. | |
// A hacky way to do that is to cast the pointer into a value, then | |
// the value, which is actually an address, can be copied into threads. | |
let resource_ptr = &mut resource as *mut Resource as usize; | |
dispatch_async(queue, move || { | |
let res: &mut Resource = unsafe { | |
let ptr = resource_ptr as *mut Resource; | |
&mut (*ptr) | |
}; | |
assert_eq!(res as *mut Resource as usize, resource_ptr); | |
assert_eq!(res.last_touched, None); | |
assert_eq!(res.touched_count, 0); | |
res.last_touched = Some(1); | |
println!( | |
"res @ 0x{:p} was touched by {}", | |
res, | |
res.last_touched.unwrap() | |
); | |
// Make sure the `res.touched_count += 1` is the last instruction of | |
// the task since we use `res.touched_count` to check if whether | |
// we should release the `resource` and should finish the | |
// `run_tasks_in_order`. Any instructions after | |
// `res.touched_count += 1` may be executed after `run_tasks_in_order`. | |
res.touched_count += 1; | |
}); | |
dispatch_async(queue, move || { | |
let res: &mut Resource = unsafe { | |
let ptr = resource_ptr as *mut Resource; | |
&mut (*ptr) | |
}; | |
assert_eq!(res as *mut Resource as usize, resource_ptr); | |
assert!(res.last_touched.is_some()); | |
assert_eq!(res.last_touched.unwrap(), 1); | |
assert_eq!(res.touched_count, 1); | |
res.last_touched = Some(2); | |
println!( | |
"res @ 0x{:p} was touched by {}", | |
res, | |
res.last_touched.unwrap() | |
); | |
// Make sure the `res.touched_count += 1` is the last instruction of | |
// the task since we use `res.touched_count` to check if whether | |
// we should release the `resource` and should finish the | |
// `run_tasks_in_order`. Any instructions after | |
// `res.touched_count += 1` may be executed after `run_tasks_in_order`. | |
res.touched_count += 1; | |
// The following logs may cause error like illegal instruction or crash | |
// since they may be executed after `resource` is freed! | |
// println!("crash > {:?} @ {:p}", res, res); | |
// println!("crash > res @ 0x{:p} was touched {} times, last touch is {}", | |
// res, res.touched_count, res.last_touched.unwrap()); | |
}); | |
// Make sure the resource won't be freed before the tasks are finished. | |
while resource.touched_count < 2 {} | |
println!("{:?} @ {:p}", resource, &resource); | |
assert!(resource.last_touched.is_some()); | |
assert_eq!(resource.last_touched.unwrap(), 2); | |
// This will free the `queue` asynchronously. | |
release_dispatch_queue(queue); | |
} | |
fn create_dispatch_queue( | |
label: &'static str, | |
queue_attr: sys::dispatch_queue_attr_t, | |
) -> sys::dispatch_queue_t { | |
let label = CString::new(label).unwrap(); | |
let c_string = label.as_ptr(); | |
unsafe { sys::dispatch_queue_create(c_string, queue_attr) } | |
} | |
fn release_dispatch_queue(queue: sys::dispatch_queue_t) { | |
unsafe { | |
sys::dispatch_release(queue.into()); | |
} | |
} | |
// Send: Types that can be transferred across thread boundaries. | |
// FnOnce: One-time function. | |
fn dispatch_async<F>(queue: sys::dispatch_queue_t, work: F) | |
where | |
F: 'static + Send + FnOnce(), | |
{ | |
let (closure, executor) = create_closure_and_executor(work); | |
unsafe { | |
sys::dispatch_async_f(queue, closure, executor); | |
} | |
} | |
// Send: Types that can be transferred across thread boundaries. | |
// FnOnce: One-time function. | |
fn dispatch_sync<F>(queue: sys::dispatch_queue_t, work: F) | |
where | |
F: 'static + Send + FnOnce(), | |
{ | |
let (closure, executor) = create_closure_and_executor(work); | |
unsafe { | |
sys::dispatch_sync_f(queue, closure, executor); | |
} | |
} | |
// TODO: Find a way to optimize it! | |
// Return a raw pointer to a (unboxed) closure and an executor that | |
// will run the closure (after re-boxing the closure) when it's called. | |
fn create_closure_and_executor<F>(closure: F) -> (*mut c_void, sys::dispatch_function_t) | |
where | |
F: FnOnce(), | |
{ | |
// In machine code level, the function defined within function is same | |
// as the function defined outside. They're just normal functions that | |
// are not accessible from outside the function. | |
// References: | |
// https://users.rust-lang.org/t/inner-functions-not-closed-over-their-environment/7696 | |
// https://stackoverflow.com/questions/26685666/a-local-function-in-rust | |
extern "C" fn closure_executer<F>(unboxed_closure: *mut c_void) | |
where | |
F: FnOnce(), | |
{ | |
// Retake the leaked closure. | |
let closure: Box<F> = unsafe { Box::from_raw(unboxed_closure as *mut F) }; | |
// Execute the closure. | |
(*closure)(); | |
// closure is released after finishiing this function call. | |
} | |
// The rust closure is a struct. We need to convert it into a function | |
// pointer so it can be invoked on another thread. | |
let closure: Box<F> = Box::new(closure); // Allocate closure on heap. | |
// `closure_executer::<F>` is a function pointer pointing to the address of | |
// `closure_executer<F>` defined above. | |
let executor: sys::dispatch_function_t = Some(closure_executer::<F>); | |
( | |
Box::into_raw(closure) as *mut c_void, // Leak the closure. | |
executor, | |
) | |
} | |
fn sleep(millis: u64) { | |
let duration = Duration::from_millis(millis); | |
thread::sleep(duration); | |
} | |
fn main() { | |
run_tasks_in_order_sync(); | |
run_tasks_in_order_async(); | |
// Wait 10 milliseconds to show the results. | |
sleep(10); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
mod sys; | |
use std::ffi::CString; | |
use std::os::raw::c_void; | |
use std::ptr; | |
use std::{thread, time}; | |
fn run_with_native_async_apis() { | |
let label = CString::new("Run with native async dispatch apis"); | |
let c_string = if label.is_ok() { | |
label.unwrap().as_ptr() | |
} else { | |
ptr::null() | |
}; | |
let queue = unsafe { sys::dispatch_queue_create(c_string, sys::DISPATCH_QUEUE_SERIAL) }; | |
let mut context: i32 = 10; | |
println!("[async] context: {} @ {:p}", context, &context); | |
// In machine code level, the function defined within function is same | |
// as the function defined outside. They're just normal functions that | |
// are not accessible from outside the function. | |
// References: | |
// https://users.rust-lang.org/t/inner-functions-not-closed-over-their-environment/7696 | |
// https://stackoverflow.com/questions/26685666/a-local-function-in-rust | |
extern "C" fn work(context_ptr: *mut c_void) { | |
let ctx_mut_ref = unsafe { &mut (*(context_ptr as *mut i32)) }; | |
println!( | |
"[async] (work) context: {} @ {:p}", | |
*ctx_mut_ref, ctx_mut_ref | |
); | |
assert_eq!(ctx_mut_ref, &10); | |
*ctx_mut_ref += 1; | |
} | |
unsafe { | |
sys::dispatch_async_f( | |
queue, | |
&mut context as *mut i32 as *mut c_void, | |
// `work` is a function pointer pointing to the address of | |
// `work` defined above. | |
Some(work), | |
); | |
} | |
while context == 10 { | |
println!("[async] wait for async task..."); | |
// Wait 10 milliseconds to show the results for async APIs. | |
sleep(10); | |
} | |
println!("[async] context: {} @ {:p}", context, &context); | |
assert_eq!(context, 11); | |
unsafe { | |
sys::dispatch_release(queue.into()); | |
} | |
} | |
fn run_with_native_sync_apis() { | |
let label = CString::new("Run with native sync dispatch apis"); | |
let c_string = if label.is_ok() { | |
label.unwrap().as_ptr() | |
} else { | |
ptr::null() | |
}; | |
let queue = unsafe { sys::dispatch_queue_create(c_string, sys::DISPATCH_QUEUE_SERIAL) }; | |
let mut context: i32 = 20; | |
println!("[sync] context: {} @ {:p}", context, &context); | |
// In machine code level, the function defined within function is same | |
// as the function defined outside. They're just normal functions that | |
// are not accessible from outside the function. | |
// References: | |
// https://users.rust-lang.org/t/inner-functions-not-closed-over-their-environment/7696 | |
// https://stackoverflow.com/questions/26685666/a-local-function-in-rust | |
extern "C" fn work(context_ptr: *mut c_void) { | |
let ctx_mut_ref = unsafe { &mut (*(context_ptr as *mut i32)) }; | |
println!( | |
"[sync] (work) context: {} @ {:p}", | |
*ctx_mut_ref, ctx_mut_ref | |
); | |
assert_eq!(ctx_mut_ref, &20); | |
*ctx_mut_ref = 30; | |
} | |
// It's ok to pass the pointer of `context`(which is on stack) here. | |
// Since the calling is sync, so the the program will wait here until | |
// the work is completed. That is, the `context` is still on the stack | |
// during work is running. | |
unsafe { | |
sys::dispatch_sync_f( | |
queue, | |
&mut context as *mut i32 as *mut c_void, | |
// `work` is a function pointer pointing to the address of | |
// `work` defined above. | |
Some(work), | |
); | |
} | |
println!("[sync] context: {} @ {:p}", context, &context); | |
assert_eq!(context, 30); | |
unsafe { | |
sys::dispatch_release(queue.into()); | |
} | |
} | |
// TODO: It's better to create a queue object so we can make sure the task | |
// is dispatched to the queue we want. | |
// mod queue { | |
// struct Queue { | |
// inner: dispatch_queue_t | |
// } | |
// impl Queue { | |
// } | |
// } | |
fn sleep(millis: u64) { | |
let duration = time::Duration::from_millis(millis); | |
thread::sleep(duration); | |
} | |
fn main() { | |
run_with_native_sync_apis(); | |
run_with_native_async_apis(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
mod sys; | |
use std::ffi::CString; | |
use std::os::raw::c_void; | |
use std::{thread, time}; | |
struct TaskContext {} | |
impl TaskContext { | |
fn new() -> Self { | |
TaskContext {} | |
} | |
fn run(&self) { | |
println!("run some task!"); | |
} | |
} | |
fn run_task_by_rendering_sync_callback() { | |
let label = "Run tasks by rendering callback"; | |
let queue = create_dispatch_queue(label, sys::DISPATCH_QUEUE_SERIAL); | |
dispatch_task_sync(queue, TaskContext::new()); | |
release_dispatch_queue(queue); | |
} | |
fn run_task_by_rendering_async_callback() { | |
let label = "Run tasks by rendering callback"; | |
let queue = create_dispatch_queue(label, sys::DISPATCH_QUEUE_SERIAL); | |
dispatch_task_async(queue, TaskContext::new()); | |
// TODO: the queue is leaked here! | |
} | |
fn create_dispatch_queue( | |
label: &'static str, | |
queue_attr: sys::dispatch_queue_attr_t, | |
) -> sys::dispatch_queue_t { | |
let label = CString::new(label).unwrap(); | |
let c_string = label.as_ptr(); | |
unsafe { sys::dispatch_queue_create(c_string, queue_attr) } | |
} | |
fn release_dispatch_queue(queue: sys::dispatch_queue_t) { | |
unsafe { | |
sys::dispatch_release(queue.into()); | |
} | |
} | |
fn dispatch_task_async(queue: sys::dispatch_queue_t, context: TaskContext) { | |
// Allocate `task_context` on heap. | |
let task_context = Box::new(context); | |
unsafe { | |
sys::dispatch_async_f( | |
queue, | |
Box::into_raw(task_context) as *mut c_void, // Leak the `task_context`. | |
Some(dispatch_callback), | |
); | |
} | |
} | |
fn dispatch_task_sync(queue: sys::dispatch_queue_t, context: TaskContext) { | |
// Allocate `task_context` on heap. | |
let task_context = Box::new(context); | |
unsafe { | |
sys::dispatch_sync_f( | |
queue, | |
Box::into_raw(task_context) as *mut c_void, // Leak the `task_context`. | |
Some(dispatch_callback), | |
); | |
} | |
} | |
extern "C" fn dispatch_callback(context: *mut c_void) { | |
// Retake the leaked `task_context`. | |
let task_context = unsafe { Box::from_raw(context as *mut TaskContext) }; | |
task_context.run(); | |
// `task_context` is released after finishing `dispatch_callback`. | |
} | |
// TODO: It's better to create a queue object so we can make sure the task | |
// is dispatched to the queue we want. | |
// mod queue { | |
// struct Queue { | |
// inner: dispatch_queue_t | |
// } | |
// impl Queue { | |
// } | |
// } | |
fn sleep(millis: u64) { | |
let duration = time::Duration::from_millis(millis); | |
thread::sleep(duration); | |
} | |
fn main() { | |
run_task_by_rendering_sync_callback(); | |
run_task_by_rendering_async_callback(); | |
// Wait 10 milliseconds to show the results for async APIs. | |
sleep(10); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
mod sys; | |
use std::ffi::CString; | |
use std::os::raw::c_void; | |
use std::{thread, time::Duration}; | |
fn run_tasks_in_order() { | |
#[derive(Debug)] | |
struct Resource { | |
last_touched: Option<u32>, | |
touched_count: u32, | |
} | |
impl Resource { | |
fn new() -> Self { | |
Resource { | |
last_touched: None, | |
touched_count: 0, | |
} | |
} | |
} | |
let mut resource = Resource::new(); | |
println!("resource @ {:p}", &resource); | |
// Rust compilter doesn't allow a pointer to be passed across threads. | |
// A hacky way to do that is to cast the pointer into a value, then | |
// the value, which is actually an address, can be copied into threads. | |
let resource_ptr = &mut resource as *mut Resource as usize; | |
let queue = Queue::new("Test"); | |
queue.run_async(move || { | |
let res: &mut Resource = unsafe { | |
let ptr = resource_ptr as *mut Resource; | |
&mut (*ptr) | |
}; | |
assert_eq!(res as *mut Resource as usize, resource_ptr); | |
assert_eq!(res.last_touched, None); | |
assert_eq!(res.touched_count, 0); | |
res.last_touched = Some(1); | |
println!( | |
"res @ 0x{:p} was touched by {}", | |
res, | |
res.last_touched.unwrap() | |
); | |
res.touched_count += 1 | |
}); | |
queue.run_sync(move || { | |
let res: &mut Resource = unsafe { | |
let ptr = resource_ptr as *mut Resource; | |
&mut (*ptr) | |
}; | |
assert_eq!(res as *mut Resource as usize, resource_ptr); | |
assert!(res.last_touched.is_some()); | |
assert_eq!(res.last_touched.unwrap(), 1); | |
assert_eq!(res.touched_count, 1); | |
res.last_touched = Some(2); | |
println!( | |
"res @ 0x{:p} was touched by {}", | |
res, | |
res.last_touched.unwrap() | |
); | |
res.touched_count += 1; | |
}); | |
queue.run_async(|| println!("Run after queue is drop?")); | |
queue.run_async(|| println!("Run after queue is drop?")); | |
queue.run_async(|| println!("Run after queue is drop?")); | |
} | |
// Queue: A wrapper around `dispatch_queue_t`. | |
// ------------------------------------------------------------------------------------------------ | |
struct Queue(sys::dispatch_queue_t); | |
impl Queue { | |
fn new(label: &'static str) -> Self { | |
Self(create_dispatch_queue(label, sys::DISPATCH_QUEUE_SERIAL)) | |
} | |
fn run_async<F>(&self, work: F) | |
where | |
F: 'static + Send + FnOnce(), | |
{ | |
dispatch_async(self.0, work); | |
} | |
fn run_sync<F>(&self, work: F) | |
where | |
F: 'static + Send + FnOnce(), | |
{ | |
dispatch_sync(self.0, work); | |
} | |
// This will release the inner `dispatch_queue_t` asynchronously. | |
fn release(&self) { | |
release_dispatch_queue(self.0); | |
} | |
} | |
impl Drop for Queue { | |
fn drop(&mut self) { | |
self.release(); | |
println!("Queue is drop!"); | |
} | |
} | |
// Low-level Grand Central Dispatch (GCD) APIs | |
// https://developer.apple.com/library/archive/documentation/General/Conceptual/ConcurrencyProgrammingGuide/OperationQueues/OperationQueues.html | |
// ------------------------------------------------------------------------------------------------ | |
fn create_dispatch_queue( | |
label: &'static str, | |
queue_attr: sys::dispatch_queue_attr_t, | |
) -> sys::dispatch_queue_t { | |
let label = CString::new(label).unwrap(); | |
let c_string = label.as_ptr(); | |
unsafe { sys::dispatch_queue_create(c_string, queue_attr) } | |
} | |
// This will release the `queue` asynchronously. | |
fn release_dispatch_queue(queue: sys::dispatch_queue_t) { | |
unsafe { | |
sys::dispatch_release(queue.into()); | |
} | |
} | |
// Send: Types that can be transferred across thread boundaries. | |
// FnOnce: One-time function. | |
fn dispatch_async<F>(queue: sys::dispatch_queue_t, work: F) | |
where | |
F: 'static + Send + FnOnce(), | |
{ | |
let (closure, executor) = create_closure_and_executor(work); | |
unsafe { | |
sys::dispatch_async_f(queue, closure, executor); | |
} | |
} | |
// Send: Types that can be transferred across thread boundaries. | |
// FnOnce: One-time function. | |
fn dispatch_sync<F>(queue: sys::dispatch_queue_t, work: F) | |
where | |
F: 'static + Send + FnOnce(), | |
{ | |
let (closure, executor) = create_closure_and_executor(work); | |
unsafe { | |
sys::dispatch_sync_f(queue, closure, executor); | |
} | |
} | |
// TODO: Find a way to optimize it! | |
// Return a raw pointer to a (unboxed) closure and an executor that | |
// will run the closure (after re-boxing the closure) when it's called. | |
fn create_closure_and_executor<F>(closure: F) -> (*mut c_void, sys::dispatch_function_t) | |
where | |
F: FnOnce(), | |
{ | |
// In machine code level, the function defined within function is same | |
// as the function defined outside. They're just normal functions that | |
// are not accessible from outside the function. | |
// References: | |
// https://users.rust-lang.org/t/inner-functions-not-closed-over-their-environment/7696 | |
// https://stackoverflow.com/questions/26685666/a-local-function-in-rust | |
extern "C" fn closure_executer<F>(unboxed_closure: *mut c_void) | |
where | |
F: FnOnce(), | |
{ | |
// Retake the leaked closure. | |
let closure: Box<F> = unsafe { Box::from_raw(unboxed_closure as *mut F) }; | |
// Execute the closure. | |
(*closure)(); | |
// closure is released after finishiing this function call. | |
} | |
// The rust closure is a struct. We need to convert it into a function | |
// pointer so it can be invoked on another thread. | |
let closure: Box<F> = Box::new(closure); // Allocate closure on heap. | |
// `closure_executer::<F>` is a function pointer pointing to the address of | |
// `closure_executer<F>` defined above. | |
let executor: sys::dispatch_function_t = Some(closure_executer::<F>); | |
( | |
Box::into_raw(closure) as *mut c_void, // Leak the closure. | |
executor, | |
) | |
} | |
fn sleep(millis: u64) { | |
let duration = Duration::from_millis(millis); | |
thread::sleep(duration); | |
} | |
fn main() { | |
run_tasks_in_order(); | |
// Wait 10 milliseconds to show the results. | |
sleep(10); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
all: | |
rustfmt *.rs | |
rustc queue.rs | |
rustc dispatcher.rs | |
rustc dispatch_native_apis.rs | |
rustc dispatch_api.rs | |
rustc dispatch_render_callback.rs | |
clean: | |
rm queue | |
rm dispatcher | |
rm dispatch_native_apis | |
rm dispatch_api | |
rm dispatch_render_callback |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
mod sys; | |
use std::ffi::CString; | |
use std::fmt::Debug; | |
use std::os::raw::c_void; | |
use std::sync::atomic::{AtomicBool, Ordering}; | |
use std::sync::Arc; | |
use std::{thread, time::Duration}; | |
fn run_tasks_in_order() { | |
#[derive(Debug)] | |
struct Resource { | |
last_touched: Option<u32>, | |
touched_count: u32, | |
} | |
impl Resource { | |
fn new() -> Self { | |
Resource { | |
last_touched: None, | |
touched_count: 0, | |
} | |
} | |
} | |
let mut resource = Resource::new(); | |
println!("resource @ {:p}", &resource); | |
// Rust compilter doesn't allow a pointer to be passed across threads. | |
// A hacky way to do that is to cast the pointer into a value, then | |
// the value, which is actually an address, can be copied into threads. | |
let resource_ptr = &mut resource as *mut Resource as usize; | |
let queue = Queue::new("Test"); | |
queue.run_async(move || { | |
let res: &mut Resource = unsafe { | |
let ptr = resource_ptr as *mut Resource; | |
&mut (*ptr) | |
}; | |
assert_eq!(res as *mut Resource as usize, resource_ptr); | |
assert_eq!(res.last_touched, None); | |
assert_eq!(res.touched_count, 0); | |
res.last_touched = Some(1); | |
println!( | |
"res @ 0x{:p} was touched by {}", | |
res, | |
res.last_touched.unwrap() | |
); | |
res.touched_count += 1 | |
}); | |
queue.run_sync(move || { | |
let res: &mut Resource = unsafe { | |
let ptr = resource_ptr as *mut Resource; | |
&mut (*ptr) | |
}; | |
assert_eq!(res as *mut Resource as usize, resource_ptr); | |
assert!(res.last_touched.is_some()); | |
assert_eq!(res.last_touched.unwrap(), 1); | |
assert_eq!(res.touched_count, 1); | |
res.last_touched = Some(2); | |
println!( | |
"res @ 0x{:p} was touched by {}", | |
res, | |
res.last_touched.unwrap() | |
); | |
res.touched_count += 1; | |
}); | |
queue.run_async(|| println!("Run after queue is drop?")); | |
queue.run_async(|| println!("Run after queue is drop?")); | |
queue.run_async(|| println!("Run after queue is drop?")); | |
} | |
// Queue: A wrapper around `dispatch_queue_t`. | |
// ------------------------------------------------------------------------------------------------ | |
struct Queue(sys::dispatch_queue_t); | |
impl Queue { | |
// Public methods | |
fn new(label: &'static str) -> Self { | |
let q = Self(create_dispatch_queue(label, sys::DISPATCH_QUEUE_SERIAL)); | |
let is_destroying = Box::new(AtomicBool::new(false)); | |
q.set_context(is_destroying); | |
q | |
} | |
fn run_async<F>(&self, work: F) | |
where | |
F: 'static + Send + FnOnce(), | |
{ | |
let is_destroying = self.get_context::<AtomicBool>(); | |
dispatch_async(self.0, move || { | |
if is_destroying.load(Ordering::SeqCst) { | |
println!("Queue is dropped. Cancel the task."); | |
return; | |
} | |
work(); | |
}); | |
} | |
fn run_sync<F>(&self, work: F) | |
where | |
F: 'static + Send + FnOnce(), | |
{ | |
let is_destroying = self.get_context::<AtomicBool>(); | |
dispatch_sync(self.0, || { | |
if is_destroying.load(Ordering::SeqCst) { | |
println!("Queue is dropped. Cancel the task."); | |
return; | |
} | |
work(); | |
}); | |
} | |
// Private methods | |
fn get_context<T>(&self) -> &mut T { | |
unsafe { &mut *(get_dispatch_context(self.0) as *mut T) } | |
} | |
fn set_context<T>(&self, context: Box<T>) { | |
set_dispatch_context(self.0, Box::into_raw(context) as *mut c_void); | |
extern "C" fn finalizer<T>(context: *mut c_void) { | |
// Retake the leaked context and drop it. | |
let _ = unsafe { Box::from_raw(context as *mut T) }; | |
} | |
set_dispatch_finalizer_f(self.0, Some(finalizer::<T>)); | |
} | |
// This will release the inner `dispatch_queue_t` asynchronously. | |
fn release(&self) { | |
self.get_context::<AtomicBool>() | |
.store(true, Ordering::SeqCst); | |
// The tasks appended after the above call will be cancelled. | |
release_dispatch_queue(self.0); | |
} | |
} | |
impl Drop for Queue { | |
fn drop(&mut self) { | |
self.release(); | |
println!("Queue is drop!"); | |
} | |
} | |
// Low-level Grand Central Dispatch (GCD) APIs | |
// https://developer.apple.com/library/archive/documentation/General/Conceptual/ConcurrencyProgrammingGuide/OperationQueues/OperationQueues.html | |
// ------------------------------------------------------------------------------------------------ | |
fn create_dispatch_queue( | |
label: &'static str, | |
queue_attr: sys::dispatch_queue_attr_t, | |
) -> sys::dispatch_queue_t { | |
let label = CString::new(label).unwrap(); | |
let c_string = label.as_ptr(); | |
unsafe { sys::dispatch_queue_create(c_string, queue_attr) } | |
} | |
// This will release the `queue` asynchronously. | |
fn release_dispatch_queue(queue: sys::dispatch_queue_t) { | |
unsafe { | |
sys::dispatch_release(queue.into()); | |
} | |
} | |
fn get_dispatch_context(queue: sys::dispatch_queue_t) -> *mut c_void { | |
unsafe { sys::dispatch_get_context(queue.into()) } | |
} | |
fn set_dispatch_context(queue: sys::dispatch_queue_t, context: *mut c_void) { | |
unsafe { | |
sys::dispatch_set_context(queue.into(), context); | |
} | |
} | |
// The `finalizer` is only run if the `context` in queue is set (by `sys::dispatch_set_context`). | |
fn set_dispatch_finalizer_f(queue: sys::dispatch_queue_t, finalizer: sys::dispatch_function_t) { | |
unsafe { | |
sys::dispatch_set_finalizer_f(queue.into(), finalizer); | |
} | |
} | |
// Send: Types that can be transferred across thread boundaries. | |
// FnOnce: One-time function. | |
fn dispatch_async<F>(queue: sys::dispatch_queue_t, work: F) | |
where | |
F: Send + FnOnce(), | |
{ | |
let (closure, executor) = create_closure_and_executor(work); | |
unsafe { | |
sys::dispatch_async_f(queue, closure, executor); | |
} | |
} | |
// Send: Types that can be transferred across thread boundaries. | |
// FnOnce: One-time function. | |
fn dispatch_sync<F>(queue: sys::dispatch_queue_t, work: F) | |
where | |
F: Send + FnOnce(), | |
{ | |
let (closure, executor) = create_closure_and_executor(work); | |
unsafe { | |
sys::dispatch_sync_f(queue, closure, executor); | |
} | |
} | |
// TODO: Find a way to optimize it! | |
// Return a raw pointer to a (unboxed) closure and an executor that | |
// will run the closure (after re-boxing the closure) when it's called. | |
fn create_closure_and_executor<F>(closure: F) -> (*mut c_void, sys::dispatch_function_t) | |
where | |
F: FnOnce(), | |
{ | |
// In machine code level, the function defined within function is same | |
// as the function defined outside. They're just normal functions that | |
// are not accessible from outside the function. | |
// References: | |
// https://users.rust-lang.org/t/inner-functions-not-closed-over-their-environment/7696 | |
// https://stackoverflow.com/questions/26685666/a-local-function-in-rust | |
extern "C" fn closure_executer<F>(unboxed_closure: *mut c_void) | |
where | |
F: FnOnce(), | |
{ | |
// Retake the leaked closure. | |
let closure: Box<F> = unsafe { Box::from_raw(unboxed_closure as *mut F) }; | |
// Execute the closure. | |
(*closure)(); | |
// closure is released after finishiing this function call. | |
} | |
// The rust closure is a struct. We need to convert it into a function | |
// pointer so it can be invoked on another thread. | |
let closure: Box<F> = Box::new(closure); // Allocate closure on heap. | |
// `closure_executer::<F>` is a function pointer pointing to the address of | |
// `closure_executer<F>` defined above. | |
let executor: sys::dispatch_function_t = Some(closure_executer::<F>); | |
( | |
Box::into_raw(closure) as *mut c_void, // Leak the closure. | |
executor, | |
) | |
} | |
fn sleep(millis: u64) { | |
let duration = Duration::from_millis(millis); | |
thread::sleep(duration); | |
} | |
fn main() { | |
run_tasks_in_order(); | |
// Wait 10 milliseconds to show the results. | |
sleep(10); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// References: | |
// https://github.com/phracker/MacOSX-SDKs/blob/9fc3ed0ad0345950ac25c28695b0427846eea966/MacOSX10.13.sdk/usr/include/dispatch/object.h#L85 | |
// https://github.com/phracker/MacOSX-SDKs/blob/9fc3ed0ad0345950ac25c28695b0427846eea966/MacOSX10.13.sdk/usr/include/dispatch/object.h#L207 | |
// https://github.com/phracker/MacOSX-SDKs/blob/9fc3ed0ad0345950ac25c28695b0427846eea966/MacOSX10.13.sdk/usr/include/dispatch/object.h#L235 | |
// https://github.com/phracker/MacOSX-SDKs/blob/9fc3ed0ad0345950ac25c28695b0427846eea966/MacOSX10.13.sdk/usr/include/dispatch/object.h#L259 | |
// https://github.com/phracker/MacOSX-SDKs/blob/9fc3ed0ad0345950ac25c28695b0427846eea966/MacOSX10.13.sdk/usr/include/dispatch/object.h#L277 | |
// https://github.com/phracker/MacOSX-SDKs/blob/9fc3ed0ad0345950ac25c28695b0427846eea966/MacOSX10.13.sdk/usr/include/dispatch/object.h#L303 | |
// https://github.com/phracker/MacOSX-SDKs/blob/9fc3ed0ad0345950ac25c28695b0427846eea966/MacOSX10.13.sdk/usr/include/dispatch/queue.h#L139 | |
// https://github.com/phracker/MacOSX-SDKs/blob/9fc3ed0ad0345950ac25c28695b0427846eea966/MacOSX10.13.sdk/usr/include/dispatch/queue.h#L205 | |
// https://github.com/phracker/MacOSX-SDKs/blob/9fc3ed0ad0345950ac25c28695b0427846eea966/MacOSX10.13.sdk/usr/include/dispatch/queue.h#L819 | |
// https://rustaudio.github.io/coreaudio-rs/coreaudio_sys/audio_unit/index.html | |
// https://rustaudio.github.io/coreaudio-rs/coreaudio_sys/audio_unit/type.dispatch_function_t.html | |
// https://rustaudio.github.io/coreaudio-rs/coreaudio_sys/audio_unit/type.dispatch_queue_attr_t.html | |
// https://rustaudio.github.io/coreaudio-rs/coreaudio_sys/audio_unit/type.dispatch_queue_t.html | |
// https://rustaudio.github.io/coreaudio-rs/coreaudio_sys/audio_unit/fn.dispatch_queue_create.html | |
// https://rustaudio.github.io/coreaudio-rs/coreaudio_sys/audio_unit/fn.dispatch_release.html | |
// https://rustaudio.github.io/coreaudio-rs/coreaudio_sys/audio_unit/fn.dispatch_retain.html | |
// https://rustaudio.github.io/coreaudio-rs/coreaudio_sys/audio_unit/fn.dispatch_async_f.html | |
// https://rustaudio.github.io/coreaudio-rs/coreaudio_sys/audio_unit/fn.dispatch_sync_f.html | |
// https://rustaudio.github.io/coreaudio-rs/coreaudio_sys/audio_unit/fn.dispatch_get_context.html | |
// https://rustaudio.github.io/coreaudio-rs/coreaudio_sys/audio_unit/fn.dispatch_set_context.html | |
// https://rustaudio.github.io/coreaudio-rs/coreaudio_sys/audio_unit/fn.dispatch_set_finalizer_f.html | |
// https://github.com/SSheldon/rust-dispatch/blob/master/src/ffi.rs | |
#![allow(non_camel_case_types)] | |
use std::os::raw::{c_char, c_void}; | |
pub enum dispatch_queue_s {} | |
pub enum dispatch_queue_attr_s {} | |
#[repr(C)] | |
pub union dispatch_object_s { | |
_dq: *mut dispatch_queue_s, | |
} | |
impl From<*mut dispatch_queue_s> for dispatch_object_s { | |
fn from(dq: *mut dispatch_queue_s) -> Self { | |
Self { _dq: dq } | |
} | |
} | |
pub type dispatch_function_t = Option<extern "C" fn(*mut c_void)>; | |
pub type dispatch_object_t = dispatch_object_s; | |
pub type dispatch_queue_attr_t = *mut dispatch_queue_attr_s; | |
pub type dispatch_queue_t = *mut dispatch_queue_s; | |
pub const DISPATCH_QUEUE_SERIAL: dispatch_queue_attr_t = 0 as dispatch_queue_attr_t; | |
#[link(name = "System", kind = "dylib")] // Link to a dynamic library in Dispatch framework. | |
extern "C" { | |
pub fn dispatch_queue_create( | |
label: *const c_char, | |
attr: dispatch_queue_attr_t, | |
) -> dispatch_queue_t; | |
pub fn dispatch_release(object: dispatch_object_t); | |
// `dispatch_block_t` is a lambda expression-like syntax to create closures invented by Apple. | |
// Not sure if Rust supports it. `dispatch_async_f` and `dispatch_sync_f` should be able to | |
// replace `dispatch_async` and `dispatch_sync`. | |
pub fn dispatch_async_f( | |
queue: dispatch_queue_t, | |
context: *mut c_void, | |
work: dispatch_function_t, | |
); | |
pub fn dispatch_sync_f( | |
queue: dispatch_queue_t, | |
context: *mut c_void, | |
work: dispatch_function_t, | |
); | |
pub fn dispatch_get_context(object: dispatch_object_t) -> *mut c_void; | |
pub fn dispatch_set_context(object: dispatch_object_t, context: *mut c_void); | |
pub fn dispatch_set_finalizer_f(object: dispatch_object_t, finalizer: dispatch_function_t); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment