summaryrefslogtreecommitdiffstats
path: root/third_party/rust/coreaudio-sys-utils/src
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/coreaudio-sys-utils/src')
-rw-r--r--third_party/rust/coreaudio-sys-utils/src/audio_device_extensions.rs2
-rw-r--r--third_party/rust/coreaudio-sys-utils/src/audio_object.rs4
-rw-r--r--third_party/rust/coreaudio-sys-utils/src/audio_unit.rs13
-rw-r--r--third_party/rust/coreaudio-sys-utils/src/dispatch.rs268
4 files changed, 244 insertions, 43 deletions
diff --git a/third_party/rust/coreaudio-sys-utils/src/audio_device_extensions.rs b/third_party/rust/coreaudio-sys-utils/src/audio_device_extensions.rs
index 04eeaaf566..8224d87c28 100644
--- a/third_party/rust/coreaudio-sys-utils/src/audio_device_extensions.rs
+++ b/third_party/rust/coreaudio-sys-utils/src/audio_device_extensions.rs
@@ -1,3 +1,4 @@
+use crate::dispatch::*;
use coreaudio_sys::*;
// See https://opensource.apple.com/source/WebCore/WebCore-7604.5.6/platform/spi/cf/CoreAudioSPI.h.auto.html
@@ -18,5 +19,6 @@ pub fn audio_device_duck(
in_start_time: *const AudioTimeStamp,
in_ramp_duration: f32,
) -> OSStatus {
+ debug_assert_running_serially();
unsafe { AudioDeviceDuck(in_device, in_ducked_level, in_start_time, in_ramp_duration) }
}
diff --git a/third_party/rust/coreaudio-sys-utils/src/audio_object.rs b/third_party/rust/coreaudio-sys-utils/src/audio_object.rs
index 368d6caadc..b8d74c80fc 100644
--- a/third_party/rust/coreaudio-sys-utils/src/audio_object.rs
+++ b/third_party/rust/coreaudio-sys-utils/src/audio_object.rs
@@ -1,3 +1,4 @@
+use crate::dispatch::*;
use coreaudio_sys::*;
use std::fmt;
use std::os::raw::c_void;
@@ -77,6 +78,7 @@ pub fn audio_object_set_property_data<T>(
size: usize,
data: *const T,
) -> OSStatus {
+ debug_assert_running_serially();
unsafe {
AudioObjectSetPropertyData(
id,
@@ -99,6 +101,7 @@ pub fn audio_object_add_property_listener<T>(
listener: audio_object_property_listener_proc,
data: *mut T,
) -> OSStatus {
+ debug_assert_running_serially();
unsafe { AudioObjectAddPropertyListener(id, address, Some(listener), data as *mut c_void) }
}
@@ -108,6 +111,7 @@ pub fn audio_object_remove_property_listener<T>(
listener: audio_object_property_listener_proc,
data: *mut T,
) -> OSStatus {
+ debug_assert_running_serially();
unsafe { AudioObjectRemovePropertyListener(id, address, Some(listener), data as *mut c_void) }
}
diff --git a/third_party/rust/coreaudio-sys-utils/src/audio_unit.rs b/third_party/rust/coreaudio-sys-utils/src/audio_unit.rs
index 059a58f26b..30e5a3cf4b 100644
--- a/third_party/rust/coreaudio-sys-utils/src/audio_unit.rs
+++ b/third_party/rust/coreaudio-sys-utils/src/audio_unit.rs
@@ -1,3 +1,4 @@
+use crate::dispatch::debug_assert_running_serially;
use coreaudio_sys::*;
use std::convert::TryFrom;
use std::os::raw::c_void;
@@ -13,6 +14,7 @@ pub fn audio_unit_get_property_info(
) -> OSStatus {
assert!(!unit.is_null());
assert!(UInt32::try_from(*size).is_ok()); // Check if `size` can be converted to a UInt32.
+ debug_assert_running_serially();
unsafe {
AudioUnitGetPropertyInfo(
unit,
@@ -35,6 +37,7 @@ pub fn audio_unit_get_property<T>(
) -> OSStatus {
assert!(!unit.is_null());
assert!(UInt32::try_from(*size).is_ok()); // Check if `size` can be converted to a UInt32.
+ debug_assert_running_serially();
unsafe {
AudioUnitGetProperty(
unit,
@@ -56,6 +59,7 @@ pub fn audio_unit_set_property<T>(
size: usize,
) -> OSStatus {
assert!(!unit.is_null());
+ debug_assert_running_serially();
unsafe {
AudioUnitSetProperty(
unit,
@@ -76,6 +80,7 @@ pub fn audio_unit_get_parameter(
value: &mut AudioUnitParameterValue,
) -> OSStatus {
assert!(!unit.is_null());
+ debug_assert_running_serially();
unsafe {
AudioUnitGetParameter(
unit,
@@ -96,30 +101,36 @@ pub fn audio_unit_set_parameter(
buffer_offset_in_frames: UInt32,
) -> OSStatus {
assert!(!unit.is_null());
+ debug_assert_running_serially();
unsafe { AudioUnitSetParameter(unit, id, scope, element, value, buffer_offset_in_frames) }
}
pub fn audio_unit_initialize(unit: AudioUnit) -> OSStatus {
assert!(!unit.is_null());
+ debug_assert_running_serially();
unsafe { AudioUnitInitialize(unit) }
}
pub fn audio_unit_uninitialize(unit: AudioUnit) -> OSStatus {
assert!(!unit.is_null());
+ debug_assert_running_serially();
unsafe { AudioUnitUninitialize(unit) }
}
pub fn dispose_audio_unit(unit: AudioUnit) -> OSStatus {
+ debug_assert_running_serially();
unsafe { AudioComponentInstanceDispose(unit) }
}
pub fn audio_output_unit_start(unit: AudioUnit) -> OSStatus {
assert!(!unit.is_null());
+ debug_assert_running_serially();
unsafe { AudioOutputUnitStart(unit) }
}
pub fn audio_output_unit_stop(unit: AudioUnit) -> OSStatus {
assert!(!unit.is_null());
+ debug_assert_running_serially();
unsafe { AudioOutputUnitStop(unit) }
}
@@ -155,6 +166,7 @@ pub fn audio_unit_add_property_listener<T>(
data: *mut T,
) -> OSStatus {
assert!(!unit.is_null());
+ debug_assert_running_serially();
unsafe { AudioUnitAddPropertyListener(unit, id, Some(listener), data as *mut c_void) }
}
@@ -165,6 +177,7 @@ pub fn audio_unit_remove_property_listener_with_user_data<T>(
data: *mut T,
) -> OSStatus {
assert!(!unit.is_null());
+ debug_assert_running_serially();
unsafe {
AudioUnitRemovePropertyListenerWithUserData(unit, id, Some(listener), data as *mut c_void)
}
diff --git a/third_party/rust/coreaudio-sys-utils/src/dispatch.rs b/third_party/rust/coreaudio-sys-utils/src/dispatch.rs
index 602304bde8..c5da137cab 100644
--- a/third_party/rust/coreaudio-sys-utils/src/dispatch.rs
+++ b/third_party/rust/coreaudio-sys-utils/src/dispatch.rs
@@ -3,40 +3,118 @@ use coreaudio_sys::*;
use std::ffi::CString;
use std::mem;
use std::os::raw::c_void;
+use std::panic;
use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::{Mutex, OnceLock};
+#[cfg(test)]
+use std::thread;
+#[cfg(test)]
+use std::time::Duration;
+use std::time::Instant;
-// Queue: A wrapper around `dispatch_queue_t`.
+pub const DISPATCH_QUEUE_LABEL: &str = "org.mozilla.cubeb";
+
+pub fn get_serial_queue_singleton() -> &'static Queue {
+ static SERIAL_QUEUE: OnceLock<Queue> = OnceLock::new();
+ SERIAL_QUEUE.get_or_init(|| Queue::new(DISPATCH_QUEUE_LABEL))
+}
+
+pub fn debug_assert_running_serially() {
+ get_serial_queue_singleton().debug_assert_is_current();
+}
+
+pub fn debug_assert_not_running_serially() {
+ get_serial_queue_singleton().debug_assert_is_not_current();
+}
+
+pub fn run_serially<F, B>(work: F) -> B
+where
+ F: FnOnce() -> B,
+{
+ get_serial_queue_singleton().run_sync(|| work()).unwrap()
+}
+
+pub fn run_serially_forward_panics<F, B>(work: F) -> B
+where
+ F: panic::UnwindSafe + FnOnce() -> B,
+{
+ match run_serially(|| panic::catch_unwind(|| work())) {
+ Ok(res) => res,
+ Err(e) => panic::resume_unwind(e),
+ }
+}
+
+// Queue: A wrapper around `dispatch_queue_t` that is always serial.
// ------------------------------------------------------------------------------------------------
#[derive(Debug)]
-pub struct Queue(dispatch_queue_t);
+pub struct Queue {
+ queue: Mutex<dispatch_queue_t>,
+ owned: AtomicBool,
+}
impl Queue {
- pub fn new(label: &str) -> Self {
+ pub fn new_with_target(label: &str, target: &Queue) -> Self {
const DISPATCH_QUEUE_SERIAL: dispatch_queue_attr_t =
ptr::null_mut::<dispatch_queue_attr_s>();
let label = CString::new(label).unwrap();
let c_string = label.as_ptr();
- let queue = Self(unsafe { dispatch_queue_create(c_string, DISPATCH_QUEUE_SERIAL) });
+ let queue = {
+ let target_guard = target.queue.lock().unwrap();
+ Self {
+ queue: Mutex::new(unsafe {
+ dispatch_queue_create_with_target(
+ c_string,
+ DISPATCH_QUEUE_SERIAL,
+ *target_guard,
+ )
+ }),
+ owned: AtomicBool::new(true),
+ }
+ };
queue.set_should_cancel(Box::new(AtomicBool::new(false)));
queue
}
+ pub fn new(label: &str) -> Self {
+ Queue::new_with_target(label, &Queue::get_global_queue())
+ }
+
+ pub fn get_global_queue() -> Self {
+ Self {
+ queue: Mutex::new(unsafe { dispatch_get_global_queue(QOS_CLASS_DEFAULT as isize, 0) }),
+ owned: AtomicBool::new(false),
+ }
+ }
+
#[cfg(debug_assertions)]
pub fn debug_assert_is_current(&self) {
+ let guard = self.queue.lock().unwrap();
unsafe {
- dispatch_assert_queue(self.0);
+ dispatch_assert_queue(*guard);
}
}
#[cfg(not(debug_assertions))]
pub fn debug_assert_is_current(&self) {}
+ #[cfg(debug_assertions)]
+ pub fn debug_assert_is_not_current(&self) {
+ let guard = self.queue.lock().unwrap();
+ unsafe {
+ dispatch_assert_queue_not(*guard);
+ }
+ }
+
+ #[cfg(not(debug_assertions))]
+ pub fn debug_assert_is_not_current(&self) {}
+
pub fn run_async<F>(&self, work: F)
where
F: Send + FnOnce(),
{
- let should_cancel = self.get_should_cancel();
+ let guard = self.queue.lock().unwrap();
+ let should_cancel = self.get_should_cancel(*guard);
let (closure, executor) = Self::create_closure_and_executor(|| {
if should_cancel.map_or(false, |v| v.load(Ordering::SeqCst)) {
return;
@@ -44,15 +122,22 @@ impl Queue {
work();
});
unsafe {
- dispatch_async_f(self.0, closure, executor);
+ dispatch_async_f(*guard, closure, executor);
}
}
- pub fn run_sync<F>(&self, work: F)
+ pub fn run_after<F>(&self, when: Instant, work: F)
where
F: Send + FnOnce(),
{
- let should_cancel = self.get_should_cancel();
+ let now = Instant::now();
+ if when <= now {
+ return self.run_async(work);
+ }
+ let nanos = (when - now).as_nanos() as i64;
+ let when = unsafe { dispatch_time(DISPATCH_TIME_NOW.into(), nanos) };
+ let guard = self.queue.lock().unwrap();
+ let should_cancel = self.get_should_cancel(*guard);
let (closure, executor) = Self::create_closure_and_executor(|| {
if should_cancel.map_or(false, |v| v.load(Ordering::SeqCst)) {
return;
@@ -60,38 +145,85 @@ impl Queue {
work();
});
unsafe {
- dispatch_sync_f(self.0, closure, executor);
+ dispatch_after_f(when, *guard, closure, executor);
}
}
- pub fn run_final<F>(&self, work: F)
+ pub fn run_sync<F, B>(&self, work: F) -> Option<B>
where
- F: Send + FnOnce(),
+ F: FnOnce() -> B,
{
- let should_cancel = self.get_should_cancel();
- let (closure, executor) = Self::create_closure_and_executor(|| {
- work();
- should_cancel
- .expect("dispatch context should be allocated!")
- .store(true, Ordering::SeqCst);
- });
+ let queue: Option<dispatch_queue_t>;
+ let mut res: Option<B> = None;
+ let cex: Option<(*mut c_void, dispatch_function_t)>;
+ {
+ let guard = self.queue.lock().unwrap();
+ queue = Some(*guard);
+ let should_cancel = self.get_should_cancel(*guard);
+ cex = Some(Self::create_closure_and_executor(|| {
+ if should_cancel.map_or(false, |v| v.load(Ordering::SeqCst)) {
+ return;
+ }
+ res = Some(work());
+ }));
+ }
+ let (closure, executor) = cex.unwrap();
+ unsafe {
+ dispatch_sync_f(queue.unwrap(), closure, executor);
+ }
+ res
+ }
+
+ pub fn run_final<F, B>(&self, work: F) -> Option<B>
+ where
+ F: FnOnce() -> B,
+ {
+ assert!(
+ self.owned.load(Ordering::SeqCst),
+ "Doesn't make sense to finalize global queue"
+ );
+ let queue: Option<dispatch_queue_t>;
+ let mut res: Option<B> = None;
+ let cex: Option<(*mut c_void, dispatch_function_t)>;
+ {
+ let guard = self.queue.lock().unwrap();
+ queue = Some(*guard);
+ let should_cancel = self.get_should_cancel(*guard);
+ debug_assert!(
+ should_cancel.is_some(),
+ "dispatch context should be allocated!"
+ );
+ cex = Some(Self::create_closure_and_executor(|| {
+ res = Some(work());
+ should_cancel
+ .expect("dispatch context should be allocated!")
+ .store(true, Ordering::SeqCst);
+ }));
+ }
+ let (closure, executor) = cex.unwrap();
unsafe {
- dispatch_sync_f(self.0, closure, executor);
+ dispatch_sync_f(queue.unwrap(), closure, executor);
}
+ res
}
- fn get_should_cancel(&self) -> Option<&mut AtomicBool> {
+ fn get_should_cancel(&self, queue: dispatch_queue_t) -> Option<&mut AtomicBool> {
+ if !self.owned.load(Ordering::SeqCst) {
+ return None;
+ }
unsafe {
- let context = dispatch_get_context(
- mem::transmute::<dispatch_queue_t, dispatch_object_t>(self.0),
- ) as *mut AtomicBool;
+ let context =
+ dispatch_get_context(mem::transmute::<dispatch_queue_t, dispatch_object_t>(queue))
+ as *mut AtomicBool;
context.as_mut()
}
}
fn set_should_cancel(&self, context: Box<AtomicBool>) {
+ assert!(self.owned.load(Ordering::SeqCst));
unsafe {
- let queue = mem::transmute::<dispatch_queue_t, dispatch_object_t>(self.0);
+ let guard = self.queue.lock().unwrap();
+ let queue = mem::transmute::<dispatch_queue_t, dispatch_object_t>(*guard);
// Leak the context from Box.
dispatch_set_context(queue, Box::into_raw(context) as *mut c_void);
@@ -106,13 +238,13 @@ impl Queue {
}
fn release(&self) {
+ let guard = self.queue.lock().unwrap();
+ let queue = *guard;
unsafe {
// This will release the inner `dispatch_queue_t` asynchronously.
// TODO: It's incredibly unsafe to call `transmute` directly.
// Find another way to release the queue.
- dispatch_release(mem::transmute::<dispatch_queue_t, dispatch_object_t>(
- self.0,
- ));
+ dispatch_release(mem::transmute::<dispatch_queue_t, dispatch_object_t>(queue));
}
}
@@ -143,23 +275,35 @@ impl Queue {
impl Drop for Queue {
fn drop(&mut self) {
- self.release();
+ if self.owned.load(Ordering::SeqCst) {
+ self.release();
+ }
}
}
impl Clone for Queue {
fn clone(&self) -> Self {
+ assert!(
+ self.owned.load(Ordering::SeqCst),
+ "No need to clone a static queue"
+ );
+ let guard = self.queue.lock().unwrap();
+ let queue = *guard;
// TODO: It's incredibly unsafe to call `transmute` directly.
// Find another way to release the queue.
unsafe {
- dispatch_retain(mem::transmute::<dispatch_queue_t, dispatch_object_t>(
- self.0,
- ));
+ dispatch_retain(mem::transmute::<dispatch_queue_t, dispatch_object_t>(queue));
+ }
+ Self {
+ queue: Mutex::new(queue),
+ owned: AtomicBool::new(true),
}
- Self(self.0)
}
}
+unsafe impl Send for Queue {}
+unsafe impl Sync for Queue {}
+
#[test]
fn run_tasks_in_order() {
let mut visited = Vec::<u32>::new();
@@ -176,12 +320,12 @@ fn run_tasks_in_order() {
let queue = Queue::new("Run tasks in order");
- queue.run_sync(move || visit(1, ptr));
- queue.run_sync(move || visit(2, ptr));
- queue.run_async(move || visit(3, ptr));
- queue.run_async(move || visit(4, ptr));
+ queue.run_sync(|| visit(1, ptr));
+ queue.run_sync(|| visit(2, ptr));
+ queue.run_async(|| visit(3, ptr));
+ queue.run_async(|| visit(4, ptr));
// Call sync here to block the current thread and make sure all the tasks are done.
- queue.run_sync(move || visit(5, ptr));
+ queue.run_sync(|| visit(5, ptr));
assert_eq!(visited, vec![1, 2, 3, 4, 5]);
}
@@ -203,14 +347,52 @@ fn run_final_task() {
let queue = Queue::new("Task after run_final will be cancelled");
- queue.run_sync(move || visit(1, ptr));
- queue.run_async(move || visit(2, ptr));
- queue.run_final(move || visit(3, ptr));
- queue.run_async(move || visit(4, ptr));
- queue.run_sync(move || visit(5, ptr));
+ queue.run_sync(|| visit(1, ptr));
+ queue.run_async(|| visit(2, ptr));
+ queue.run_final(|| visit(3, ptr));
+ queue.run_async(|| visit(4, ptr));
+ queue.run_sync(|| visit(5, ptr));
}
// `queue` will be dropped asynchronously and then the `finalizer` of the `queue`
// should be fired to clean up the `context` set in the `queue`.
assert_eq!(visited, vec![1, 2, 3]);
}
+
+#[test]
+fn sync_return_value() {
+ let q = Queue::new("Test queue");
+ assert_eq!(q.run_sync(|| 42), Some(42));
+ assert_eq!(q.run_final(|| "foo"), Some("foo"));
+ assert_eq!(q.run_sync(|| Ok::<(), u32>(())), None);
+}
+
+#[test]
+fn run_after() {
+ let mut visited = Vec::<u32>::new();
+
+ {
+ // Rust compilter doesn't allow a pointer to be passed across threads.
+ // A hacky way to do that is to cast the pointer into a value, then
+ // the value, which is actually an address, can be copied into threads.
+ let ptr = &mut visited as *mut Vec<u32> as usize;
+
+ fn visit(v: u32, visited_ptr: usize) {
+ let visited = unsafe { &mut *(visited_ptr as *mut Vec<u32>) };
+ visited.push(v);
+ }
+
+ let queue = Queue::new("Task after run_final will be cancelled");
+
+ queue.run_async(|| visit(1, ptr));
+ queue.run_after(Instant::now() + Duration::from_millis(10), || visit(2, ptr));
+ queue.run_after(Instant::now() + Duration::from_secs(1), || visit(3, ptr));
+ queue.run_async(|| visit(4, ptr));
+ thread::sleep(Duration::from_millis(100));
+ queue.run_final(|| visit(5, ptr));
+ }
+ // `queue` will be dropped asynchronously and then the `finalizer` of the `queue`
+ // should be fired to clean up the `context` set in the `queue`.
+
+ assert_eq!(visited, vec![1, 4, 2, 5]);
+}