Skip to content

Instantly share code, notes, and snippets.

@ChunMinChang
Last active February 20, 2020 21:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ChunMinChang/8d13946ebc6c95b2622466c89a0c9bcc to your computer and use it in GitHub Desktop.
Save ChunMinChang/8d13946ebc6c95b2622466c89a0c9bcc to your computer and use it in GitHub Desktop.
Rust wrappers for OSX dispatch_async
mod sys;
use std::ffi::CString;
use std::os::raw::c_void;
use std::{thread, time::Duration};
// Expands to the fully qualified name of the function the macro is used in.
//
// A marker function `f` is declared at the expansion site; its type name is
// `<enclosing path>::f`, so dropping the trailing `::f` leaves the name of
// the enclosing function.
macro_rules! function {
    () => {{
        fn f() {}
        fn name_of<T>(_: T) -> &'static str {
            std::any::type_name::<T>()
        }
        let qualified = name_of(f);
        let end = qualified.len() - "::f".len();
        &qualified[..end]
    }};
}
/// Runs two tasks back to back on a serial queue with `dispatch_sync` and
/// checks that the second task observes the state written by the first.
///
/// Panics (via `assert!`) if the tasks do not run in submission order.
fn run_tasks_in_order_sync() {
    println!("{}", function!());

    #[derive(Debug)]
    struct Resource {
        last_touched: Option<u32>,
        touched_count: u32,
    }

    impl Resource {
        fn new() -> Self {
            Resource {
                last_touched: None,
                touched_count: 0,
            }
        }
    }

    /// Body shared by both tasks: checks the state left by the previous
    /// task, then records `toucher` as the latest visitor.
    fn touch(resource_addr: usize, toucher: u32, expected_last: Option<u32>, expected_count: u32) {
        // SAFETY: `resource_addr` is the address of the `Resource` living on
        // the caller's stack. The caller is blocked inside `dispatch_sync`
        // while this runs, so the value is alive and not accessed elsewhere.
        let res: &mut Resource = unsafe { &mut *(resource_addr as *mut Resource) };
        assert_eq!(res as *mut Resource as usize, resource_addr);
        assert_eq!(res.last_touched, expected_last);
        assert_eq!(res.touched_count, expected_count);
        res.last_touched = Some(toucher);
        // `{:p}` already prints the `0x` prefix.
        println!("res @ {:p} was touched by {}", res, toucher);
        res.touched_count += 1;
    }

    let label = "Run tasks in order";
    let queue = create_dispatch_queue(label, sys::DISPATCH_QUEUE_SERIAL);

    let mut resource = Resource::new();
    println!("resource @ {:p}", &resource);

    // Raw pointers are not `Send`, so the closures could not capture one.
    // Smuggle the address across as a plain integer instead and turn it back
    // into a pointer inside the task.
    let resource_ptr = &mut resource as *mut Resource as usize;

    // Each call blocks until its closure has finished running.
    dispatch_sync(queue, move || touch(resource_ptr, 1, None, 0));
    dispatch_sync(queue, move || touch(resource_ptr, 2, Some(1), 1));

    println!("{:?} @ {:p}", resource, &resource);
    assert_eq!(resource.last_touched, Some(2));

    // This will free the `queue` asynchronously.
    release_dispatch_queue(queue);
}
fn run_tasks_in_order_async() {
println!("{}", function!());
let label = "Run tasks in order";
let queue = create_dispatch_queue(label, sys::DISPATCH_QUEUE_SERIAL);
#[derive(Debug)]
struct Resource {
last_touched: Option<u32>,
touched_count: u32,
}
impl Resource {
fn new() -> Self {
Resource {
last_touched: None,
touched_count: 0,
}
}
}
let mut resource = Resource::new();
println!("resource @ {:p}", &resource);
// Rust compilter doesn't allow a pointer to be passed across threads.
// A hacky way to do that is to cast the pointer into a value, then
// the value, which is actually an address, can be copied into threads.
let resource_ptr = &mut resource as *mut Resource as usize;
dispatch_async(queue, move || {
let res: &mut Resource = unsafe {
let ptr = resource_ptr as *mut Resource;
&mut (*ptr)
};
assert_eq!(res as *mut Resource as usize, resource_ptr);
assert_eq!(res.last_touched, None);
assert_eq!(res.touched_count, 0);
res.last_touched = Some(1);
println!(
"res @ 0x{:p} was touched by {}",
res,
res.last_touched.unwrap()
);
// Make sure the `res.touched_count += 1` is the last instruction of
// the task since we use `res.touched_count` to check if whether
// we should release the `resource` and should finish the
// `run_tasks_in_order`. Any instructions after
// `res.touched_count += 1` may be executed after `run_tasks_in_order`.
res.touched_count += 1;
});
dispatch_async(queue, move || {
let res: &mut Resource = unsafe {
let ptr = resource_ptr as *mut Resource;
&mut (*ptr)
};
assert_eq!(res as *mut Resource as usize, resource_ptr);
assert!(res.last_touched.is_some());
assert_eq!(res.last_touched.unwrap(), 1);
assert_eq!(res.touched_count, 1);
res.last_touched = Some(2);
println!(
"res @ 0x{:p} was touched by {}",
res,
res.last_touched.unwrap()
);
// Make sure the `res.touched_count += 1` is the last instruction of
// the task since we use `res.touched_count` to check if whether
// we should release the `resource` and should finish the
// `run_tasks_in_order`. Any instructions after
// `res.touched_count += 1` may be executed after `run_tasks_in_order`.
res.touched_count += 1;
// The following logs may cause error like illegal instruction or crash
// since they may be executed after `resource` is freed!
// println!("crash > {:?} @ {:p}", res, res);
// println!("crash > res @ 0x{:p} was touched {} times, last touch is {}",
// res, res.touched_count, res.last_touched.unwrap());
});
// Make sure the resource won't be freed before the tasks are finished.
while resource.touched_count < 2 {}
println!("{:?} @ {:p}", resource, &resource);
assert!(resource.last_touched.is_some());
assert_eq!(resource.last_touched.unwrap(), 2);
// This will free the `queue` asynchronously.
release_dispatch_queue(queue);
}
/// Creates a dispatch queue named `label` with the given attribute.
///
/// Panics if `label` contains an interior NUL byte.
fn create_dispatch_queue(
    label: &'static str,
    queue_attr: sys::dispatch_queue_attr_t,
) -> sys::dispatch_queue_t {
    // `c_label` stays alive until the end of this function, so the pointer
    // handed to the C call is valid for the whole call.
    let c_label = CString::new(label).unwrap();
    unsafe { sys::dispatch_queue_create(c_label.as_ptr(), queue_attr) }
}
/// Drops one reference to `queue` by calling `dispatch_release` on it.
fn release_dispatch_queue(queue: sys::dispatch_queue_t) {
    let object = sys::dispatch_object_t::from(queue);
    unsafe { sys::dispatch_release(object) };
}
/// Submits `work` to `queue` without waiting for it to run.
///
/// `work` must be `Send` because it is handed to another thread, `'static`
/// because it may outlive the caller's stack frame, and `FnOnce` because it
/// is invoked exactly once.
fn dispatch_async<F>(queue: sys::dispatch_queue_t, work: F)
where
    F: 'static + Send + FnOnce(),
{
    let (context, trampoline) = create_closure_and_executor(work);
    unsafe { sys::dispatch_async_f(queue, context, trampoline) };
}
/// Submits `work` to `queue` and blocks until it has finished running.
///
/// `work` must be `Send` because it is handed to the queue, and `FnOnce`
/// because it is invoked exactly once.
fn dispatch_sync<F>(queue: sys::dispatch_queue_t, work: F)
where
    F: 'static + Send + FnOnce(),
{
    let (context, trampoline) = create_closure_and_executor(work);
    unsafe { sys::dispatch_sync_f(queue, context, trampoline) };
}
// TODO: Find a way to optimize it!
/// Splits a Rust closure into the `(context, function)` pair expected by the
/// `dispatch_*_f` C APIs.
///
/// The closure is moved to the heap and leaked as an untyped pointer. The
/// returned function, when called with that pointer, takes ownership back,
/// runs the closure once, and frees it. The function must therefore be
/// called exactly once with the returned pointer.
fn create_closure_and_executor<F>(closure: F) -> (*mut c_void, sys::dispatch_function_t)
where
    F: FnOnce(),
{
    // A function nested in another function is an ordinary function that is
    // merely not nameable from outside; it captures nothing from its
    // surroundings, which is what lets it be used as a C callback.
    // References:
    // https://users.rust-lang.org/t/inner-functions-not-closed-over-their-environment/7696
    // https://stackoverflow.com/questions/26685666/a-local-function-in-rust
    extern "C" fn closure_executer<F>(unboxed_closure: *mut c_void)
    where
        F: FnOnce(),
    {
        // Reclaim ownership of the leaked closure.
        let reboxed: Box<F> = unsafe { Box::from_raw(unboxed_closure as *mut F) };
        // Move the closure out of the box and run it; the heap allocation is
        // freed when this function returns.
        let task: F = *reboxed;
        task();
    }

    // Monomorphised per closure type, so the callback knows which concrete
    // type to rebuild from the untyped pointer.
    let executor: sys::dispatch_function_t = Some(closure_executer::<F>);
    // Heap-allocate the closure and leak it so it survives until the callback.
    let leaked = Box::into_raw(Box::new(closure));
    (leaked as *mut c_void, executor)
}
/// Blocks the current thread for `millis` milliseconds.
fn sleep(millis: u64) {
    thread::sleep(Duration::from_millis(millis));
}
/// Runs the synchronous example, then the asynchronous one.
fn main() {
    // Grace period (in milliseconds) so the output has time to appear
    // before the process exits.
    const SETTLE_MILLIS: u64 = 10;

    run_tasks_in_order_sync();
    run_tasks_in_order_async();
    sleep(SETTLE_MILLIS);
}
mod sys;
use std::ffi::CString;
use std::os::raw::c_void;
use std::ptr;
use std::{thread, time};
/// Demonstrates `dispatch_async_f` with a plain `extern "C"` callback that
/// increments a counter owned by this function.
///
/// The counter is an `AtomicI32`: it is written by the queue's thread while
/// this thread polls it, and a plain `i32` would make that a data race.
fn run_with_native_async_apis() {
    use std::sync::atomic::{AtomicI32, Ordering};

    // Bind the `CString` to a local so it outlives the pointer taken from
    // it. Calling `as_ptr()` on a temporary would leave `c_string` dangling
    // as soon as the statement ends.
    let label = CString::new("Run with native async dispatch apis").ok();
    let c_string = label.as_ref().map_or(ptr::null(), |l| l.as_ptr());
    let queue = unsafe { sys::dispatch_queue_create(c_string, sys::DISPATCH_QUEUE_SERIAL) };

    let context = AtomicI32::new(10);
    println!(
        "[async] context: {} @ {:p}",
        context.load(Ordering::SeqCst),
        &context
    );

    // A function nested in another function is an ordinary function that is
    // merely not nameable from outside, so it can serve as a C callback.
    // References:
    // https://users.rust-lang.org/t/inner-functions-not-closed-over-their-environment/7696
    // https://stackoverflow.com/questions/26685666/a-local-function-in-rust
    extern "C" fn work(context_ptr: *mut c_void) {
        // SAFETY: `context_ptr` is the address of the `AtomicI32` on the
        // submitter's stack; the submitter does not return until it has
        // observed the increment below, which is this function's last
        // access to the counter.
        let ctx = unsafe { &*(context_ptr as *const AtomicI32) };
        println!(
            "[async] (work) context: {} @ {:p}",
            ctx.load(Ordering::SeqCst),
            ctx
        );
        assert_eq!(ctx.load(Ordering::SeqCst), 10);
        ctx.fetch_add(1, Ordering::SeqCst);
    }

    unsafe {
        sys::dispatch_async_f(
            queue,
            &context as *const AtomicI32 as *mut c_void,
            // `work` coerces to a function pointer to the callback above.
            Some(work),
        );
    }

    while context.load(Ordering::SeqCst) == 10 {
        println!("[async] wait for async task...");
        // Wait 10 milliseconds to show the results for async APIs.
        sleep(10);
    }

    println!(
        "[async] context: {} @ {:p}",
        context.load(Ordering::SeqCst),
        &context
    );
    assert_eq!(context.load(Ordering::SeqCst), 11);

    unsafe {
        sys::dispatch_release(queue.into());
    }
}
/// Demonstrates `dispatch_sync_f` with a plain `extern "C"` callback that
/// overwrites an `i32` owned by this function.
fn run_with_native_sync_apis() {
    // Bind the `CString` to a local so it outlives the pointer taken from
    // it. Calling `as_ptr()` on a temporary would leave `c_string` dangling
    // as soon as the statement ends.
    let label = CString::new("Run with native sync dispatch apis").ok();
    let c_string = label.as_ref().map_or(ptr::null(), |l| l.as_ptr());
    let queue = unsafe { sys::dispatch_queue_create(c_string, sys::DISPATCH_QUEUE_SERIAL) };

    let mut context: i32 = 20;
    println!("[sync] context: {} @ {:p}", context, &context);

    // A function nested in another function is an ordinary function that is
    // merely not nameable from outside, so it can serve as a C callback.
    // References:
    // https://users.rust-lang.org/t/inner-functions-not-closed-over-their-environment/7696
    // https://stackoverflow.com/questions/26685666/a-local-function-in-rust
    extern "C" fn work(context_ptr: *mut c_void) {
        // SAFETY: `context_ptr` is the address of the submitter's `i32`; the
        // submitter is blocked in `dispatch_sync_f` while this runs.
        let ctx_mut_ref = unsafe { &mut (*(context_ptr as *mut i32)) };
        println!(
            "[sync] (work) context: {} @ {:p}",
            *ctx_mut_ref, ctx_mut_ref
        );
        assert_eq!(ctx_mut_ref, &20);
        *ctx_mut_ref = 30;
    }

    // Passing a pointer to the stack-allocated `context` is fine here: the
    // call is synchronous, so this function waits until `work` has finished
    // and `context` is still on the stack the whole time.
    unsafe {
        sys::dispatch_sync_f(
            queue,
            &mut context as *mut i32 as *mut c_void,
            // `work` coerces to a function pointer to the callback above.
            Some(work),
        );
    }

    println!("[sync] context: {} @ {:p}", context, &context);
    assert_eq!(context, 30);

    unsafe {
        sys::dispatch_release(queue.into());
    }
}
// TODO: It's better to create a queue object so we can make sure the task
// is dispatched to the queue we want.
// mod queue {
// struct Queue {
// inner: dispatch_queue_t
// }
// impl Queue {
// }
// }
/// Blocks the current thread for `millis` milliseconds.
fn sleep(millis: u64) {
    thread::sleep(time::Duration::from_millis(millis));
}
// Entry point: runs the synchronous example first, then the asynchronous one.
fn main() {
run_with_native_sync_apis();
run_with_native_async_apis();
}
mod sys;
use std::ffi::CString;
use std::os::raw::c_void;
use std::{thread, time};
/// Stand-in for the state a dispatched task would carry; it has no fields.
struct TaskContext {}

impl TaskContext {
    /// Creates an empty task context.
    fn new() -> Self {
        Self {}
    }

    /// Runs the task, which only prints a message.
    fn run(&self) {
        println!("run some task!");
    }
}
/// Creates a serial queue, runs one `TaskContext` on it synchronously, and
/// releases the queue.
fn run_task_by_rendering_sync_callback() {
    let queue = create_dispatch_queue(
        "Run tasks by rendering callback",
        sys::DISPATCH_QUEUE_SERIAL,
    );
    let task = TaskContext::new();
    dispatch_task_sync(queue, task);
    release_dispatch_queue(queue);
}
fn run_task_by_rendering_async_callback() {
let label = "Run tasks by rendering callback";
let queue = create_dispatch_queue(label, sys::DISPATCH_QUEUE_SERIAL);
dispatch_task_async(queue, TaskContext::new());
// TODO: the queue is leaked here!
}
/// Creates a dispatch queue named `label` with the given attribute.
///
/// Panics if `label` contains an interior NUL byte.
fn create_dispatch_queue(
    label: &'static str,
    queue_attr: sys::dispatch_queue_attr_t,
) -> sys::dispatch_queue_t {
    // `c_label` stays alive until the end of this function, so the pointer
    // handed to the C call is valid for the whole call.
    let c_label = CString::new(label).unwrap();
    unsafe { sys::dispatch_queue_create(c_label.as_ptr(), queue_attr) }
}
/// Drops one reference to `queue` by calling `dispatch_release` on it.
fn release_dispatch_queue(queue: sys::dispatch_queue_t) {
    let object = sys::dispatch_object_t::from(queue);
    unsafe { sys::dispatch_release(object) };
}
/// Submits `context` to `queue` asynchronously; `dispatch_callback` runs it.
///
/// The context is moved to the heap and leaked as a raw pointer;
/// `dispatch_callback` takes ownership back and frees it.
fn dispatch_task_async(queue: sys::dispatch_queue_t, context: TaskContext) {
    let leaked = Box::into_raw(Box::new(context)) as *mut c_void;
    unsafe {
        sys::dispatch_async_f(queue, leaked, Some(dispatch_callback));
    }
}
/// Submits `context` to `queue` synchronously; `dispatch_callback` runs it.
///
/// The context is moved to the heap and leaked as a raw pointer;
/// `dispatch_callback` takes ownership back and frees it.
fn dispatch_task_sync(queue: sys::dispatch_queue_t, context: TaskContext) {
    let leaked = Box::into_raw(Box::new(context)) as *mut c_void;
    unsafe {
        sys::dispatch_sync_f(queue, leaked, Some(dispatch_callback));
    }
}
/// C callback handed to the dispatch APIs: rebuilds the boxed `TaskContext`
/// from `context`, runs it, and frees it.
///
/// `context` must be a pointer obtained from `Box::into_raw` on a
/// `Box<TaskContext>` and must be passed to this function only once.
extern "C" fn dispatch_callback(context: *mut c_void) {
    let raw = context as *mut TaskContext;
    // Take ownership of the leaked `TaskContext` back.
    let task = unsafe { Box::from_raw(raw) };
    task.run();
    // Free the context now that the task has run.
    drop(task);
}
// TODO: It's better to create a queue object so we can make sure the task
// is dispatched to the queue we want.
// mod queue {
// struct Queue {
// inner: dispatch_queue_t
// }
// impl Queue {
// }
// }
/// Blocks the current thread for `millis` milliseconds.
fn sleep(millis: u64) {
    thread::sleep(time::Duration::from_millis(millis));
}
/// Runs the synchronous example, then the asynchronous one.
fn main() {
    // Grace period (in milliseconds) so the asynchronous task has time to
    // print before the process exits.
    const SETTLE_MILLIS: u64 = 10;

    run_task_by_rendering_sync_callback();
    run_task_by_rendering_async_callback();
    sleep(SETTLE_MILLIS);
}
mod sys;
use std::ffi::CString;
use std::os::raw::c_void;
use std::{thread, time::Duration};
/// Submits one asynchronous and one synchronous task to a `Queue` and checks
/// that they run in submission order, then queues three more asynchronous
/// tasks just before the `Queue` is dropped.
///
/// Panics (via `assert!`) if the tasks do not run in submission order.
fn run_tasks_in_order() {
    #[derive(Debug)]
    struct Resource {
        last_touched: Option<u32>,
        touched_count: u32,
    }

    impl Resource {
        fn new() -> Self {
            Resource {
                last_touched: None,
                touched_count: 0,
            }
        }
    }

    /// Body shared by both tasks: checks the state left by the previous
    /// task, then records `toucher` as the latest visitor.
    fn touch(resource_addr: usize, toucher: u32, expected_last: Option<u32>, expected_count: u32) {
        // SAFETY: `resource_addr` is the address of the `Resource` on the
        // submitting function's stack. That function stays blocked in
        // `run_sync` until both tasks have run, and the queue is created as
        // a serial queue, so this is the only live access.
        let res: &mut Resource = unsafe { &mut *(resource_addr as *mut Resource) };
        assert_eq!(res as *mut Resource as usize, resource_addr);
        assert_eq!(res.last_touched, expected_last);
        assert_eq!(res.touched_count, expected_count);
        res.last_touched = Some(toucher);
        // `{:p}` already prints the `0x` prefix.
        println!("res @ {:p} was touched by {}", res, toucher);
        res.touched_count += 1;
    }

    let mut resource = Resource::new();
    println!("resource @ {:p}", &resource);

    // Raw pointers are not `Send`, so the closures could not capture one.
    // Smuggle the address across as a plain integer instead and turn it back
    // into a pointer inside the task.
    let resource_ptr = &mut resource as *mut Resource as usize;

    let queue = Queue::new("Test");
    queue.run_async(move || touch(resource_ptr, 1, None, 0));
    // Returns only after the asynchronous task above and this task have run.
    queue.run_sync(move || touch(resource_ptr, 2, Some(1), 1));

    // These tasks do not touch `resource`; they may still be pending when
    // `queue` is dropped at the end of this function.
    for _ in 0..3 {
        queue.run_async(|| println!("Run after queue is drop?"));
    }
}
// Queue: A wrapper around `dispatch_queue_t`.
// ------------------------------------------------------------------------------------------------
/// Owns a serial `dispatch_queue_t`; the `Drop` impl calls `release`.
struct Queue(sys::dispatch_queue_t);

impl Queue {
    /// Creates a serial queue named `label`.
    fn new(label: &'static str) -> Self {
        let raw_queue = create_dispatch_queue(label, sys::DISPATCH_QUEUE_SERIAL);
        Queue(raw_queue)
    }

    /// Submits `work` without waiting for it to run.
    fn run_async<F>(&self, work: F)
    where
        F: 'static + Send + FnOnce(),
    {
        let raw_queue = self.0;
        dispatch_async(raw_queue, work);
    }

    /// Submits `work` and blocks until it has finished running.
    fn run_sync<F>(&self, work: F)
    where
        F: 'static + Send + FnOnce(),
    {
        let raw_queue = self.0;
        dispatch_sync(raw_queue, work);
    }

    /// Releases the inner `dispatch_queue_t`; the release is asynchronous.
    fn release(&self) {
        let raw_queue = self.0;
        release_dispatch_queue(raw_queue);
    }
}
impl Drop for Queue {
fn drop(&mut self) {
self.release();
println!("Queue is drop!");
}
}
// Low-level Grand Central Dispatch (GCD) APIs
// https://developer.apple.com/library/archive/documentation/General/Conceptual/ConcurrencyProgrammingGuide/OperationQueues/OperationQueues.html
// ------------------------------------------------------------------------------------------------
/// Creates a dispatch queue named `label` with the given attribute.
///
/// Panics if `label` contains an interior NUL byte.
fn create_dispatch_queue(
    label: &'static str,
    queue_attr: sys::dispatch_queue_attr_t,
) -> sys::dispatch_queue_t {
    // `c_label` stays alive until the end of this function, so the pointer
    // handed to the C call is valid for the whole call.
    let c_label = CString::new(label).unwrap();
    unsafe { sys::dispatch_queue_create(c_label.as_ptr(), queue_attr) }
}
/// Drops one reference to `queue` by calling `dispatch_release` on it.
/// The release itself happens asynchronously.
fn release_dispatch_queue(queue: sys::dispatch_queue_t) {
    let object = sys::dispatch_object_t::from(queue);
    unsafe { sys::dispatch_release(object) };
}
/// Submits `work` to `queue` without waiting for it to run.
///
/// `work` must be `Send` because it is handed to another thread, `'static`
/// because it may outlive the caller's stack frame, and `FnOnce` because it
/// is invoked exactly once.
fn dispatch_async<F>(queue: sys::dispatch_queue_t, work: F)
where
    F: 'static + Send + FnOnce(),
{
    let (context, trampoline) = create_closure_and_executor(work);
    unsafe { sys::dispatch_async_f(queue, context, trampoline) };
}
/// Submits `work` to `queue` and blocks until it has finished running.
///
/// `work` must be `Send` because it is handed to the queue, and `FnOnce`
/// because it is invoked exactly once.
fn dispatch_sync<F>(queue: sys::dispatch_queue_t, work: F)
where
    F: 'static + Send + FnOnce(),
{
    let (context, trampoline) = create_closure_and_executor(work);
    unsafe { sys::dispatch_sync_f(queue, context, trampoline) };
}
// TODO: Find a way to optimize it!
/// Splits a Rust closure into the `(context, function)` pair expected by the
/// `dispatch_*_f` C APIs.
///
/// The closure is moved to the heap and leaked as an untyped pointer. The
/// returned function, when called with that pointer, takes ownership back,
/// runs the closure once, and frees it. The function must therefore be
/// called exactly once with the returned pointer.
fn create_closure_and_executor<F>(closure: F) -> (*mut c_void, sys::dispatch_function_t)
where
    F: FnOnce(),
{
    // A function nested in another function is an ordinary function that is
    // merely not nameable from outside; it captures nothing from its
    // surroundings, which is what lets it be used as a C callback.
    // References:
    // https://users.rust-lang.org/t/inner-functions-not-closed-over-their-environment/7696
    // https://stackoverflow.com/questions/26685666/a-local-function-in-rust
    extern "C" fn closure_executer<F>(unboxed_closure: *mut c_void)
    where
        F: FnOnce(),
    {
        // Reclaim ownership of the leaked closure.
        let reboxed: Box<F> = unsafe { Box::from_raw(unboxed_closure as *mut F) };
        // Move the closure out of the box and run it; the heap allocation is
        // freed when this function returns.
        let task: F = *reboxed;
        task();
    }

    // Monomorphised per closure type, so the callback knows which concrete
    // type to rebuild from the untyped pointer.
    let executor: sys::dispatch_function_t = Some(closure_executer::<F>);
    // Heap-allocate the closure and leak it so it survives until the callback.
    let leaked = Box::into_raw(Box::new(closure));
    (leaked as *mut c_void, executor)
}
/// Blocks the current thread for `millis` milliseconds.
fn sleep(millis: u64) {
    thread::sleep(Duration::from_millis(millis));
}
/// Runs the ordered-task example.
fn main() {
    // Grace period (in milliseconds) so the output of the asynchronous
    // tasks has time to appear before the process exits.
    const SETTLE_MILLIS: u64 = 10;

    run_tasks_in_order();
    sleep(SETTLE_MILLIS);
}
# `all` and `clean` name actions, not files, so mark them phony.
.PHONY: all clean

# Format every Rust source in place, then build each example with plain rustc.
all:
	rustfmt *.rs
	rustc queue.rs
	rustc dispatcher.rs
	rustc dispatch_native_apis.rs
	rustc dispatch_api.rs
	rustc dispatch_render_callback.rs

# Remove the built binaries. `-f` keeps `clean` from failing when some of
# them were never built.
clean:
	rm -f queue
	rm -f dispatcher
	rm -f dispatch_native_apis
	rm -f dispatch_api
	rm -f dispatch_render_callback
mod sys;
use std::ffi::CString;
use std::fmt::Debug;
use std::os::raw::c_void;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{thread, time::Duration};
/// Submits one asynchronous and one synchronous task to a `Queue` and checks
/// that they run in submission order, then queues three more asynchronous
/// tasks just before the `Queue` is dropped.
///
/// Panics (via `assert!`) if the tasks do not run in submission order.
fn run_tasks_in_order() {
    #[derive(Debug)]
    struct Resource {
        last_touched: Option<u32>,
        touched_count: u32,
    }

    impl Resource {
        fn new() -> Self {
            Resource {
                last_touched: None,
                touched_count: 0,
            }
        }
    }

    /// Body shared by both tasks: checks the state left by the previous
    /// task, then records `toucher` as the latest visitor.
    fn touch(resource_addr: usize, toucher: u32, expected_last: Option<u32>, expected_count: u32) {
        // SAFETY: `resource_addr` is the address of the `Resource` on the
        // submitting function's stack. That function stays blocked in
        // `run_sync` until both tasks have run, and the queue is created as
        // a serial queue, so this is the only live access.
        let res: &mut Resource = unsafe { &mut *(resource_addr as *mut Resource) };
        assert_eq!(res as *mut Resource as usize, resource_addr);
        assert_eq!(res.last_touched, expected_last);
        assert_eq!(res.touched_count, expected_count);
        res.last_touched = Some(toucher);
        // `{:p}` already prints the `0x` prefix.
        println!("res @ {:p} was touched by {}", res, toucher);
        res.touched_count += 1;
    }

    let mut resource = Resource::new();
    println!("resource @ {:p}", &resource);

    // Raw pointers are not `Send`, so the closures could not capture one.
    // Smuggle the address across as a plain integer instead and turn it back
    // into a pointer inside the task.
    let resource_ptr = &mut resource as *mut Resource as usize;

    let queue = Queue::new("Test");
    queue.run_async(move || touch(resource_ptr, 1, None, 0));
    // Returns only after the asynchronous task above and this task have run.
    queue.run_sync(move || touch(resource_ptr, 2, Some(1), 1));

    // These tasks do not touch `resource`. Any of them still pending when
    // `queue` is dropped at the end of this function is skipped by the
    // `Queue` wrapper instead of being run.
    for _ in 0..3 {
        queue.run_async(|| println!("Run after queue is drop?"));
    }
}
// Queue: A wrapper around `dispatch_queue_t`.
// ------------------------------------------------------------------------------------------------
/// Owns a serial `dispatch_queue_t` whose context is a heap-allocated
/// `AtomicBool` "is destroying" flag.
///
/// Every submitted task checks the flag first and is skipped once the flag
/// is set, so tasks still pending when the `Queue` is released are cancelled
/// instead of run.
struct Queue(sys::dispatch_queue_t);

impl Queue {
    // Public methods

    /// Creates a serial queue named `label` and installs the cancellation
    /// flag as its context.
    fn new(label: &'static str) -> Self {
        let q = Self(create_dispatch_queue(label, sys::DISPATCH_QUEUE_SERIAL));
        let is_destroying = Box::new(AtomicBool::new(false));
        q.set_context(is_destroying);
        q
    }

    /// Submits `work` without waiting for it; `work` is skipped if the queue
    /// has been released by the time the task runs.
    fn run_async<F>(&self, work: F)
    where
        F: 'static + Send + FnOnce(),
    {
        // NOTE(review): this reference is borrowed from `&self` but used by a
        // task that may run after the `Queue` is dropped. It relies on the
        // context being freed only by the queue finalizer, i.e. after
        // pending tasks have completed -- confirm against the GCD docs.
        let is_destroying = self.get_context::<AtomicBool>();
        dispatch_async(self.0, move || {
            if is_destroying.load(Ordering::SeqCst) {
                println!("Queue is dropped. Cancel the task.");
                return;
            }
            work();
        });
    }

    /// Submits `work` and blocks until the task has finished; `work` is
    /// skipped if the queue has been released.
    fn run_sync<F>(&self, work: F)
    where
        F: 'static + Send + FnOnce(),
    {
        let is_destroying = self.get_context::<AtomicBool>();
        dispatch_sync(self.0, || {
            if is_destroying.load(Ordering::SeqCst) {
                println!("Queue is dropped. Cancel the task.");
                return;
            }
            work();
        });
    }

    // Private methods

    /// Returns a shared reference to the queue's context, reinterpreted as
    /// `T`. The caller must pass the same `T` that was given to
    /// `set_context`.
    ///
    /// A shared reference is handed out on purpose: several tasks hold the
    /// context at the same time, and returning `&mut T` from `&self` would
    /// create aliased mutable references. Atomics only need `&T`.
    fn get_context<T>(&self) -> &T {
        // SAFETY: the context was set in `new` from `Box::into_raw` and is
        // only freed by the finalizer installed in `set_context`.
        unsafe { &*(get_dispatch_context(self.0) as *const T) }
    }

    /// Leaks `context` into the queue and installs a finalizer that takes
    /// ownership back and drops it.
    fn set_context<T>(&self, context: Box<T>) {
        set_dispatch_context(self.0, Box::into_raw(context) as *mut c_void);
        extern "C" fn finalizer<T>(context: *mut c_void) {
            // Retake the leaked context and drop it.
            let _ = unsafe { Box::from_raw(context as *mut T) };
        }
        set_dispatch_finalizer_f(self.0, Some(finalizer::<T>));
    }

    /// Marks the queue as destroying, then releases the inner
    /// `dispatch_queue_t` asynchronously.
    fn release(&self) {
        self.get_context::<AtomicBool>()
            .store(true, Ordering::SeqCst);
        // Tasks that start running after the store above are cancelled.
        release_dispatch_queue(self.0);
    }
}
impl Drop for Queue {
fn drop(&mut self) {
self.release();
println!("Queue is drop!");
}
}
// Low-level Grand Central Dispatch (GCD) APIs
// https://developer.apple.com/library/archive/documentation/General/Conceptual/ConcurrencyProgrammingGuide/OperationQueues/OperationQueues.html
// ------------------------------------------------------------------------------------------------
/// Creates a dispatch queue named `label` with the given attribute.
///
/// Panics if `label` contains an interior NUL byte.
fn create_dispatch_queue(
    label: &'static str,
    queue_attr: sys::dispatch_queue_attr_t,
) -> sys::dispatch_queue_t {
    // `c_label` stays alive until the end of this function, so the pointer
    // handed to the C call is valid for the whole call.
    let c_label = CString::new(label).unwrap();
    unsafe { sys::dispatch_queue_create(c_label.as_ptr(), queue_attr) }
}
/// Drops one reference to `queue` by calling `dispatch_release` on it.
/// The release itself happens asynchronously.
fn release_dispatch_queue(queue: sys::dispatch_queue_t) {
    let object = sys::dispatch_object_t::from(queue);
    unsafe { sys::dispatch_release(object) };
}
/// Returns the context pointer currently attached to `queue`.
fn get_dispatch_context(queue: sys::dispatch_queue_t) -> *mut c_void {
    let object = sys::dispatch_object_t::from(queue);
    unsafe { sys::dispatch_get_context(object) }
}
/// Attaches `context` to `queue` as its context pointer.
fn set_dispatch_context(queue: sys::dispatch_queue_t, context: *mut c_void) {
    let object = sys::dispatch_object_t::from(queue);
    unsafe { sys::dispatch_set_context(object, context) };
}
/// Installs `finalizer` on `queue`.
///
/// The `finalizer` is only run if a context has been set on the queue
/// (by `sys::dispatch_set_context`).
fn set_dispatch_finalizer_f(queue: sys::dispatch_queue_t, finalizer: sys::dispatch_function_t) {
    let object = sys::dispatch_object_t::from(queue);
    unsafe { sys::dispatch_set_finalizer_f(object, finalizer) };
}
/// Submits `work` to `queue` without waiting for it to run.
///
/// `work` must be `Send` because it is handed to another thread and `FnOnce`
/// because it is invoked exactly once. There is no `'static` bound here, so
/// the caller is responsible for keeping anything `work` borrows alive until
/// the task has run.
fn dispatch_async<F>(queue: sys::dispatch_queue_t, work: F)
where
    F: Send + FnOnce(),
{
    let (context, trampoline) = create_closure_and_executor(work);
    unsafe { sys::dispatch_async_f(queue, context, trampoline) };
}
/// Submits `work` to `queue` and blocks until it has finished running.
///
/// `work` must be `Send` because it is handed to the queue, and `FnOnce`
/// because it is invoked exactly once.
fn dispatch_sync<F>(queue: sys::dispatch_queue_t, work: F)
where
    F: Send + FnOnce(),
{
    let (context, trampoline) = create_closure_and_executor(work);
    unsafe { sys::dispatch_sync_f(queue, context, trampoline) };
}
// TODO: Find a way to optimize it!
/// Splits a Rust closure into the `(context, function)` pair expected by the
/// `dispatch_*_f` C APIs.
///
/// The closure is moved to the heap and leaked as an untyped pointer. The
/// returned function, when called with that pointer, takes ownership back,
/// runs the closure once, and frees it. The function must therefore be
/// called exactly once with the returned pointer.
fn create_closure_and_executor<F>(closure: F) -> (*mut c_void, sys::dispatch_function_t)
where
    F: FnOnce(),
{
    // A function nested in another function is an ordinary function that is
    // merely not nameable from outside; it captures nothing from its
    // surroundings, which is what lets it be used as a C callback.
    // References:
    // https://users.rust-lang.org/t/inner-functions-not-closed-over-their-environment/7696
    // https://stackoverflow.com/questions/26685666/a-local-function-in-rust
    extern "C" fn closure_executer<F>(unboxed_closure: *mut c_void)
    where
        F: FnOnce(),
    {
        // Reclaim ownership of the leaked closure.
        let reboxed: Box<F> = unsafe { Box::from_raw(unboxed_closure as *mut F) };
        // Move the closure out of the box and run it; the heap allocation is
        // freed when this function returns.
        let task: F = *reboxed;
        task();
    }

    // Monomorphised per closure type, so the callback knows which concrete
    // type to rebuild from the untyped pointer.
    let executor: sys::dispatch_function_t = Some(closure_executer::<F>);
    // Heap-allocate the closure and leak it so it survives until the callback.
    let leaked = Box::into_raw(Box::new(closure));
    (leaked as *mut c_void, executor)
}
/// Blocks the current thread for `millis` milliseconds.
fn sleep(millis: u64) {
    thread::sleep(Duration::from_millis(millis));
}
/// Runs the ordered-task example.
fn main() {
    // Grace period (in milliseconds) so the output of the asynchronous
    // tasks has time to appear before the process exits.
    const SETTLE_MILLIS: u64 = 10;

    run_tasks_in_order();
    sleep(SETTLE_MILLIS);
}
// References:
// https://github.com/phracker/MacOSX-SDKs/blob/9fc3ed0ad0345950ac25c28695b0427846eea966/MacOSX10.13.sdk/usr/include/dispatch/object.h#L85
// https://github.com/phracker/MacOSX-SDKs/blob/9fc3ed0ad0345950ac25c28695b0427846eea966/MacOSX10.13.sdk/usr/include/dispatch/object.h#L207
// https://github.com/phracker/MacOSX-SDKs/blob/9fc3ed0ad0345950ac25c28695b0427846eea966/MacOSX10.13.sdk/usr/include/dispatch/object.h#L235
// https://github.com/phracker/MacOSX-SDKs/blob/9fc3ed0ad0345950ac25c28695b0427846eea966/MacOSX10.13.sdk/usr/include/dispatch/object.h#L259
// https://github.com/phracker/MacOSX-SDKs/blob/9fc3ed0ad0345950ac25c28695b0427846eea966/MacOSX10.13.sdk/usr/include/dispatch/object.h#L277
// https://github.com/phracker/MacOSX-SDKs/blob/9fc3ed0ad0345950ac25c28695b0427846eea966/MacOSX10.13.sdk/usr/include/dispatch/object.h#L303
// https://github.com/phracker/MacOSX-SDKs/blob/9fc3ed0ad0345950ac25c28695b0427846eea966/MacOSX10.13.sdk/usr/include/dispatch/queue.h#L139
// https://github.com/phracker/MacOSX-SDKs/blob/9fc3ed0ad0345950ac25c28695b0427846eea966/MacOSX10.13.sdk/usr/include/dispatch/queue.h#L205
// https://github.com/phracker/MacOSX-SDKs/blob/9fc3ed0ad0345950ac25c28695b0427846eea966/MacOSX10.13.sdk/usr/include/dispatch/queue.h#L819
// https://rustaudio.github.io/coreaudio-rs/coreaudio_sys/audio_unit/index.html
// https://rustaudio.github.io/coreaudio-rs/coreaudio_sys/audio_unit/type.dispatch_function_t.html
// https://rustaudio.github.io/coreaudio-rs/coreaudio_sys/audio_unit/type.dispatch_queue_attr_t.html
// https://rustaudio.github.io/coreaudio-rs/coreaudio_sys/audio_unit/type.dispatch_queue_t.html
// https://rustaudio.github.io/coreaudio-rs/coreaudio_sys/audio_unit/fn.dispatch_queue_create.html
// https://rustaudio.github.io/coreaudio-rs/coreaudio_sys/audio_unit/fn.dispatch_release.html
// https://rustaudio.github.io/coreaudio-rs/coreaudio_sys/audio_unit/fn.dispatch_retain.html
// https://rustaudio.github.io/coreaudio-rs/coreaudio_sys/audio_unit/fn.dispatch_async_f.html
// https://rustaudio.github.io/coreaudio-rs/coreaudio_sys/audio_unit/fn.dispatch_sync_f.html
// https://rustaudio.github.io/coreaudio-rs/coreaudio_sys/audio_unit/fn.dispatch_get_context.html
// https://rustaudio.github.io/coreaudio-rs/coreaudio_sys/audio_unit/fn.dispatch_set_context.html
// https://rustaudio.github.io/coreaudio-rs/coreaudio_sys/audio_unit/fn.dispatch_set_finalizer_f.html
// https://github.com/SSheldon/rust-dispatch/blob/master/src/ffi.rs
#![allow(non_camel_case_types)]
use std::os::raw::{c_char, c_void};
/// Variant-less enum used only behind raw pointers, as the pointee type of
/// `dispatch_queue_t`.
pub enum dispatch_queue_s {}

/// Variant-less enum used only behind raw pointers, as the pointee type of
/// `dispatch_queue_attr_t`.
pub enum dispatch_queue_attr_s {}

/// C-layout union with a single member: a queue pointer.
#[repr(C)]
pub union dispatch_object_s {
    _dq: *mut dispatch_queue_s,
}

/// Wraps a queue pointer so it can be passed where a `dispatch_object_t` is
/// expected.
impl From<*mut dispatch_queue_s> for dispatch_object_s {
    fn from(dq: *mut dispatch_queue_s) -> Self {
        dispatch_object_s { _dq: dq }
    }
}

/// Nullable C callback taking one untyped context pointer.
pub type dispatch_function_t = Option<extern "C" fn(*mut c_void)>;
pub type dispatch_object_t = dispatch_object_s;
pub type dispatch_queue_attr_t = *mut dispatch_queue_attr_s;
pub type dispatch_queue_t = *mut dispatch_queue_s;

/// Attribute value for a serial queue: the null attribute pointer.
pub const DISPATCH_QUEUE_SERIAL: dispatch_queue_attr_t = std::ptr::null_mut();
// Foreign declarations for the libdispatch (GCD) functions used by the
// examples. All of them are `unsafe` to call: the caller is responsible for
// passing valid queue/object handles and context pointers.
#[link(name = "System", kind = "dylib")] // Link to a dynamic library in Dispatch framework.
extern "C" {
// Creates a queue; `label` is a NUL-terminated C string (or null) and `attr`
// selects the queue kind (e.g. `DISPATCH_QUEUE_SERIAL`).
pub fn dispatch_queue_create(
label: *const c_char,
attr: dispatch_queue_attr_t,
) -> dispatch_queue_t;
// Drops one reference to a dispatch object.
pub fn dispatch_release(object: dispatch_object_t);
// `dispatch_block_t` is a lambda expression-like syntax to create closures invented by Apple.
// Not sure if Rust supports it. `dispatch_async_f` and `dispatch_sync_f` should be able to
// replace `dispatch_async` and `dispatch_sync`.
// Submits `work` to `queue` and returns without waiting; `context` is passed
// to `work` as its only argument.
pub fn dispatch_async_f(
queue: dispatch_queue_t,
context: *mut c_void,
work: dispatch_function_t,
);
// Submits `work` to `queue` and waits until it has finished; `context` is
// passed to `work` as its only argument.
pub fn dispatch_sync_f(
queue: dispatch_queue_t,
context: *mut c_void,
work: dispatch_function_t,
);
// Reads the context pointer attached to a dispatch object.
pub fn dispatch_get_context(object: dispatch_object_t) -> *mut c_void;
// Attaches a context pointer to a dispatch object.
pub fn dispatch_set_context(object: dispatch_object_t, context: *mut c_void);
// Installs the function that receives the object's context pointer when the
// object is finalized.
pub fn dispatch_set_finalizer_f(object: dispatch_object_t, finalizer: dispatch_function_t);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment