diff options
Diffstat (limited to 'third_party/rust/cubeb-coreaudio/src/backend/mod.rs')
-rw-r--r-- | third_party/rust/cubeb-coreaudio/src/backend/mod.rs | 834 |
1 files changed, 674 insertions, 160 deletions
diff --git a/third_party/rust/cubeb-coreaudio/src/backend/mod.rs b/third_party/rust/cubeb-coreaudio/src/backend/mod.rs index 61ae44fea1..e6be028a2e 100644 --- a/third_party/rust/cubeb-coreaudio/src/backend/mod.rs +++ b/third_party/rust/cubeb-coreaudio/src/backend/mod.rs @@ -46,15 +46,15 @@ use std::mem; use std::os::raw::{c_uint, c_void}; use std::ptr; use std::slice; +use std::str::FromStr; use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering}; -use std::sync::{Arc, Condvar, Mutex}; -use std::time::Duration; +use std::sync::{Arc, Condvar, Mutex, MutexGuard, Weak}; +use std::time::{Duration, Instant}; const NO_ERR: OSStatus = 0; const AU_OUT_BUS: AudioUnitElement = 0; const AU_IN_BUS: AudioUnitElement = 1; -const DISPATCH_QUEUE_LABEL: &str = "org.mozilla.cubeb"; const PRIVATE_AGGREGATE_DEVICE_NAME: &str = "CubebAggregateDevice"; const VOICEPROCESSING_AGGREGATE_DEVICE_NAME: &str = "VPAUAggregateAudioDevice"; @@ -65,6 +65,34 @@ const APPLE_STUDIO_DISPLAY_USB_ID: &str = "05AC:1114"; const SAFE_MIN_LATENCY_FRAMES: u32 = 128; const SAFE_MAX_LATENCY_FRAMES: u32 = 512; +const VPIO_IDLE_TIMEOUT: Duration = Duration::from_secs(10); + +const MACOS_KERNEL_MAJOR_VERSION_MONTEREY: u32 = 21; + +#[derive(Debug, PartialEq)] +enum ParseMacOSKernelVersionError { + SysCtl, + Malformed, + Parsing, +} + +fn macos_kernel_major_version() -> std::result::Result<u32, ParseMacOSKernelVersionError> { + let ver = whatsys::kernel_version(); + if ver.is_none() { + return Err(ParseMacOSKernelVersionError::SysCtl); + } + let ver = ver.unwrap(); + let major = ver.split('.').next(); + if major.is_none() { + return Err(ParseMacOSKernelVersionError::Malformed); + } + let parsed_major = u32::from_str(major.unwrap()); + if parsed_major.is_err() { + return Err(ParseMacOSKernelVersionError::Parsing); + } + Ok(parsed_major.unwrap()) +} + bitflags! { #[allow(non_camel_case_types)] #[derive(Clone, Debug, PartialEq, Copy)] @@ -191,6 +219,7 @@ fn set_notification_runloop() { fn create_device_info(devid: AudioDeviceID, devtype: DeviceType) -> Option<device_info> { assert_ne!(devid, kAudioObjectSystemObject); + debug_assert_running_serially(); let mut flags = match devtype { DeviceType::INPUT => device_flags::DEV_INPUT, @@ -457,7 +486,6 @@ extern "C" fn audiounit_input_callback( assert!(!user_ptr.is_null()); let stm = unsafe { &mut *(user_ptr as *mut AudioUnitStream) }; - let using_voice_processing_unit = stm.core_stream_data.using_voice_processing_unit(); if unsafe { *flags | kAudioTimeStampHostTimeValid } != 0 { let now = unsafe { mach_absolute_time() }; @@ -597,17 +625,20 @@ extern "C" fn audiounit_input_callback( ErrorHandle::Return(NO_ERR) }; - // If the input (input-only stream) or the output is drained (duplex stream), - // cancel this callback. Note that for voice processing cases (a single unit), - // the output callback handles stopping the unit and notifying of state. - if !using_voice_processing_unit && stm.draining.load(Ordering::SeqCst) { - let r = stop_audiounit(stm.core_stream_data.input_unit); - assert!(r.is_ok()); - // Only fire state-changed callback for input-only stream. - // The state-changed callback for the duplex stream is fired in the output callback. - if stm.core_stream_data.output_unit.is_null() { - stm.notify_state_changed(State::Drained); - } + // If the input (input-only stream) is drained, cancel this callback. Whenever an output + // is involved, the output callback handles stopping all units and notifying of state. + if stm.core_stream_data.output_unit.is_null() && stm.draining.load(Ordering::SeqCst) { + stm.stopped.store(true, Ordering::SeqCst); + cubeb_alog!("({:p}) Input-only drained.", stm as *const AudioUnitStream); + stm.notify_state_changed(State::Drained); + let queue = stm.queue.clone(); + // Use a new thread, through the queue, to avoid deadlock when calling + // AudioOutputUnitStop method from inside render callback + let stm_ptr = user_ptr as usize; + queue.run_async(move || { + let stm = unsafe { &mut *(stm_ptr as *mut AudioUnitStream) }; + stm.core_stream_data.stop_audiounits(); + }); } match handle { @@ -681,11 +712,17 @@ extern "C" fn audiounit_output_callback( } if stm.draining.load(Ordering::SeqCst) { - // Cancel the output callback only. For duplex stream, - // the input callback will be cancelled in its own callback. - let r = stop_audiounit(stm.core_stream_data.output_unit); - assert!(r.is_ok()); + // Cancel all callbacks. For input-only streams, the input callback handles + // cancelling itself. + stm.stopped.store(true, Ordering::SeqCst); + cubeb_alog!("({:p}) output drained.", stm as *const AudioUnitStream); stm.notify_state_changed(State::Drained); + let queue = stm.queue.clone(); + // Use a new thread, through the queue, to avoid deadlock when calling + // AudioOutputUnitStop method from inside render callback + queue.run_async(move || { + stm.core_stream_data.stop_audiounits(); + }); audiounit_make_silent(&buffers[0]); return NO_ERR; } @@ -1157,38 +1194,56 @@ fn create_audiounit(device: &device_info) -> Result<AudioUnit> { Ok(unit) } -fn create_voiceprocessing_audiounit( +fn get_voiceprocessing_audiounit( + shared_voice_processing_unit: &mut SharedVoiceProcessingUnitManager, in_device: &device_info, out_device: &device_info, -) -> Result<AudioUnit> { +) -> Result<OwningHandle<VoiceProcessingUnit>> { + debug_assert_running_serially(); assert!(in_device.flags.contains(device_flags::DEV_INPUT)); assert!(!in_device.flags.contains(device_flags::DEV_OUTPUT)); assert!(!out_device.flags.contains(device_flags::DEV_INPUT)); - assert!(out_device.flags.contains(device_flags::DEV_OUTPUT)); - - let unit = create_typed_audiounit(kAudioUnitSubType_VoiceProcessingIO)?; - if let Err(e) = set_device_to_audiounit(unit, in_device.id, AU_IN_BUS) { + let unit_handle = shared_voice_processing_unit.take_or_create(); + if let Err(e) = unit_handle { cubeb_log!( - "Failed to set in device {} to the created audiounit. Error: {}", - in_device.id, + "Failed to create shared voiceprocessing audiounit. Error: {}", e ); - dispose_audio_unit(unit); return Err(Error::error()); } + let mut unit_handle = unit_handle.unwrap(); - if let Err(e) = set_device_to_audiounit(unit, out_device.id, AU_OUT_BUS) { + if let Err(e) = set_device_to_audiounit(unit_handle.as_mut().unit, in_device.id, AU_IN_BUS) { cubeb_log!( - "Failed to set out device {} to the created audiounit. Error: {}", - out_device.id, + "Failed to set in device {} to the created audiounit. Error: {}", + in_device.id, e ); - dispose_audio_unit(unit); return Err(Error::error()); } - Ok(unit) + let has_output = out_device.id != kAudioObjectUnknown; + if let Err(e) = + enable_audiounit_scope(unit_handle.as_mut().unit, DeviceType::OUTPUT, has_output) + { + cubeb_log!("Failed to enable audiounit input scope. Error: {}", e); + return Err(Error::error()); + } + if has_output { + if let Err(e) = + set_device_to_audiounit(unit_handle.as_mut().unit, out_device.id, AU_OUT_BUS) + { + cubeb_log!( + "Failed to set out device {} to the created audiounit. Error: {}", + out_device.id, + e + ); + return Err(Error::error()); + } + } + + Ok(unit_handle) } fn enable_audiounit_scope( @@ -1276,6 +1331,31 @@ fn create_blank_audiounit() -> Result<AudioUnit> { return create_typed_audiounit(kAudioUnitSubType_RemoteIO); } +fn create_voiceprocessing_audiounit() -> Result<VoiceProcessingUnit> { + let res = create_typed_audiounit(kAudioUnitSubType_VoiceProcessingIO); + if res.is_err() { + return Err(Error::error()); + } + + match get_default_device(DeviceType::OUTPUT) { + None => { + cubeb_log!("Could not get default output device in order to undo vpio ducking"); + } + Some(id) => { + let r = audio_device_duck(id, 1.0, ptr::null_mut(), 0.5); + if r != NO_ERR { + cubeb_log!( + "Failed to undo ducking of voiceprocessing on output device {}. Proceeding... Error: {}", + id, + r + ); + } + } + }; + + res.map(|unit| VoiceProcessingUnit { unit }) +} + fn get_buffer_size(unit: AudioUnit, devtype: DeviceType) -> std::result::Result<u32, OSStatus> { assert!(!unit.is_null()); let (scope, element) = match devtype { @@ -1558,7 +1638,7 @@ fn get_range_of_sample_rates( if rates.is_empty() { return Err(String::from("No data")); } - let (mut min, mut max) = (std::f64::MAX, std::f64::MIN); + let (mut min, mut max) = (f64::MAX, f64::MIN); for rate in rates { if rate.mMaximum > max { max = rate.mMaximum; @@ -2056,7 +2136,7 @@ struct LatencyController { } impl LatencyController { - fn add_stream(&mut self, latency: u32) -> Option<u32> { + fn add_stream(&mut self, latency: u32) -> u32 { self.streams += 1; // For the 1st stream set anything within safe min-max if self.streams == 1 { @@ -2065,16 +2145,322 @@ impl LatencyController { // synthetize the clock from the callbacks, and we want the clock to update often. self.latency = Some(latency.clamp(SAFE_MIN_LATENCY_FRAMES, SAFE_MAX_LATENCY_FRAMES)); } - self.latency + self.latency.unwrap_or(latency) } - fn subtract_stream(&mut self) -> Option<u32> { + fn subtract_stream(&mut self) { self.streams -= 1; if self.streams == 0 { assert!(self.latency.is_some()); self.latency = None; } - self.latency + } +} + +// SharedStorage<T> below looks generic but has evolved to be pretty tailored +// the observed behavior of VoiceProcessingIO audio units on macOS 14. +// Some key points are: +// - Creating the first VoiceProcessingIO unit in a process takes a long time, often > 3s. +// - Creating a second VoiceProcessingIO unit in a process is significantly faster, < 1s. +// - Disposing of a VoiceProcessingIO unit when all other VoiceProcessingIO units are +// uninitialized will take significantly longer than disposing the remaining +// VoiceProcessingIO units, and will have other side effects: starting another +// VoiceProcessingIO unit after this is on par with creating the first one in the +// process, bluetooth devices will move away from the handsfree profile, etc. +// The takeaway is that there is something internal to the VoiceProcessingIO audio unit +// that is costly to create and dispose of and its creation is triggered by creation of +// the first VoiceProcessingIO unit, and its disposal is triggered by the disposal of +// the first VoiceProcessingIO unit when no other VoiceProcessingIO units are initialized. +// +// The intended behavior of SharedStorage<T> and SharedVoiceProcessingUnitManager is therefore: +// - Retain ideally just one VoiceProcessingIO unit after stream destruction, so device +// switching is fast. The benefit of retaining more than one is unclear. +// - Dispose of either all VoiceProcessingIO units, or none at all, such that the retained +// VoiceProcessingIO unit really helps speed up creating and starting the next. In practice +// this means we retain all VoiceProcessingIO units until they can all be disposed of. + +#[derive(Debug)] +struct SharedStorageInternal<T> { + // Storage for shared elements. + elements: Vec<T>, + // Number of elements in use, i.e. all elements created/taken and not recycled. + outstanding_element_count: usize, + // Used for invalidation of in-flight tasks to clear elements. + // Incremented when something takes a shared element. + generation: usize, +} + +#[derive(Debug)] +struct SharedStorage<T> { + queue: Queue, + idle_timeout: Duration, + storage: Mutex<SharedStorageInternal<T>>, +} + +impl<T: Send> SharedStorage<T> { + fn with_idle_timeout(queue: Queue, idle_timeout: Duration) -> Self { + Self { + queue, + idle_timeout, + storage: Mutex::new(SharedStorageInternal::<T> { + elements: Vec::default(), + outstanding_element_count: 0, + generation: 0, + }), + } + } + + fn take_locked(guard: &mut MutexGuard<'_, SharedStorageInternal<T>>) -> Result<T> { + if let Some(e) = guard.elements.pop() { + cubeb_log!("Taking shared element #{}.", guard.elements.len()); + guard.outstanding_element_count += 1; + guard.generation += 1; + return Ok(e); + } + + Err(Error::not_supported()) + } + + fn create_with_locked<F>( + guard: &mut MutexGuard<'_, SharedStorageInternal<T>>, + f: F, + ) -> Result<T> + where + F: FnOnce() -> Result<T>, + { + let start = Instant::now(); + match f() { + Ok(obj) => { + cubeb_log!( + "Just created shared element #{}. Took {}s.", + guard.outstanding_element_count, + (Instant::now() - start).as_secs_f32() + ); + guard.outstanding_element_count += 1; + guard.generation += 1; + Ok(obj) + } + Err(_) => { + cubeb_log!("Creating shared element failed"); + Err(Error::error()) + } + } + } + + #[cfg(test)] + fn take(&self) -> Result<T> { + let mut guard = self.storage.lock().unwrap(); + SharedStorage::take_locked(&mut guard) + } + + fn take_or_create_with<F>(&self, f: F) -> Result<T> + where + F: FnOnce() -> Result<T>, + { + let mut guard = self.storage.lock().unwrap(); + SharedStorage::take_locked(&mut guard) + .or_else(|_| SharedStorage::create_with_locked(&mut guard, f)) + } + + fn recycle(&self, obj: T) { + let mut guard = self.storage.lock().unwrap(); + guard.outstanding_element_count -= 1; + cubeb_log!( + "Recycling shared element #{}. Nr of live elements now {}.", + guard.elements.len(), + guard.outstanding_element_count + ); + guard.elements.push(obj); + } + + fn clear_locked(guard: &mut MutexGuard<'_, SharedStorageInternal<T>>) { + let count = guard.elements.len(); + let start = Instant::now(); + guard.elements.clear(); + cubeb_log!( + "Cleared {} shared element{}. Took {}s.", + count, + if count == 1 { "" } else { "s" }, + (Instant::now() - start).as_secs_f32() + ); + } + + fn clear(&self) { + debug_assert_running_serially(); + let mut guard = self.storage.lock().unwrap(); + SharedStorage::clear_locked(&mut guard); + } + + fn clear_if_all_idle_async(storage: &Arc<SharedStorage<T>>) { + let (queue, outstanding_element_count, generation) = { + let guard = storage.storage.lock().unwrap(); + ( + storage.queue.clone(), + guard.outstanding_element_count, + guard.generation, + ) + }; + if outstanding_element_count > 0 { + cubeb_log!( + "Not clearing shared voiceprocessing unit storage because {} elements are in use. Generation={}.", + outstanding_element_count, + generation + ); + return; + } + cubeb_log!( + "Clearing shared voiceprocessing unit storage in {}s if still at generation {}.", + storage.idle_timeout.as_secs_f32(), + generation + ); + let storage = storage.clone(); + queue.run_after(Instant::now() + storage.idle_timeout, move || { + let mut guard = storage.storage.lock().unwrap(); + if generation != guard.generation { + cubeb_log!( + "Not clearing shared voiceprocessing unit storage for generation {} as we're now at {}.", + generation, + guard.generation + ); + return; + } + SharedStorage::clear_locked(&mut guard); + }); + } +} + +#[derive(Debug)] +struct OwningHandle<T> +where + T: Send, +{ + storage: Weak<SharedStorage<T>>, + obj: Option<T>, +} + +impl<T: Send> OwningHandle<T> { + fn new(storage: Weak<SharedStorage<T>>, obj: T) -> Self { + Self { + storage, + obj: Some(obj), + } + } +} + +impl<T: Send> AsRef<T> for OwningHandle<T> { + fn as_ref(&self) -> &T { + self.obj.as_ref().unwrap() + } +} + +impl<T: Send> AsMut<T> for OwningHandle<T> { + fn as_mut(&mut self) -> &mut T { + self.obj.as_mut().unwrap() + } +} + +impl<T: Send> Drop for OwningHandle<T> { + fn drop(&mut self) { + let storage = self.storage.upgrade(); + assert!( + storage.is_some(), + "Storage must outlive the handle, but didn't" + ); + let storage = storage.unwrap(); + if self.obj.is_none() { + return; + } + let obj = self.obj.take().unwrap(); + storage.recycle(obj); + SharedStorage::clear_if_all_idle_async(&storage); + } +} + +#[derive(Debug)] +struct VoiceProcessingUnit { + unit: AudioUnit, +} + +impl Drop for VoiceProcessingUnit { + fn drop(&mut self) { + assert!(!self.unit.is_null()); + dispose_audio_unit(self.unit); + } +} + +unsafe impl Send for VoiceProcessingUnit {} + +#[derive(Debug)] +struct SharedVoiceProcessingUnitManager { + sync_storage: Mutex<Option<Arc<SharedStorage<VoiceProcessingUnit>>>>, + queue: Queue, + idle_timeout: Duration, +} + +impl SharedVoiceProcessingUnitManager { + fn with_idle_timeout(queue: Queue, idle_timeout: Duration) -> Self { + Self { + sync_storage: Mutex::new(None), + queue, + idle_timeout, + } + } + + fn new(queue: Queue) -> Self { + SharedVoiceProcessingUnitManager::with_idle_timeout(queue, VPIO_IDLE_TIMEOUT) + } + + fn ensure_storage_locked( + &self, + guard: &mut MutexGuard<Option<Arc<SharedStorage<VoiceProcessingUnit>>>>, + ) { + if guard.is_some() { + return; + } + cubeb_log!("Creating shared voiceprocessing storage."); + let storage = SharedStorage::<VoiceProcessingUnit>::with_idle_timeout( + self.queue.clone(), + self.idle_timeout, + ); + let old_storage = guard.replace(Arc::from(storage)); + assert!(old_storage.is_none()); + } + + // Take an already existing, shared, vpio unit, if one is available. + #[cfg(test)] + fn take(&mut self) -> Result<OwningHandle<VoiceProcessingUnit>> { + debug_assert_running_serially(); + let mut guard = self.sync_storage.lock().unwrap(); + self.ensure_storage_locked(&mut guard); + let storage = guard.as_mut().unwrap(); + let res = storage.take(); + res.map(|u| OwningHandle::new(Arc::downgrade(storage), u)) + } + + // Take an already existing, shared, vpio unit, or create one if none are available. + fn take_or_create(&mut self) -> Result<OwningHandle<VoiceProcessingUnit>> { + debug_assert_running_serially(); + let mut guard = self.sync_storage.lock().unwrap(); + self.ensure_storage_locked(&mut guard); + let storage = guard.as_mut().unwrap(); + let res = storage.take_or_create_with(create_voiceprocessing_audiounit); + res.map(|u| OwningHandle::new(Arc::downgrade(storage), u)) + } +} + +unsafe impl Send for SharedVoiceProcessingUnitManager {} +unsafe impl Sync for SharedVoiceProcessingUnitManager {} + +impl Drop for SharedVoiceProcessingUnitManager { + fn drop(&mut self) { + debug_assert_not_running_serially(); + self.queue.run_final(|| { + let mut guard = self.sync_storage.lock().unwrap(); + if guard.is_none() { + return; + } + guard.as_mut().unwrap().clear(); + }); } } @@ -2091,15 +2477,26 @@ pub struct AudioUnitContext { serial_queue: Queue, latency_controller: Mutex<LatencyController>, devices: Mutex<SharedDevices>, + // Storage for a context-global vpio unit. Duplex streams that need one will take this + // and return it when done. + shared_voice_processing_unit: SharedVoiceProcessingUnitManager, } impl AudioUnitContext { fn new() -> Self { + let queue_label = format!("{}.context", DISPATCH_QUEUE_LABEL); + let serial_queue = + Queue::new_with_target(queue_label.as_str(), get_serial_queue_singleton()); + let shared_vp_queue = Queue::new_with_target( + format!("{}.context.shared_vpio", DISPATCH_QUEUE_LABEL).as_str(), + &serial_queue, + ); Self { _ops: &OPS as *const _, - serial_queue: Queue::new(DISPATCH_QUEUE_LABEL), + serial_queue, latency_controller: Mutex::new(LatencyController::default()), devices: Mutex::new(SharedDevices::default()), + shared_voice_processing_unit: SharedVoiceProcessingUnitManager::new(shared_vp_queue), } } @@ -2108,14 +2505,14 @@ impl AudioUnitContext { controller.streams } - fn update_latency_by_adding_stream(&self, latency_frames: u32) -> Option<u32> { + fn update_latency_by_adding_stream(&self, latency_frames: u32) -> u32 { let mut controller = self.latency_controller.lock().unwrap(); controller.add_stream(latency_frames) } - fn update_latency_by_removing_stream(&self) -> Option<u32> { + fn update_latency_by_removing_stream(&self) { let mut controller = self.latency_controller.lock().unwrap(); - controller.subtract_stream() + controller.subtract_stream(); } fn add_devices_changed_listener( @@ -2228,8 +2625,16 @@ impl AudioUnitContext { impl ContextOps for AudioUnitContext { fn init(_context_name: Option<&CStr>) -> Result<Context> { - set_notification_runloop(); - let ctx = Box::new(AudioUnitContext::new()); + run_serially(set_notification_runloop); + let mut ctx = Box::new(AudioUnitContext::new()); + let queue_label = format!("{}.context.{:p}", DISPATCH_QUEUE_LABEL, ctx.as_ref()); + ctx.serial_queue = + Queue::new_with_target(queue_label.as_str(), get_serial_queue_singleton()); + let shared_vp_queue = Queue::new_with_target( + format!("{}.shared_vpio", queue_label).as_str(), + &ctx.serial_queue, + ); + ctx.shared_voice_processing_unit = SharedVoiceProcessingUnitManager::new(shared_vp_queue); Ok(unsafe { Context::from_ptr(Box::into_raw(ctx) as *mut _) }) } @@ -2379,28 +2784,18 @@ impl ContextOps for AudioUnitContext { return Err(Error::invalid_parameter()); } - // Latency cannot change if another stream is operating in parallel. In this case - // latency is set to the other stream value. - let global_latency_frames = self - .update_latency_by_adding_stream(latency_frames) - .unwrap(); - if global_latency_frames != latency_frames { - cubeb_log!( - "Use global latency {} instead of the requested latency {}.", - global_latency_frames, - latency_frames - ); - } - let in_stm_settings = if let Some(params) = input_stream_params { - let in_device = - match create_device_info(input_device as AudioDeviceID, DeviceType::INPUT) { - None => { - cubeb_log!("Fail to create device info for input"); - return Err(Error::error()); - } - Some(d) => d, - }; + let in_device = match self + .serial_queue + .run_sync(|| create_device_info(input_device as AudioDeviceID, DeviceType::INPUT)) + .unwrap() + { + None => { + cubeb_log!("Fail to create device info for input"); + return Err(Error::error()); + } + Some(d) => d, + }; let stm_params = StreamParams::from(unsafe { *params.as_ptr() }); Some((stm_params, in_device)) } else { @@ -2408,20 +2803,34 @@ impl ContextOps for AudioUnitContext { }; let out_stm_settings = if let Some(params) = output_stream_params { - let out_device = - match create_device_info(output_device as AudioDeviceID, DeviceType::OUTPUT) { - None => { - cubeb_log!("Fail to create device info for output"); - return Err(Error::error()); - } - Some(d) => d, - }; + let out_device = match self + .serial_queue + .run_sync(|| create_device_info(output_device as AudioDeviceID, DeviceType::OUTPUT)) + .unwrap() + { + None => { + cubeb_log!("Fail to create device info for output"); + return Err(Error::error()); + } + Some(d) => d, + }; let stm_params = StreamParams::from(unsafe { *params.as_ptr() }); Some((stm_params, out_device)) } else { None }; + // Latency cannot change if another stream is operating in parallel. In this case + // latency is set to the other stream value. + let global_latency_frames = self.update_latency_by_adding_stream(latency_frames); + if global_latency_frames != latency_frames { + cubeb_log!( + "Use global latency {} instead of the requested latency {}.", + global_latency_frames, + latency_frames + ); + } + let mut boxed_stream = Box::new(AudioUnitStream::new( self, user_ptr, @@ -2431,16 +2840,25 @@ impl ContextOps for AudioUnitContext { )); // Rename the task queue to be an unique label. - let queue_label = format!("{}.{:p}", DISPATCH_QUEUE_LABEL, boxed_stream.as_ref()); - boxed_stream.queue = Queue::new(queue_label.as_str()); + let queue_label = format!( + "{}.stream.{:p}", + DISPATCH_QUEUE_LABEL, + boxed_stream.as_ref() + ); + boxed_stream.queue = Queue::new_with_target(queue_label.as_str(), &boxed_stream.queue); boxed_stream.core_stream_data = CoreStreamData::new(boxed_stream.as_ref(), in_stm_settings, out_stm_settings); - let mut result = Ok(()); - boxed_stream.queue.clone().run_sync(|| { - result = boxed_stream.core_stream_data.setup(); - }); + let result = boxed_stream + .queue + .clone() + .run_sync(|| { + boxed_stream + .core_stream_data + .setup(&mut boxed_stream.context.shared_voice_processing_unit) + }) + .unwrap(); if let Err(r) = result { cubeb_log!( "({:p}) Could not setup the audiounit stream.", @@ -2465,25 +2883,43 @@ impl ContextOps for AudioUnitContext { if devtype == DeviceType::UNKNOWN { return Err(Error::invalid_parameter()); } - if collection_changed_callback.is_some() { - self.add_devices_changed_listener(devtype, collection_changed_callback, user_ptr) - } else { - self.remove_devices_changed_listener(devtype) - } + self.serial_queue + .clone() + .run_sync(|| { + if collection_changed_callback.is_some() { + self.add_devices_changed_listener( + devtype, + collection_changed_callback, + user_ptr, + ) + } else { + self.remove_devices_changed_listener(devtype) + } + }) + .unwrap() } } impl Drop for AudioUnitContext { fn drop(&mut self) { - let devices = self.devices.lock().unwrap(); - assert!( + assert!({ + let devices = self.devices.lock().unwrap(); devices.input.changed_callback.is_none() && devices.output.changed_callback.is_none() - ); + }); + + self.shared_voice_processing_unit = + SharedVoiceProcessingUnitManager::new(self.serial_queue.clone()); + + // Make sure all the pending (device-collection-changed-callback) tasks + // in queue are done, and cancel all the tasks appended after `drop` is executed. + let queue = self.serial_queue.clone(); + queue.run_final(|| {}); { let controller = self.latency_controller.lock().unwrap(); - // Disabling this assert for bug 1083664 -- we seem to leak a stream + // Disabling this assert in release for bug 1083664 -- we seem to leak a stream // assert(controller.streams == 0); + debug_assert!(controller.streams == 0); if controller.streams > 0 { cubeb_log!( "({:p}) API misuse, {} streams active when context destroyed!", @@ -2492,10 +2928,6 @@ impl Drop for AudioUnitContext { ); } } - // Make sure all the pending (device-collection-changed-callback) tasks - // in queue are done, and cancel all the tasks appended after `drop` is executed. - let queue = self.serial_queue.clone(); - queue.run_final(|| {}); } } @@ -2562,6 +2994,8 @@ struct CoreStreamData<'ctx> { // I/O AudioUnits. input_unit: AudioUnit, output_unit: AudioUnit, + // Handle to shared voiceprocessing AudioUnit, if in use. + voiceprocessing_unit_handle: Option<OwningHandle<VoiceProcessingUnit>>, // Info of the I/O devices. input_device: device_info, output_device: device_info, @@ -2603,6 +3037,7 @@ impl<'ctx> Default for CoreStreamData<'ctx> { output_dev_desc: AudioStreamBasicDescription::default(), input_unit: ptr::null_mut(), output_unit: ptr::null_mut(), + voiceprocessing_unit_handle: None, input_device: device_info::default(), output_device: device_info::default(), input_processing_params: InputProcessingParams::NONE, @@ -2649,6 +3084,7 @@ impl<'ctx> CoreStreamData<'ctx> { output_dev_desc: AudioStreamBasicDescription::default(), input_unit: ptr::null_mut(), output_unit: ptr::null_mut(), + voiceprocessing_unit_handle: None, input_device: in_dev, output_device: out_dev, input_processing_params: InputProcessingParams::NONE, @@ -2716,7 +3152,7 @@ impl<'ctx> CoreStreamData<'ctx> { } fn using_voice_processing_unit(&self) -> bool { - !self.input_unit.is_null() && self.input_unit == self.output_unit + self.voiceprocessing_unit_handle.is_some() } fn same_clock_domain(&self) -> bool { @@ -2744,6 +3180,22 @@ impl<'ctx> CoreStreamData<'ctx> { input_domain == output_domain } + #[allow(non_upper_case_globals)] + fn should_force_vpio_for_input_device(&self, in_device: &device_info) -> bool { + assert!(in_device.id != kAudioObjectUnknown); + self.debug_assert_is_on_stream_queue(); + match get_device_transport_type(in_device.id, DeviceType::INPUT) { + Ok(kAudioDeviceTransportTypeBuiltIn) => { + cubeb_log!( + "Forcing VPIO because input device is built in, and its volume \ + is known to be very low without VPIO whenever VPIO is hooked up to it elsewhere." + ); + true + } + _ => false, + } + } + fn should_block_vpio_for_device_pair( &self, in_device: &device_info, @@ -2751,7 +3203,10 @@ impl<'ctx> CoreStreamData<'ctx> { ) -> bool { self.debug_assert_is_on_stream_queue(); cubeb_log!("Evaluating device pair against VPIO block list"); - let log_device = |id, devtype| -> std::result::Result<(), OSStatus> { + let log_device_and_get_model_uid = |id, devtype| -> String { + let device_model_uid = get_device_model_uid(id, devtype) + .map(|s| s.into_string()) + .unwrap_or_default(); cubeb_log!("{} uid=\"{}\", model_uid=\"{}\", transport_type={:?}, source={:?}, source_name=\"{}\", name=\"{}\", manufacturer=\"{}\"", if devtype == DeviceType::INPUT { "Input" @@ -2760,43 +3215,59 @@ impl<'ctx> CoreStreamData<'ctx> { "Output" }, get_device_uid(id, devtype).map(|s| s.into_string()).unwrap_or_default(), - get_device_model_uid(id, devtype).map(|s| s.into_string()).unwrap_or_default(), + device_model_uid, convert_uint32_into_string(get_device_transport_type(id, devtype).unwrap_or(0)), convert_uint32_into_string(get_device_source(id, devtype).unwrap_or(0)), get_device_source_name(id, devtype).map(|s| s.into_string()).unwrap_or_default(), get_device_name(id, devtype).map(|s| s.into_string()).unwrap_or_default(), get_device_manufacturer(id, devtype).map(|s| s.into_string()).unwrap_or_default()); - Ok(()) + device_model_uid }; - log_device(in_device.id, DeviceType::INPUT); - log_device(out_device.id, DeviceType::OUTPUT); - match ( - get_device_model_uid(in_device.id, DeviceType::INPUT).map(|s| s.to_string()), - get_device_model_uid(out_device.id, DeviceType::OUTPUT).map(|s| s.to_string()), - ) { - (Ok(in_model_uid), Ok(out_model_uid)) - if in_model_uid.contains(APPLE_STUDIO_DISPLAY_USB_ID) - && out_model_uid.contains(APPLE_STUDIO_DISPLAY_USB_ID) => - { - cubeb_log!("Both input and output device is an Apple Studio Display. BLOCKED"); - true - } - _ => { - cubeb_log!("Device pair is not blocked"); - false - } + + #[allow(non_upper_case_globals)] + let in_id = match in_device.id { + kAudioObjectUnknown => None, + id => Some(id), + }; + #[allow(non_upper_case_globals)] + let out_id = match out_device.id { + kAudioObjectUnknown => None, + id => Some(id), + }; + + let (in_model_uid, out_model_uid) = ( + in_id + .map(|id| log_device_and_get_model_uid(id, DeviceType::INPUT)) + .unwrap_or_default(), + out_id + .map(|id| log_device_and_get_model_uid(id, DeviceType::OUTPUT)) + .unwrap_or_default(), + ); + + if in_model_uid.contains(APPLE_STUDIO_DISPLAY_USB_ID) + && out_model_uid.contains(APPLE_STUDIO_DISPLAY_USB_ID) + { + cubeb_log!("Both input and output device is an Apple Studio Display. BLOCKED"); + return true; } + + cubeb_log!("Device pair is not blocked"); + false } - fn create_audiounits(&mut self) -> Result<(device_info, device_info)> { + fn create_audiounits( + &mut self, + shared_voice_processing_unit: &mut SharedVoiceProcessingUnitManager, + ) -> Result<(device_info, device_info)> { self.debug_assert_is_on_stream_queue(); let should_use_voice_processing_unit = self.has_input() - && self.has_output() - && self + && (self .input_stream_params .prefs() .contains(StreamPrefs::VOICE) - && !self.should_block_vpio_for_device_pair(&self.input_device, &self.output_device); + || self.should_force_vpio_for_input_device(&self.input_device)) + && !self.should_block_vpio_for_device_pair(&self.input_device, &self.output_device) + && macos_kernel_major_version() != Ok(MACOS_KERNEL_MAJOR_VERSION_MONTEREY); let should_use_aggregate_device = { // It's impossible to create an aggregate device from an aggregate device, and it's @@ -2843,16 +3314,20 @@ impl<'ctx> CoreStreamData<'ctx> { // - As last resort, create regular AudioUnits. This is also the normal non-duplex path. if should_use_voice_processing_unit { - if let Ok(au) = - create_voiceprocessing_audiounit(&self.input_device, &self.output_device) - { - cubeb_log!("({:p}) Using VoiceProcessingIO AudioUnit", self.stm_ptr); - self.input_unit = au; - self.output_unit = au; + if let Ok(mut au_handle) = get_voiceprocessing_audiounit( + shared_voice_processing_unit, + &self.input_device, + &self.output_device, + ) { + self.input_unit = au_handle.as_mut().unit; + if self.has_output() { + self.output_unit = au_handle.as_mut().unit; + } + self.voiceprocessing_unit_handle = Some(au_handle); return Ok((self.input_device.clone(), self.output_device.clone())); } cubeb_log!( - "({:p}) Failed to create VoiceProcessingIO AudioUnit. Trying a regular one.", + "({:p}) Failed to get VoiceProcessingIO AudioUnit. Trying a regular one.", self.stm_ptr ); } @@ -2954,7 +3429,10 @@ impl<'ctx> CoreStreamData<'ctx> { } #[allow(clippy::cognitive_complexity)] // TODO: Refactoring. - fn setup(&mut self) -> Result<()> { + fn setup( + &mut self, + shared_voice_processing_unit: &mut SharedVoiceProcessingUnitManager, + ) -> Result<()> { self.debug_assert_is_on_stream_queue(); if self .input_stream_params @@ -2970,7 +3448,7 @@ impl<'ctx> CoreStreamData<'ctx> { } let same_clock_domain = self.same_clock_domain(); - let (in_dev_info, out_dev_info) = self.create_audiounits()?; + let (in_dev_info, out_dev_info) = self.create_audiounits(shared_voice_processing_unit)?; let using_voice_processing_unit = self.using_voice_processing_unit(); assert!(!self.stm_ptr.is_null()); @@ -3155,6 +3633,25 @@ impl<'ctx> CoreStreamData<'ctx> { ); } + if self.has_input() && !self.has_output() && using_voice_processing_unit { + // We must configure the output side of VPIO to match the input side, even if we don't use it. + let r = audio_unit_set_property( + self.input_unit, + kAudioUnitProperty_StreamFormat, + kAudioUnitScope_Input, + AU_OUT_BUS, + &self.input_dev_desc, + mem::size_of::<AudioStreamBasicDescription>(), + ); + if r != NO_ERR { + cubeb_log!( + "AudioUnitSetProperty/output/kAudioUnitProperty_StreamFormat rv={}", + r + ); + return Err(Error::error()); + } + } + if self.has_output() { assert!(!self.output_unit.is_null()); @@ -3463,15 +3960,31 @@ impl<'ctx> CoreStreamData<'ctx> { // NOTE: On MacOS 14 the ducking happens on creation of the VPIO AudioUnit. // On MacOS 10.15 it happens on both creation and initialization, which // is why we defer the unducking until now. - let r = audio_device_duck(self.output_device.id, 1.0, ptr::null_mut(), 0.5); - if r != NO_ERR { - cubeb_log!( - "({:p}) Failed to undo ducking of voiceprocessing on output device {}. Proceeding... Error: {}", - self.stm_ptr, - self.output_device.id, - r - ); - } + #[allow(non_upper_case_globals)] + let mut device = match self.output_device.id { + kAudioObjectUnknown => None, + id => Some(id), + }; + device = device.or_else(|| get_default_device(DeviceType::OUTPUT)); + match device { + None => { + cubeb_log!( + "({:p}) No output device to undo vpio ducking on", + self.stm_ptr + ); + } + Some(id) => { + let r = audio_device_duck(id, 1.0, ptr::null_mut(), 0.5); + if r != NO_ERR { + cubeb_log!( + "({:p}) Failed to undo ducking of voiceprocessing on output device {}. Proceeding... Error: {}", + self.stm_ptr, + id, + r + ); + } + } + }; // Always try to remember the applied input mute state. If it cannot be applied // to the new device pair, we notify the client of an error and it will have to @@ -3558,10 +4071,16 @@ impl<'ctx> CoreStreamData<'ctx> { } if !self.input_unit.is_null() { - dispose_audio_unit(self.input_unit); + if !self.using_voice_processing_unit() { + // The VPIO unit is shared and must not be disposed. + dispose_audio_unit(self.input_unit); + } self.input_unit = ptr::null_mut(); } + // Return the VPIO unit if present. + self.voiceprocessing_unit_handle = None; + self.resampler.destroy(); self.mixer = None; self.aggregate_device = None; @@ -3873,8 +4392,6 @@ struct OutputCallbackTimingData { // #[repr(C)] is used to prevent any padding from being added in the beginning of the AudioUnitStream. #[repr(C)] #[derive(Debug)] -// Allow exposing this private struct in public interfaces when running tests. -#[cfg_attr(test, allow(private_in_public))] struct AudioUnitStream<'ctx> { context: &'ctx mut AudioUnitContext, user_ptr: *mut c_void, @@ -3927,10 +4444,11 @@ impl<'ctx> AudioUnitStream<'ctx> { }); let (output_callback_timing_data_write, output_callback_timing_data_read) = output_callback_timing_data.split(); + let queue = context.serial_queue.clone(); AudioUnitStream { context, user_ptr, - queue: Queue::new(DISPATCH_QUEUE_LABEL), + queue, data_callback, state_callback, device_changed_callback: Mutex::new(None), @@ -4046,10 +4564,12 @@ impl<'ctx> AudioUnitStream<'ctx> { } } - self.core_stream_data.setup().map_err(|e| { - cubeb_log!("({:p}) Setup failed.", self.core_stream_data.stm_ptr); - e - })?; + self.core_stream_data + .setup(&mut self.context.shared_voice_processing_unit) + .map_err(|e| { + cubeb_log!("({:p}) Setup failed.", self.core_stream_data.stm_ptr); + e + })?; if let Ok(volume) = vol_rv { set_volume(self.core_stream_data.output_unit, volume); @@ -4171,7 +4691,7 @@ impl<'ctx> AudioUnitStream<'ctx> { impl<'ctx> Drop for AudioUnitStream<'ctx> { fn drop(&mut self) { // Execute destroy in serial queue to avoid collision with reinit when un/plug devices - self.queue.clone().run_final(move || { + self.queue.clone().run_final(|| { self.destroy(); self.core_stream_data = CoreStreamData::default(); }); @@ -4184,12 +4704,10 @@ impl<'ctx> StreamOps for AudioUnitStream<'ctx> { self.draining.store(false, Ordering::SeqCst); // Execute start in serial queue to avoid racing with destroy or reinit. - let mut result = Err(Error::error()); - let started = &mut result; - let stream = &self; - self.queue.run_sync(move || { - *started = stream.core_stream_data.start_audiounits(); - }); + let result = self + .queue + .run_sync(|| self.core_stream_data.start_audiounits()) + .unwrap(); result?; @@ -4205,10 +4723,8 @@ impl<'ctx> StreamOps for AudioUnitStream<'ctx> { self.stopped.store(true, Ordering::SeqCst); // Execute stop in serial queue to avoid racing with destroy or reinit. - let stream = &self; - self.queue.run_sync(move || { - stream.core_stream_data.stop_audiounits(); - }); + self.queue + .run_sync(|| self.core_stream_data.stop_audiounits()); self.notify_state_changed(State::Stopped); @@ -4285,12 +4801,10 @@ impl<'ctx> StreamOps for AudioUnitStream<'ctx> { } fn set_volume(&mut self, volume: f32) -> Result<()> { // Execute set_volume in serial queue to avoid racing with destroy or reinit. - let mut result = Err(Error::error()); - let set = &mut result; - let stream = &self; - self.queue.run_sync(move || { - *set = set_volume(stream.core_stream_data.output_unit, volume); - }); + let result = self + .queue + .run_sync(|| set_volume(self.core_stream_data.output_unit, volume)) + .unwrap(); result?; |