diff options
Diffstat (limited to 'third_party/rust/coreaudio-sys-utils')
5 files changed, 245 insertions, 44 deletions
diff --git a/third_party/rust/coreaudio-sys-utils/.cargo-checksum.json b/third_party/rust/coreaudio-sys-utils/.cargo-checksum.json index 5459f53214..f22a44473c 100644 --- a/third_party/rust/coreaudio-sys-utils/.cargo-checksum.json +++ b/third_party/rust/coreaudio-sys-utils/.cargo-checksum.json @@ -1 +1 @@ -{"files":{"Cargo.toml":"87292d055a2fc0f070f54abd549a5f79ec8ac33611ecde80ba394f256b88294c","src/aggregate_device.rs":"7d2bd5f5fd7f3d008ebb69ad81f522ca0cb73db6d7b3e50ed1a63ea26ff721f4","src/audio_device_extensions.rs":"2852f9ce65581cb5cf3f8e581f2652087eae0a569ed429be362e636db6441b6b","src/audio_object.rs":"5447179330a862659a25bceedfdc5d29a1296f63490908d1c868c6b21c5f95a1","src/audio_unit.rs":"d783878930df4923b57ad230138c0f3fd6b0b9bb80a39725092ff4c6615162d8","src/cf_mutable_dict.rs":"fc42edd270c6dfb02f123214d2d8e487bbd62b5bd923b71eec13190fd0104d2a","src/dispatch.rs":"f6267fe587217c3d3ad5fe7f3a35955221c936103bf853c477a2e44eba5f1e46","src/lib.rs":"c93ed1411dd6cc39db44f57e0d7683bbc54745f84a3c9f9533a088895ec97abe","src/string.rs":"28f88b816c768bcfcc674a60d962b93f1c94e5e0f4cc8ed2a1301138b91039e7"},"package":null}
\ No newline at end of file +{"files":{"Cargo.toml":"87292d055a2fc0f070f54abd549a5f79ec8ac33611ecde80ba394f256b88294c","src/aggregate_device.rs":"7d2bd5f5fd7f3d008ebb69ad81f522ca0cb73db6d7b3e50ed1a63ea26ff721f4","src/audio_device_extensions.rs":"5c869d791947d15eec8bffe0bb302fe32d0578111ffe0049213e720eb60a34e1","src/audio_object.rs":"34f7e038c1ed30d503d669d89f01864ae90e009a2fa74ef50fac343a53113ff2","src/audio_unit.rs":"d38007faed2ce4d88efb70054a1fdfadf8249d0e55b900eb3ac8eae04355bf2b","src/cf_mutable_dict.rs":"fc42edd270c6dfb02f123214d2d8e487bbd62b5bd923b71eec13190fd0104d2a","src/dispatch.rs":"24b6bcf0dcaa6618e03039cd060a274c8f9ed48264e14de465ae3aacb2daad57","src/lib.rs":"c93ed1411dd6cc39db44f57e0d7683bbc54745f84a3c9f9533a088895ec97abe","src/string.rs":"28f88b816c768bcfcc674a60d962b93f1c94e5e0f4cc8ed2a1301138b91039e7"},"package":null}
\ No newline at end of file diff --git a/third_party/rust/coreaudio-sys-utils/src/audio_device_extensions.rs b/third_party/rust/coreaudio-sys-utils/src/audio_device_extensions.rs index 04eeaaf566..8224d87c28 100644 --- a/third_party/rust/coreaudio-sys-utils/src/audio_device_extensions.rs +++ b/third_party/rust/coreaudio-sys-utils/src/audio_device_extensions.rs @@ -1,3 +1,4 @@ +use crate::dispatch::*; use coreaudio_sys::*; // See https://opensource.apple.com/source/WebCore/WebCore-7604.5.6/platform/spi/cf/CoreAudioSPI.h.auto.html @@ -18,5 +19,6 @@ pub fn audio_device_duck( in_start_time: *const AudioTimeStamp, in_ramp_duration: f32, ) -> OSStatus { + debug_assert_running_serially(); unsafe { AudioDeviceDuck(in_device, in_ducked_level, in_start_time, in_ramp_duration) } } diff --git a/third_party/rust/coreaudio-sys-utils/src/audio_object.rs b/third_party/rust/coreaudio-sys-utils/src/audio_object.rs index 368d6caadc..b8d74c80fc 100644 --- a/third_party/rust/coreaudio-sys-utils/src/audio_object.rs +++ b/third_party/rust/coreaudio-sys-utils/src/audio_object.rs @@ -1,3 +1,4 @@ +use crate::dispatch::*; use coreaudio_sys::*; use std::fmt; use std::os::raw::c_void; @@ -77,6 +78,7 @@ pub fn audio_object_set_property_data<T>( size: usize, data: *const T, ) -> OSStatus { + debug_assert_running_serially(); unsafe { AudioObjectSetPropertyData( id, @@ -99,6 +101,7 @@ pub fn audio_object_add_property_listener<T>( listener: audio_object_property_listener_proc, data: *mut T, ) -> OSStatus { + debug_assert_running_serially(); unsafe { AudioObjectAddPropertyListener(id, address, Some(listener), data as *mut c_void) } } @@ -108,6 +111,7 @@ pub fn audio_object_remove_property_listener<T>( listener: audio_object_property_listener_proc, data: *mut T, ) -> OSStatus { + debug_assert_running_serially(); unsafe { AudioObjectRemovePropertyListener(id, address, Some(listener), data as *mut c_void) } } diff --git a/third_party/rust/coreaudio-sys-utils/src/audio_unit.rs b/third_party/rust/coreaudio-sys-utils/src/audio_unit.rs index 059a58f26b..30e5a3cf4b 100644 --- a/third_party/rust/coreaudio-sys-utils/src/audio_unit.rs +++ b/third_party/rust/coreaudio-sys-utils/src/audio_unit.rs @@ -1,3 +1,4 @@ +use crate::dispatch::debug_assert_running_serially; use coreaudio_sys::*; use std::convert::TryFrom; use std::os::raw::c_void; @@ -13,6 +14,7 @@ pub fn audio_unit_get_property_info( ) -> OSStatus { assert!(!unit.is_null()); assert!(UInt32::try_from(*size).is_ok()); // Check if `size` can be converted to a UInt32. + debug_assert_running_serially(); unsafe { AudioUnitGetPropertyInfo( unit, @@ -35,6 +37,7 @@ pub fn audio_unit_get_property<T>( ) -> OSStatus { assert!(!unit.is_null()); assert!(UInt32::try_from(*size).is_ok()); // Check if `size` can be converted to a UInt32. + debug_assert_running_serially(); unsafe { AudioUnitGetProperty( unit, @@ -56,6 +59,7 @@ pub fn audio_unit_set_property<T>( size: usize, ) -> OSStatus { assert!(!unit.is_null()); + debug_assert_running_serially(); unsafe { AudioUnitSetProperty( unit, @@ -76,6 +80,7 @@ pub fn audio_unit_get_parameter( value: &mut AudioUnitParameterValue, ) -> OSStatus { assert!(!unit.is_null()); + debug_assert_running_serially(); unsafe { AudioUnitGetParameter( unit, @@ -96,30 +101,36 @@ pub fn audio_unit_set_parameter( buffer_offset_in_frames: UInt32, ) -> OSStatus { assert!(!unit.is_null()); + debug_assert_running_serially(); unsafe { AudioUnitSetParameter(unit, id, scope, element, value, buffer_offset_in_frames) } } pub fn audio_unit_initialize(unit: AudioUnit) -> OSStatus { assert!(!unit.is_null()); + debug_assert_running_serially(); unsafe { AudioUnitInitialize(unit) } } pub fn audio_unit_uninitialize(unit: AudioUnit) -> OSStatus { assert!(!unit.is_null()); + debug_assert_running_serially(); unsafe { AudioUnitUninitialize(unit) } } pub fn dispose_audio_unit(unit: AudioUnit) -> OSStatus { + debug_assert_running_serially(); unsafe { AudioComponentInstanceDispose(unit) } } pub fn audio_output_unit_start(unit: AudioUnit) -> OSStatus { assert!(!unit.is_null()); + debug_assert_running_serially(); unsafe { AudioOutputUnitStart(unit) } } pub fn audio_output_unit_stop(unit: AudioUnit) -> OSStatus { assert!(!unit.is_null()); + debug_assert_running_serially(); unsafe { AudioOutputUnitStop(unit) } } @@ -155,6 +166,7 @@ pub fn audio_unit_add_property_listener<T>( data: *mut T, ) -> OSStatus { assert!(!unit.is_null()); + debug_assert_running_serially(); unsafe { AudioUnitAddPropertyListener(unit, id, Some(listener), data as *mut c_void) } } @@ -165,6 +177,7 @@ pub fn audio_unit_remove_property_listener_with_user_data<T>( data: *mut T, ) -> OSStatus { assert!(!unit.is_null()); + debug_assert_running_serially(); unsafe { AudioUnitRemovePropertyListenerWithUserData(unit, id, Some(listener), data as *mut c_void) } diff --git a/third_party/rust/coreaudio-sys-utils/src/dispatch.rs b/third_party/rust/coreaudio-sys-utils/src/dispatch.rs index 602304bde8..c5da137cab 100644 --- a/third_party/rust/coreaudio-sys-utils/src/dispatch.rs +++ b/third_party/rust/coreaudio-sys-utils/src/dispatch.rs @@ -3,40 +3,118 @@ use coreaudio_sys::*; use std::ffi::CString; use std::mem; use std::os::raw::c_void; +use std::panic; use std::ptr; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Mutex, OnceLock}; +#[cfg(test)] +use std::thread; +#[cfg(test)] +use std::time::Duration; +use std::time::Instant; -// Queue: A wrapper around `dispatch_queue_t`. +pub const DISPATCH_QUEUE_LABEL: &str = "org.mozilla.cubeb"; + +pub fn get_serial_queue_singleton() -> &'static Queue { + static SERIAL_QUEUE: OnceLock<Queue> = OnceLock::new(); + SERIAL_QUEUE.get_or_init(|| Queue::new(DISPATCH_QUEUE_LABEL)) +} + +pub fn debug_assert_running_serially() { + get_serial_queue_singleton().debug_assert_is_current(); +} + +pub fn debug_assert_not_running_serially() { + get_serial_queue_singleton().debug_assert_is_not_current(); +} + +pub fn run_serially<F, B>(work: F) -> B +where + F: FnOnce() -> B, +{ + get_serial_queue_singleton().run_sync(|| work()).unwrap() +} + +pub fn run_serially_forward_panics<F, B>(work: F) -> B +where + F: panic::UnwindSafe + FnOnce() -> B, +{ + match run_serially(|| panic::catch_unwind(|| work())) { + Ok(res) => res, + Err(e) => panic::resume_unwind(e), + } +} + +// Queue: A wrapper around `dispatch_queue_t` that is always serial. // ------------------------------------------------------------------------------------------------ #[derive(Debug)] -pub struct Queue(dispatch_queue_t); +pub struct Queue { + queue: Mutex<dispatch_queue_t>, + owned: AtomicBool, +} impl Queue { - pub fn new(label: &str) -> Self { + pub fn new_with_target(label: &str, target: &Queue) -> Self { const DISPATCH_QUEUE_SERIAL: dispatch_queue_attr_t = ptr::null_mut::<dispatch_queue_attr_s>(); let label = CString::new(label).unwrap(); let c_string = label.as_ptr(); - let queue = Self(unsafe { dispatch_queue_create(c_string, DISPATCH_QUEUE_SERIAL) }); + let queue = { + let target_guard = target.queue.lock().unwrap(); + Self { + queue: Mutex::new(unsafe { + dispatch_queue_create_with_target( + c_string, + DISPATCH_QUEUE_SERIAL, + *target_guard, + ) + }), + owned: AtomicBool::new(true), + } + }; queue.set_should_cancel(Box::new(AtomicBool::new(false))); queue } + pub fn new(label: &str) -> Self { + Queue::new_with_target(label, &Queue::get_global_queue()) + } + + pub fn get_global_queue() -> Self { + Self { + queue: Mutex::new(unsafe { dispatch_get_global_queue(QOS_CLASS_DEFAULT as isize, 0) }), + owned: AtomicBool::new(false), + } + } + #[cfg(debug_assertions)] pub fn debug_assert_is_current(&self) { + let guard = self.queue.lock().unwrap(); unsafe { - dispatch_assert_queue(self.0); + dispatch_assert_queue(*guard); } } #[cfg(not(debug_assertions))] pub fn debug_assert_is_current(&self) {} + #[cfg(debug_assertions)] + pub fn debug_assert_is_not_current(&self) { + let guard = self.queue.lock().unwrap(); + unsafe { + dispatch_assert_queue_not(*guard); + } + } + + #[cfg(not(debug_assertions))] + pub fn debug_assert_is_not_current(&self) {} + pub fn run_async<F>(&self, work: F) where F: Send + FnOnce(), { - let should_cancel = self.get_should_cancel(); + let guard = self.queue.lock().unwrap(); + let should_cancel = self.get_should_cancel(*guard); let (closure, executor) = Self::create_closure_and_executor(|| { if should_cancel.map_or(false, |v| v.load(Ordering::SeqCst)) { return; @@ -44,15 +122,22 @@ impl Queue { work(); }); unsafe { - dispatch_async_f(self.0, closure, executor); + dispatch_async_f(*guard, closure, executor); } } - pub fn run_sync<F>(&self, work: F) + pub fn run_after<F>(&self, when: Instant, work: F) where F: Send + FnOnce(), { - let should_cancel = self.get_should_cancel(); + let now = Instant::now(); + if when <= now { + return self.run_async(work); + } + let nanos = (when - now).as_nanos() as i64; + let when = unsafe { dispatch_time(DISPATCH_TIME_NOW.into(), nanos) }; + let guard = self.queue.lock().unwrap(); + let should_cancel = self.get_should_cancel(*guard); let (closure, executor) = Self::create_closure_and_executor(|| { if should_cancel.map_or(false, |v| v.load(Ordering::SeqCst)) { return; @@ -60,38 +145,85 @@ impl Queue { work(); }); unsafe { - dispatch_sync_f(self.0, closure, executor); + dispatch_after_f(when, *guard, closure, executor); } } - pub fn run_final<F>(&self, work: F) + pub fn run_sync<F, B>(&self, work: F) -> Option<B> where - F: Send + FnOnce(), + F: FnOnce() -> B, { - let should_cancel = self.get_should_cancel(); - let (closure, executor) = Self::create_closure_and_executor(|| { - work(); - should_cancel - .expect("dispatch context should be allocated!") - .store(true, Ordering::SeqCst); - }); + let queue: Option<dispatch_queue_t>; + let mut res: Option<B> = None; + let cex: Option<(*mut c_void, dispatch_function_t)>; + { + let guard = self.queue.lock().unwrap(); + queue = Some(*guard); + let should_cancel = self.get_should_cancel(*guard); + cex = Some(Self::create_closure_and_executor(|| { + if should_cancel.map_or(false, |v| v.load(Ordering::SeqCst)) { + return; + } + res = Some(work()); + })); + } + let (closure, executor) = cex.unwrap(); + unsafe { + dispatch_sync_f(queue.unwrap(), closure, executor); + } + res + } + + pub fn run_final<F, B>(&self, work: F) -> Option<B> + where + F: FnOnce() -> B, + { + assert!( + self.owned.load(Ordering::SeqCst), + "Doesn't make sense to finalize global queue" + ); + let queue: Option<dispatch_queue_t>; + let mut res: Option<B> = None; + let cex: Option<(*mut c_void, dispatch_function_t)>; + { + let guard = self.queue.lock().unwrap(); + queue = Some(*guard); + let should_cancel = self.get_should_cancel(*guard); + debug_assert!( + should_cancel.is_some(), + "dispatch context should be allocated!" + ); + cex = Some(Self::create_closure_and_executor(|| { + res = Some(work()); + should_cancel + .expect("dispatch context should be allocated!") + .store(true, Ordering::SeqCst); + })); + } + let (closure, executor) = cex.unwrap(); unsafe { - dispatch_sync_f(self.0, closure, executor); + dispatch_sync_f(queue.unwrap(), closure, executor); } + res } - fn get_should_cancel(&self) -> Option<&mut AtomicBool> { + fn get_should_cancel(&self, queue: dispatch_queue_t) -> Option<&mut AtomicBool> { + if !self.owned.load(Ordering::SeqCst) { + return None; + } unsafe { - let context = dispatch_get_context( - mem::transmute::<dispatch_queue_t, dispatch_object_t>(self.0), - ) as *mut AtomicBool; + let context = + dispatch_get_context(mem::transmute::<dispatch_queue_t, dispatch_object_t>(queue)) + as *mut AtomicBool; context.as_mut() } } fn set_should_cancel(&self, context: Box<AtomicBool>) { + assert!(self.owned.load(Ordering::SeqCst)); unsafe { - let queue = mem::transmute::<dispatch_queue_t, dispatch_object_t>(self.0); + let guard = self.queue.lock().unwrap(); + let queue = mem::transmute::<dispatch_queue_t, dispatch_object_t>(*guard); // Leak the context from Box. dispatch_set_context(queue, Box::into_raw(context) as *mut c_void); @@ -106,13 +238,13 @@ impl Queue { } fn release(&self) { + let guard = self.queue.lock().unwrap(); + let queue = *guard; unsafe { // This will release the inner `dispatch_queue_t` asynchronously. // TODO: It's incredibly unsafe to call `transmute` directly. // Find another way to release the queue. - dispatch_release(mem::transmute::<dispatch_queue_t, dispatch_object_t>( - self.0, - )); + dispatch_release(mem::transmute::<dispatch_queue_t, dispatch_object_t>(queue)); } } @@ -143,23 +275,35 @@ impl Queue { impl Drop for Queue { fn drop(&mut self) { - self.release(); + if self.owned.load(Ordering::SeqCst) { + self.release(); + } } } impl Clone for Queue { fn clone(&self) -> Self { + assert!( + self.owned.load(Ordering::SeqCst), + "No need to clone a static queue" + ); + let guard = self.queue.lock().unwrap(); + let queue = *guard; // TODO: It's incredibly unsafe to call `transmute` directly. // Find another way to release the queue. unsafe { - dispatch_retain(mem::transmute::<dispatch_queue_t, dispatch_object_t>( - self.0, - )); + dispatch_retain(mem::transmute::<dispatch_queue_t, dispatch_object_t>(queue)); + } + Self { + queue: Mutex::new(queue), + owned: AtomicBool::new(true), } - Self(self.0) } } +unsafe impl Send for Queue {} +unsafe impl Sync for Queue {} + #[test] fn run_tasks_in_order() { let mut visited = Vec::<u32>::new(); @@ -176,12 +320,12 @@ fn run_tasks_in_order() { let queue = Queue::new("Run tasks in order"); - queue.run_sync(move || visit(1, ptr)); - queue.run_sync(move || visit(2, ptr)); - queue.run_async(move || visit(3, ptr)); - queue.run_async(move || visit(4, ptr)); + queue.run_sync(|| visit(1, ptr)); + queue.run_sync(|| visit(2, ptr)); + queue.run_async(|| visit(3, ptr)); + queue.run_async(|| visit(4, ptr)); // Call sync here to block the current thread and make sure all the tasks are done. - queue.run_sync(move || visit(5, ptr)); + queue.run_sync(|| visit(5, ptr)); assert_eq!(visited, vec![1, 2, 3, 4, 5]); } @@ -203,14 +347,52 @@ fn run_final_task() { let queue = Queue::new("Task after run_final will be cancelled"); - queue.run_sync(move || visit(1, ptr)); - queue.run_async(move || visit(2, ptr)); - queue.run_final(move || visit(3, ptr)); - queue.run_async(move || visit(4, ptr)); - queue.run_sync(move || visit(5, ptr)); + queue.run_sync(|| visit(1, ptr)); + queue.run_async(|| visit(2, ptr)); + queue.run_final(|| visit(3, ptr)); + queue.run_async(|| visit(4, ptr)); + queue.run_sync(|| visit(5, ptr)); } // `queue` will be dropped asynchronously and then the `finalizer` of the `queue` // should be fired to clean up the `context` set in the `queue`. assert_eq!(visited, vec![1, 2, 3]); } + +#[test] +fn sync_return_value() { + let q = Queue::new("Test queue"); + assert_eq!(q.run_sync(|| 42), Some(42)); + assert_eq!(q.run_final(|| "foo"), Some("foo")); + assert_eq!(q.run_sync(|| Ok::<(), u32>(())), None); +} + +#[test] +fn run_after() { + let mut visited = Vec::<u32>::new(); + + { + // Rust compilter doesn't allow a pointer to be passed across threads. + // A hacky way to do that is to cast the pointer into a value, then + // the value, which is actually an address, can be copied into threads. + let ptr = &mut visited as *mut Vec<u32> as usize; + + fn visit(v: u32, visited_ptr: usize) { + let visited = unsafe { &mut *(visited_ptr as *mut Vec<u32>) }; + visited.push(v); + } + + let queue = Queue::new("Task after run_final will be cancelled"); + + queue.run_async(|| visit(1, ptr)); + queue.run_after(Instant::now() + Duration::from_millis(10), || visit(2, ptr)); + queue.run_after(Instant::now() + Duration::from_secs(1), || visit(3, ptr)); + queue.run_async(|| visit(4, ptr)); + thread::sleep(Duration::from_millis(100)); + queue.run_final(|| visit(5, ptr)); + } + // `queue` will be dropped asynchronously and then the `finalizer` of the `queue` + // should be fired to clean up the `context` set in the `queue`. + + assert_eq!(visited, vec![1, 4, 2, 5]); +} |