summaryrefslogtreecommitdiffstats
path: root/third_party/rust/cubeb-coreaudio/src/backend/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--third_party/rust/cubeb-coreaudio/src/backend/mod.rs834
1 files changed, 674 insertions, 160 deletions
diff --git a/third_party/rust/cubeb-coreaudio/src/backend/mod.rs b/third_party/rust/cubeb-coreaudio/src/backend/mod.rs
index 61ae44fea1..e6be028a2e 100644
--- a/third_party/rust/cubeb-coreaudio/src/backend/mod.rs
+++ b/third_party/rust/cubeb-coreaudio/src/backend/mod.rs
@@ -46,15 +46,15 @@ use std::mem;
use std::os::raw::{c_uint, c_void};
use std::ptr;
use std::slice;
+use std::str::FromStr;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
-use std::sync::{Arc, Condvar, Mutex};
-use std::time::Duration;
+use std::sync::{Arc, Condvar, Mutex, MutexGuard, Weak};
+use std::time::{Duration, Instant};
const NO_ERR: OSStatus = 0;
const AU_OUT_BUS: AudioUnitElement = 0;
const AU_IN_BUS: AudioUnitElement = 1;
-const DISPATCH_QUEUE_LABEL: &str = "org.mozilla.cubeb";
const PRIVATE_AGGREGATE_DEVICE_NAME: &str = "CubebAggregateDevice";
const VOICEPROCESSING_AGGREGATE_DEVICE_NAME: &str = "VPAUAggregateAudioDevice";
@@ -65,6 +65,34 @@ const APPLE_STUDIO_DISPLAY_USB_ID: &str = "05AC:1114";
const SAFE_MIN_LATENCY_FRAMES: u32 = 128;
const SAFE_MAX_LATENCY_FRAMES: u32 = 512;
+const VPIO_IDLE_TIMEOUT: Duration = Duration::from_secs(10);
+
+const MACOS_KERNEL_MAJOR_VERSION_MONTEREY: u32 = 21;
+
+#[derive(Debug, PartialEq)]
+enum ParseMacOSKernelVersionError {
+ SysCtl,
+ Malformed,
+ Parsing,
+}
+
+fn macos_kernel_major_version() -> std::result::Result<u32, ParseMacOSKernelVersionError> {
+ let ver = whatsys::kernel_version();
+ if ver.is_none() {
+ return Err(ParseMacOSKernelVersionError::SysCtl);
+ }
+ let ver = ver.unwrap();
+ let major = ver.split('.').next();
+ if major.is_none() {
+ return Err(ParseMacOSKernelVersionError::Malformed);
+ }
+ let parsed_major = u32::from_str(major.unwrap());
+ if parsed_major.is_err() {
+ return Err(ParseMacOSKernelVersionError::Parsing);
+ }
+ Ok(parsed_major.unwrap())
+}
+
bitflags! {
#[allow(non_camel_case_types)]
#[derive(Clone, Debug, PartialEq, Copy)]
@@ -191,6 +219,7 @@ fn set_notification_runloop() {
fn create_device_info(devid: AudioDeviceID, devtype: DeviceType) -> Option<device_info> {
assert_ne!(devid, kAudioObjectSystemObject);
+ debug_assert_running_serially();
let mut flags = match devtype {
DeviceType::INPUT => device_flags::DEV_INPUT,
@@ -457,7 +486,6 @@ extern "C" fn audiounit_input_callback(
assert!(!user_ptr.is_null());
let stm = unsafe { &mut *(user_ptr as *mut AudioUnitStream) };
- let using_voice_processing_unit = stm.core_stream_data.using_voice_processing_unit();
if unsafe { *flags | kAudioTimeStampHostTimeValid } != 0 {
let now = unsafe { mach_absolute_time() };
@@ -597,17 +625,20 @@ extern "C" fn audiounit_input_callback(
ErrorHandle::Return(NO_ERR)
};
- // If the input (input-only stream) or the output is drained (duplex stream),
- // cancel this callback. Note that for voice processing cases (a single unit),
- // the output callback handles stopping the unit and notifying of state.
- if !using_voice_processing_unit && stm.draining.load(Ordering::SeqCst) {
- let r = stop_audiounit(stm.core_stream_data.input_unit);
- assert!(r.is_ok());
- // Only fire state-changed callback for input-only stream.
- // The state-changed callback for the duplex stream is fired in the output callback.
- if stm.core_stream_data.output_unit.is_null() {
- stm.notify_state_changed(State::Drained);
- }
+ // If the input (input-only stream) is drained, cancel this callback. Whenever an output
+ // is involved, the output callback handles stopping all units and notifying of state.
+ if stm.core_stream_data.output_unit.is_null() && stm.draining.load(Ordering::SeqCst) {
+ stm.stopped.store(true, Ordering::SeqCst);
+ cubeb_alog!("({:p}) Input-only drained.", stm as *const AudioUnitStream);
+ stm.notify_state_changed(State::Drained);
+ let queue = stm.queue.clone();
+ // Use a new thread, through the queue, to avoid deadlock when calling
+ // AudioOutputUnitStop method from inside render callback
+ let stm_ptr = user_ptr as usize;
+ queue.run_async(move || {
+ let stm = unsafe { &mut *(stm_ptr as *mut AudioUnitStream) };
+ stm.core_stream_data.stop_audiounits();
+ });
}
match handle {
@@ -681,11 +712,17 @@ extern "C" fn audiounit_output_callback(
}
if stm.draining.load(Ordering::SeqCst) {
- // Cancel the output callback only. For duplex stream,
- // the input callback will be cancelled in its own callback.
- let r = stop_audiounit(stm.core_stream_data.output_unit);
- assert!(r.is_ok());
+ // Cancel all callbacks. For input-only streams, the input callback handles
+ // cancelling itself.
+ stm.stopped.store(true, Ordering::SeqCst);
+ cubeb_alog!("({:p}) output drained.", stm as *const AudioUnitStream);
stm.notify_state_changed(State::Drained);
+ let queue = stm.queue.clone();
+ // Use a new thread, through the queue, to avoid deadlock when calling
+ // AudioOutputUnitStop method from inside render callback
+ queue.run_async(move || {
+ stm.core_stream_data.stop_audiounits();
+ });
audiounit_make_silent(&buffers[0]);
return NO_ERR;
}
@@ -1157,38 +1194,56 @@ fn create_audiounit(device: &device_info) -> Result<AudioUnit> {
Ok(unit)
}
-fn create_voiceprocessing_audiounit(
+fn get_voiceprocessing_audiounit(
+ shared_voice_processing_unit: &mut SharedVoiceProcessingUnitManager,
in_device: &device_info,
out_device: &device_info,
-) -> Result<AudioUnit> {
+) -> Result<OwningHandle<VoiceProcessingUnit>> {
+ debug_assert_running_serially();
assert!(in_device.flags.contains(device_flags::DEV_INPUT));
assert!(!in_device.flags.contains(device_flags::DEV_OUTPUT));
assert!(!out_device.flags.contains(device_flags::DEV_INPUT));
- assert!(out_device.flags.contains(device_flags::DEV_OUTPUT));
-
- let unit = create_typed_audiounit(kAudioUnitSubType_VoiceProcessingIO)?;
- if let Err(e) = set_device_to_audiounit(unit, in_device.id, AU_IN_BUS) {
+ let unit_handle = shared_voice_processing_unit.take_or_create();
+ if let Err(e) = unit_handle {
cubeb_log!(
- "Failed to set in device {} to the created audiounit. Error: {}",
- in_device.id,
+ "Failed to create shared voiceprocessing audiounit. Error: {}",
e
);
- dispose_audio_unit(unit);
return Err(Error::error());
}
+ let mut unit_handle = unit_handle.unwrap();
- if let Err(e) = set_device_to_audiounit(unit, out_device.id, AU_OUT_BUS) {
+ if let Err(e) = set_device_to_audiounit(unit_handle.as_mut().unit, in_device.id, AU_IN_BUS) {
cubeb_log!(
- "Failed to set out device {} to the created audiounit. Error: {}",
- out_device.id,
+ "Failed to set in device {} to the created audiounit. Error: {}",
+ in_device.id,
e
);
- dispose_audio_unit(unit);
return Err(Error::error());
}
- Ok(unit)
+ let has_output = out_device.id != kAudioObjectUnknown;
+ if let Err(e) =
+ enable_audiounit_scope(unit_handle.as_mut().unit, DeviceType::OUTPUT, has_output)
+ {
+ cubeb_log!("Failed to enable audiounit input scope. Error: {}", e);
+ return Err(Error::error());
+ }
+ if has_output {
+ if let Err(e) =
+ set_device_to_audiounit(unit_handle.as_mut().unit, out_device.id, AU_OUT_BUS)
+ {
+ cubeb_log!(
+ "Failed to set out device {} to the created audiounit. Error: {}",
+ out_device.id,
+ e
+ );
+ return Err(Error::error());
+ }
+ }
+
+ Ok(unit_handle)
}
fn enable_audiounit_scope(
@@ -1276,6 +1331,31 @@ fn create_blank_audiounit() -> Result<AudioUnit> {
return create_typed_audiounit(kAudioUnitSubType_RemoteIO);
}
+fn create_voiceprocessing_audiounit() -> Result<VoiceProcessingUnit> {
+ let res = create_typed_audiounit(kAudioUnitSubType_VoiceProcessingIO);
+ if res.is_err() {
+ return Err(Error::error());
+ }
+
+ match get_default_device(DeviceType::OUTPUT) {
+ None => {
+ cubeb_log!("Could not get default output device in order to undo vpio ducking");
+ }
+ Some(id) => {
+ let r = audio_device_duck(id, 1.0, ptr::null_mut(), 0.5);
+ if r != NO_ERR {
+ cubeb_log!(
+ "Failed to undo ducking of voiceprocessing on output device {}. Proceeding... Error: {}",
+ id,
+ r
+ );
+ }
+ }
+ };
+
+ res.map(|unit| VoiceProcessingUnit { unit })
+}
+
fn get_buffer_size(unit: AudioUnit, devtype: DeviceType) -> std::result::Result<u32, OSStatus> {
assert!(!unit.is_null());
let (scope, element) = match devtype {
@@ -1558,7 +1638,7 @@ fn get_range_of_sample_rates(
if rates.is_empty() {
return Err(String::from("No data"));
}
- let (mut min, mut max) = (std::f64::MAX, std::f64::MIN);
+ let (mut min, mut max) = (f64::MAX, f64::MIN);
for rate in rates {
if rate.mMaximum > max {
max = rate.mMaximum;
@@ -2056,7 +2136,7 @@ struct LatencyController {
}
impl LatencyController {
- fn add_stream(&mut self, latency: u32) -> Option<u32> {
+ fn add_stream(&mut self, latency: u32) -> u32 {
self.streams += 1;
// For the 1st stream set anything within safe min-max
if self.streams == 1 {
@@ -2065,16 +2145,322 @@ impl LatencyController {
// synthetize the clock from the callbacks, and we want the clock to update often.
self.latency = Some(latency.clamp(SAFE_MIN_LATENCY_FRAMES, SAFE_MAX_LATENCY_FRAMES));
}
- self.latency
+ self.latency.unwrap_or(latency)
}
- fn subtract_stream(&mut self) -> Option<u32> {
+ fn subtract_stream(&mut self) {
self.streams -= 1;
if self.streams == 0 {
assert!(self.latency.is_some());
self.latency = None;
}
- self.latency
+ }
+}
+
+// SharedStorage<T> below looks generic but has evolved to be pretty tailored
+// the observed behavior of VoiceProcessingIO audio units on macOS 14.
+// Some key points are:
+// - Creating the first VoiceProcessingIO unit in a process takes a long time, often > 3s.
+// - Creating a second VoiceProcessingIO unit in a process is significantly faster, < 1s.
+// - Disposing of a VoiceProcessingIO unit when all other VoiceProcessingIO units are
+// uninitialized will take significantly longer than disposing the remaining
+// VoiceProcessingIO units, and will have other side effects: starting another
+// VoiceProcessingIO unit after this is on par with creating the first one in the
+// process, bluetooth devices will move away from the handsfree profile, etc.
+// The takeaway is that there is something internal to the VoiceProcessingIO audio unit
+// that is costly to create and dispose of and its creation is triggered by creation of
+// the first VoiceProcessingIO unit, and its disposal is triggered by the disposal of
+// the first VoiceProcessingIO unit when no other VoiceProcessingIO units are initialized.
+//
+// The intended behavior of SharedStorage<T> and SharedVoiceProcessingUnitManager is therefore:
+// - Retain ideally just one VoiceProcessingIO unit after stream destruction, so device
+// switching is fast. The benefit of retaining more than one is unclear.
+// - Dispose of either all VoiceProcessingIO units, or none at all, such that the retained
+// VoiceProcessingIO unit really helps speed up creating and starting the next. In practice
+// this means we retain all VoiceProcessingIO units until they can all be disposed of.
+
+#[derive(Debug)]
+struct SharedStorageInternal<T> {
+ // Storage for shared elements.
+ elements: Vec<T>,
+ // Number of elements in use, i.e. all elements created/taken and not recycled.
+ outstanding_element_count: usize,
+ // Used for invalidation of in-flight tasks to clear elements.
+ // Incremented when something takes a shared element.
+ generation: usize,
+}
+
+#[derive(Debug)]
+struct SharedStorage<T> {
+ queue: Queue,
+ idle_timeout: Duration,
+ storage: Mutex<SharedStorageInternal<T>>,
+}
+
+impl<T: Send> SharedStorage<T> {
+ fn with_idle_timeout(queue: Queue, idle_timeout: Duration) -> Self {
+ Self {
+ queue,
+ idle_timeout,
+ storage: Mutex::new(SharedStorageInternal::<T> {
+ elements: Vec::default(),
+ outstanding_element_count: 0,
+ generation: 0,
+ }),
+ }
+ }
+
+ fn take_locked(guard: &mut MutexGuard<'_, SharedStorageInternal<T>>) -> Result<T> {
+ if let Some(e) = guard.elements.pop() {
+ cubeb_log!("Taking shared element #{}.", guard.elements.len());
+ guard.outstanding_element_count += 1;
+ guard.generation += 1;
+ return Ok(e);
+ }
+
+ Err(Error::not_supported())
+ }
+
+ fn create_with_locked<F>(
+ guard: &mut MutexGuard<'_, SharedStorageInternal<T>>,
+ f: F,
+ ) -> Result<T>
+ where
+ F: FnOnce() -> Result<T>,
+ {
+ let start = Instant::now();
+ match f() {
+ Ok(obj) => {
+ cubeb_log!(
+ "Just created shared element #{}. Took {}s.",
+ guard.outstanding_element_count,
+ (Instant::now() - start).as_secs_f32()
+ );
+ guard.outstanding_element_count += 1;
+ guard.generation += 1;
+ Ok(obj)
+ }
+ Err(_) => {
+ cubeb_log!("Creating shared element failed");
+ Err(Error::error())
+ }
+ }
+ }
+
+ #[cfg(test)]
+ fn take(&self) -> Result<T> {
+ let mut guard = self.storage.lock().unwrap();
+ SharedStorage::take_locked(&mut guard)
+ }
+
+ fn take_or_create_with<F>(&self, f: F) -> Result<T>
+ where
+ F: FnOnce() -> Result<T>,
+ {
+ let mut guard = self.storage.lock().unwrap();
+ SharedStorage::take_locked(&mut guard)
+ .or_else(|_| SharedStorage::create_with_locked(&mut guard, f))
+ }
+
+ fn recycle(&self, obj: T) {
+ let mut guard = self.storage.lock().unwrap();
+ guard.outstanding_element_count -= 1;
+ cubeb_log!(
+ "Recycling shared element #{}. Nr of live elements now {}.",
+ guard.elements.len(),
+ guard.outstanding_element_count
+ );
+ guard.elements.push(obj);
+ }
+
+ fn clear_locked(guard: &mut MutexGuard<'_, SharedStorageInternal<T>>) {
+ let count = guard.elements.len();
+ let start = Instant::now();
+ guard.elements.clear();
+ cubeb_log!(
+ "Cleared {} shared element{}. Took {}s.",
+ count,
+ if count == 1 { "" } else { "s" },
+ (Instant::now() - start).as_secs_f32()
+ );
+ }
+
+ fn clear(&self) {
+ debug_assert_running_serially();
+ let mut guard = self.storage.lock().unwrap();
+ SharedStorage::clear_locked(&mut guard);
+ }
+
+ fn clear_if_all_idle_async(storage: &Arc<SharedStorage<T>>) {
+ let (queue, outstanding_element_count, generation) = {
+ let guard = storage.storage.lock().unwrap();
+ (
+ storage.queue.clone(),
+ guard.outstanding_element_count,
+ guard.generation,
+ )
+ };
+ if outstanding_element_count > 0 {
+ cubeb_log!(
+ "Not clearing shared voiceprocessing unit storage because {} elements are in use. Generation={}.",
+ outstanding_element_count,
+ generation
+ );
+ return;
+ }
+ cubeb_log!(
+ "Clearing shared voiceprocessing unit storage in {}s if still at generation {}.",
+ storage.idle_timeout.as_secs_f32(),
+ generation
+ );
+ let storage = storage.clone();
+ queue.run_after(Instant::now() + storage.idle_timeout, move || {
+ let mut guard = storage.storage.lock().unwrap();
+ if generation != guard.generation {
+ cubeb_log!(
+ "Not clearing shared voiceprocessing unit storage for generation {} as we're now at {}.",
+ generation,
+ guard.generation
+ );
+ return;
+ }
+ SharedStorage::clear_locked(&mut guard);
+ });
+ }
+}
+
+#[derive(Debug)]
+struct OwningHandle<T>
+where
+ T: Send,
+{
+ storage: Weak<SharedStorage<T>>,
+ obj: Option<T>,
+}
+
+impl<T: Send> OwningHandle<T> {
+ fn new(storage: Weak<SharedStorage<T>>, obj: T) -> Self {
+ Self {
+ storage,
+ obj: Some(obj),
+ }
+ }
+}
+
+impl<T: Send> AsRef<T> for OwningHandle<T> {
+ fn as_ref(&self) -> &T {
+ self.obj.as_ref().unwrap()
+ }
+}
+
+impl<T: Send> AsMut<T> for OwningHandle<T> {
+ fn as_mut(&mut self) -> &mut T {
+ self.obj.as_mut().unwrap()
+ }
+}
+
+impl<T: Send> Drop for OwningHandle<T> {
+ fn drop(&mut self) {
+ let storage = self.storage.upgrade();
+ assert!(
+ storage.is_some(),
+ "Storage must outlive the handle, but didn't"
+ );
+ let storage = storage.unwrap();
+ if self.obj.is_none() {
+ return;
+ }
+ let obj = self.obj.take().unwrap();
+ storage.recycle(obj);
+ SharedStorage::clear_if_all_idle_async(&storage);
+ }
+}
+
+#[derive(Debug)]
+struct VoiceProcessingUnit {
+ unit: AudioUnit,
+}
+
+impl Drop for VoiceProcessingUnit {
+ fn drop(&mut self) {
+ assert!(!self.unit.is_null());
+ dispose_audio_unit(self.unit);
+ }
+}
+
+unsafe impl Send for VoiceProcessingUnit {}
+
+#[derive(Debug)]
+struct SharedVoiceProcessingUnitManager {
+ sync_storage: Mutex<Option<Arc<SharedStorage<VoiceProcessingUnit>>>>,
+ queue: Queue,
+ idle_timeout: Duration,
+}
+
+impl SharedVoiceProcessingUnitManager {
+ fn with_idle_timeout(queue: Queue, idle_timeout: Duration) -> Self {
+ Self {
+ sync_storage: Mutex::new(None),
+ queue,
+ idle_timeout,
+ }
+ }
+
+ fn new(queue: Queue) -> Self {
+ SharedVoiceProcessingUnitManager::with_idle_timeout(queue, VPIO_IDLE_TIMEOUT)
+ }
+
+ fn ensure_storage_locked(
+ &self,
+ guard: &mut MutexGuard<Option<Arc<SharedStorage<VoiceProcessingUnit>>>>,
+ ) {
+ if guard.is_some() {
+ return;
+ }
+ cubeb_log!("Creating shared voiceprocessing storage.");
+ let storage = SharedStorage::<VoiceProcessingUnit>::with_idle_timeout(
+ self.queue.clone(),
+ self.idle_timeout,
+ );
+ let old_storage = guard.replace(Arc::from(storage));
+ assert!(old_storage.is_none());
+ }
+
+ // Take an already existing, shared, vpio unit, if one is available.
+ #[cfg(test)]
+ fn take(&mut self) -> Result<OwningHandle<VoiceProcessingUnit>> {
+ debug_assert_running_serially();
+ let mut guard = self.sync_storage.lock().unwrap();
+ self.ensure_storage_locked(&mut guard);
+ let storage = guard.as_mut().unwrap();
+ let res = storage.take();
+ res.map(|u| OwningHandle::new(Arc::downgrade(storage), u))
+ }
+
+ // Take an already existing, shared, vpio unit, or create one if none are available.
+ fn take_or_create(&mut self) -> Result<OwningHandle<VoiceProcessingUnit>> {
+ debug_assert_running_serially();
+ let mut guard = self.sync_storage.lock().unwrap();
+ self.ensure_storage_locked(&mut guard);
+ let storage = guard.as_mut().unwrap();
+ let res = storage.take_or_create_with(create_voiceprocessing_audiounit);
+ res.map(|u| OwningHandle::new(Arc::downgrade(storage), u))
+ }
+}
+
+unsafe impl Send for SharedVoiceProcessingUnitManager {}
+unsafe impl Sync for SharedVoiceProcessingUnitManager {}
+
+impl Drop for SharedVoiceProcessingUnitManager {
+ fn drop(&mut self) {
+ debug_assert_not_running_serially();
+ self.queue.run_final(|| {
+ let mut guard = self.sync_storage.lock().unwrap();
+ if guard.is_none() {
+ return;
+ }
+ guard.as_mut().unwrap().clear();
+ });
}
}
@@ -2091,15 +2477,26 @@ pub struct AudioUnitContext {
serial_queue: Queue,
latency_controller: Mutex<LatencyController>,
devices: Mutex<SharedDevices>,
+ // Storage for a context-global vpio unit. Duplex streams that need one will take this
+ // and return it when done.
+ shared_voice_processing_unit: SharedVoiceProcessingUnitManager,
}
impl AudioUnitContext {
fn new() -> Self {
+ let queue_label = format!("{}.context", DISPATCH_QUEUE_LABEL);
+ let serial_queue =
+ Queue::new_with_target(queue_label.as_str(), get_serial_queue_singleton());
+ let shared_vp_queue = Queue::new_with_target(
+ format!("{}.context.shared_vpio", DISPATCH_QUEUE_LABEL).as_str(),
+ &serial_queue,
+ );
Self {
_ops: &OPS as *const _,
- serial_queue: Queue::new(DISPATCH_QUEUE_LABEL),
+ serial_queue,
latency_controller: Mutex::new(LatencyController::default()),
devices: Mutex::new(SharedDevices::default()),
+ shared_voice_processing_unit: SharedVoiceProcessingUnitManager::new(shared_vp_queue),
}
}
@@ -2108,14 +2505,14 @@ impl AudioUnitContext {
controller.streams
}
- fn update_latency_by_adding_stream(&self, latency_frames: u32) -> Option<u32> {
+ fn update_latency_by_adding_stream(&self, latency_frames: u32) -> u32 {
let mut controller = self.latency_controller.lock().unwrap();
controller.add_stream(latency_frames)
}
- fn update_latency_by_removing_stream(&self) -> Option<u32> {
+ fn update_latency_by_removing_stream(&self) {
let mut controller = self.latency_controller.lock().unwrap();
- controller.subtract_stream()
+ controller.subtract_stream();
}
fn add_devices_changed_listener(
@@ -2228,8 +2625,16 @@ impl AudioUnitContext {
impl ContextOps for AudioUnitContext {
fn init(_context_name: Option<&CStr>) -> Result<Context> {
- set_notification_runloop();
- let ctx = Box::new(AudioUnitContext::new());
+ run_serially(set_notification_runloop);
+ let mut ctx = Box::new(AudioUnitContext::new());
+ let queue_label = format!("{}.context.{:p}", DISPATCH_QUEUE_LABEL, ctx.as_ref());
+ ctx.serial_queue =
+ Queue::new_with_target(queue_label.as_str(), get_serial_queue_singleton());
+ let shared_vp_queue = Queue::new_with_target(
+ format!("{}.shared_vpio", queue_label).as_str(),
+ &ctx.serial_queue,
+ );
+ ctx.shared_voice_processing_unit = SharedVoiceProcessingUnitManager::new(shared_vp_queue);
Ok(unsafe { Context::from_ptr(Box::into_raw(ctx) as *mut _) })
}
@@ -2379,28 +2784,18 @@ impl ContextOps for AudioUnitContext {
return Err(Error::invalid_parameter());
}
- // Latency cannot change if another stream is operating in parallel. In this case
- // latency is set to the other stream value.
- let global_latency_frames = self
- .update_latency_by_adding_stream(latency_frames)
- .unwrap();
- if global_latency_frames != latency_frames {
- cubeb_log!(
- "Use global latency {} instead of the requested latency {}.",
- global_latency_frames,
- latency_frames
- );
- }
-
let in_stm_settings = if let Some(params) = input_stream_params {
- let in_device =
- match create_device_info(input_device as AudioDeviceID, DeviceType::INPUT) {
- None => {
- cubeb_log!("Fail to create device info for input");
- return Err(Error::error());
- }
- Some(d) => d,
- };
+ let in_device = match self
+ .serial_queue
+ .run_sync(|| create_device_info(input_device as AudioDeviceID, DeviceType::INPUT))
+ .unwrap()
+ {
+ None => {
+ cubeb_log!("Fail to create device info for input");
+ return Err(Error::error());
+ }
+ Some(d) => d,
+ };
let stm_params = StreamParams::from(unsafe { *params.as_ptr() });
Some((stm_params, in_device))
} else {
@@ -2408,20 +2803,34 @@ impl ContextOps for AudioUnitContext {
};
let out_stm_settings = if let Some(params) = output_stream_params {
- let out_device =
- match create_device_info(output_device as AudioDeviceID, DeviceType::OUTPUT) {
- None => {
- cubeb_log!("Fail to create device info for output");
- return Err(Error::error());
- }
- Some(d) => d,
- };
+ let out_device = match self
+ .serial_queue
+ .run_sync(|| create_device_info(output_device as AudioDeviceID, DeviceType::OUTPUT))
+ .unwrap()
+ {
+ None => {
+ cubeb_log!("Fail to create device info for output");
+ return Err(Error::error());
+ }
+ Some(d) => d,
+ };
let stm_params = StreamParams::from(unsafe { *params.as_ptr() });
Some((stm_params, out_device))
} else {
None
};
+ // Latency cannot change if another stream is operating in parallel. In this case
+ // latency is set to the other stream value.
+ let global_latency_frames = self.update_latency_by_adding_stream(latency_frames);
+ if global_latency_frames != latency_frames {
+ cubeb_log!(
+ "Use global latency {} instead of the requested latency {}.",
+ global_latency_frames,
+ latency_frames
+ );
+ }
+
let mut boxed_stream = Box::new(AudioUnitStream::new(
self,
user_ptr,
@@ -2431,16 +2840,25 @@ impl ContextOps for AudioUnitContext {
));
// Rename the task queue to be an unique label.
- let queue_label = format!("{}.{:p}", DISPATCH_QUEUE_LABEL, boxed_stream.as_ref());
- boxed_stream.queue = Queue::new(queue_label.as_str());
+ let queue_label = format!(
+ "{}.stream.{:p}",
+ DISPATCH_QUEUE_LABEL,
+ boxed_stream.as_ref()
+ );
+ boxed_stream.queue = Queue::new_with_target(queue_label.as_str(), &boxed_stream.queue);
boxed_stream.core_stream_data =
CoreStreamData::new(boxed_stream.as_ref(), in_stm_settings, out_stm_settings);
- let mut result = Ok(());
- boxed_stream.queue.clone().run_sync(|| {
- result = boxed_stream.core_stream_data.setup();
- });
+ let result = boxed_stream
+ .queue
+ .clone()
+ .run_sync(|| {
+ boxed_stream
+ .core_stream_data
+ .setup(&mut boxed_stream.context.shared_voice_processing_unit)
+ })
+ .unwrap();
if let Err(r) = result {
cubeb_log!(
"({:p}) Could not setup the audiounit stream.",
@@ -2465,25 +2883,43 @@ impl ContextOps for AudioUnitContext {
if devtype == DeviceType::UNKNOWN {
return Err(Error::invalid_parameter());
}
- if collection_changed_callback.is_some() {
- self.add_devices_changed_listener(devtype, collection_changed_callback, user_ptr)
- } else {
- self.remove_devices_changed_listener(devtype)
- }
+ self.serial_queue
+ .clone()
+ .run_sync(|| {
+ if collection_changed_callback.is_some() {
+ self.add_devices_changed_listener(
+ devtype,
+ collection_changed_callback,
+ user_ptr,
+ )
+ } else {
+ self.remove_devices_changed_listener(devtype)
+ }
+ })
+ .unwrap()
}
}
impl Drop for AudioUnitContext {
fn drop(&mut self) {
- let devices = self.devices.lock().unwrap();
- assert!(
+ assert!({
+ let devices = self.devices.lock().unwrap();
devices.input.changed_callback.is_none() && devices.output.changed_callback.is_none()
- );
+ });
+
+ self.shared_voice_processing_unit =
+ SharedVoiceProcessingUnitManager::new(self.serial_queue.clone());
+
+ // Make sure all the pending (device-collection-changed-callback) tasks
+ // in queue are done, and cancel all the tasks appended after `drop` is executed.
+ let queue = self.serial_queue.clone();
+ queue.run_final(|| {});
{
let controller = self.latency_controller.lock().unwrap();
- // Disabling this assert for bug 1083664 -- we seem to leak a stream
+ // Disabling this assert in release for bug 1083664 -- we seem to leak a stream
// assert(controller.streams == 0);
+ debug_assert!(controller.streams == 0);
if controller.streams > 0 {
cubeb_log!(
"({:p}) API misuse, {} streams active when context destroyed!",
@@ -2492,10 +2928,6 @@ impl Drop for AudioUnitContext {
);
}
}
- // Make sure all the pending (device-collection-changed-callback) tasks
- // in queue are done, and cancel all the tasks appended after `drop` is executed.
- let queue = self.serial_queue.clone();
- queue.run_final(|| {});
}
}
@@ -2562,6 +2994,8 @@ struct CoreStreamData<'ctx> {
// I/O AudioUnits.
input_unit: AudioUnit,
output_unit: AudioUnit,
+ // Handle to shared voiceprocessing AudioUnit, if in use.
+ voiceprocessing_unit_handle: Option<OwningHandle<VoiceProcessingUnit>>,
// Info of the I/O devices.
input_device: device_info,
output_device: device_info,
@@ -2603,6 +3037,7 @@ impl<'ctx> Default for CoreStreamData<'ctx> {
output_dev_desc: AudioStreamBasicDescription::default(),
input_unit: ptr::null_mut(),
output_unit: ptr::null_mut(),
+ voiceprocessing_unit_handle: None,
input_device: device_info::default(),
output_device: device_info::default(),
input_processing_params: InputProcessingParams::NONE,
@@ -2649,6 +3084,7 @@ impl<'ctx> CoreStreamData<'ctx> {
output_dev_desc: AudioStreamBasicDescription::default(),
input_unit: ptr::null_mut(),
output_unit: ptr::null_mut(),
+ voiceprocessing_unit_handle: None,
input_device: in_dev,
output_device: out_dev,
input_processing_params: InputProcessingParams::NONE,
@@ -2716,7 +3152,7 @@ impl<'ctx> CoreStreamData<'ctx> {
}
fn using_voice_processing_unit(&self) -> bool {
- !self.input_unit.is_null() && self.input_unit == self.output_unit
+ self.voiceprocessing_unit_handle.is_some()
}
fn same_clock_domain(&self) -> bool {
@@ -2744,6 +3180,22 @@ impl<'ctx> CoreStreamData<'ctx> {
input_domain == output_domain
}
+ #[allow(non_upper_case_globals)]
+ fn should_force_vpio_for_input_device(&self, in_device: &device_info) -> bool {
+ assert!(in_device.id != kAudioObjectUnknown);
+ self.debug_assert_is_on_stream_queue();
+ match get_device_transport_type(in_device.id, DeviceType::INPUT) {
+ Ok(kAudioDeviceTransportTypeBuiltIn) => {
+ cubeb_log!(
+ "Forcing VPIO because input device is built in, and its volume \
+ is known to be very low without VPIO whenever VPIO is hooked up to it elsewhere."
+ );
+ true
+ }
+ _ => false,
+ }
+ }
+
fn should_block_vpio_for_device_pair(
&self,
in_device: &device_info,
@@ -2751,7 +3203,10 @@ impl<'ctx> CoreStreamData<'ctx> {
) -> bool {
self.debug_assert_is_on_stream_queue();
cubeb_log!("Evaluating device pair against VPIO block list");
- let log_device = |id, devtype| -> std::result::Result<(), OSStatus> {
+ let log_device_and_get_model_uid = |id, devtype| -> String {
+ let device_model_uid = get_device_model_uid(id, devtype)
+ .map(|s| s.into_string())
+ .unwrap_or_default();
cubeb_log!("{} uid=\"{}\", model_uid=\"{}\", transport_type={:?}, source={:?}, source_name=\"{}\", name=\"{}\", manufacturer=\"{}\"",
if devtype == DeviceType::INPUT {
"Input"
@@ -2760,43 +3215,59 @@ impl<'ctx> CoreStreamData<'ctx> {
"Output"
},
get_device_uid(id, devtype).map(|s| s.into_string()).unwrap_or_default(),
- get_device_model_uid(id, devtype).map(|s| s.into_string()).unwrap_or_default(),
+ device_model_uid,
convert_uint32_into_string(get_device_transport_type(id, devtype).unwrap_or(0)),
convert_uint32_into_string(get_device_source(id, devtype).unwrap_or(0)),
get_device_source_name(id, devtype).map(|s| s.into_string()).unwrap_or_default(),
get_device_name(id, devtype).map(|s| s.into_string()).unwrap_or_default(),
get_device_manufacturer(id, devtype).map(|s| s.into_string()).unwrap_or_default());
- Ok(())
+ device_model_uid
};
- log_device(in_device.id, DeviceType::INPUT);
- log_device(out_device.id, DeviceType::OUTPUT);
- match (
- get_device_model_uid(in_device.id, DeviceType::INPUT).map(|s| s.to_string()),
- get_device_model_uid(out_device.id, DeviceType::OUTPUT).map(|s| s.to_string()),
- ) {
- (Ok(in_model_uid), Ok(out_model_uid))
- if in_model_uid.contains(APPLE_STUDIO_DISPLAY_USB_ID)
- && out_model_uid.contains(APPLE_STUDIO_DISPLAY_USB_ID) =>
- {
- cubeb_log!("Both input and output device is an Apple Studio Display. BLOCKED");
- true
- }
- _ => {
- cubeb_log!("Device pair is not blocked");
- false
- }
+
+ #[allow(non_upper_case_globals)]
+ let in_id = match in_device.id {
+ kAudioObjectUnknown => None,
+ id => Some(id),
+ };
+ #[allow(non_upper_case_globals)]
+ let out_id = match out_device.id {
+ kAudioObjectUnknown => None,
+ id => Some(id),
+ };
+
+ let (in_model_uid, out_model_uid) = (
+ in_id
+ .map(|id| log_device_and_get_model_uid(id, DeviceType::INPUT))
+ .unwrap_or_default(),
+ out_id
+ .map(|id| log_device_and_get_model_uid(id, DeviceType::OUTPUT))
+ .unwrap_or_default(),
+ );
+
+ if in_model_uid.contains(APPLE_STUDIO_DISPLAY_USB_ID)
+ && out_model_uid.contains(APPLE_STUDIO_DISPLAY_USB_ID)
+ {
+ cubeb_log!("Both input and output device is an Apple Studio Display. BLOCKED");
+ return true;
}
+
+ cubeb_log!("Device pair is not blocked");
+ false
}
- fn create_audiounits(&mut self) -> Result<(device_info, device_info)> {
+ fn create_audiounits(
+ &mut self,
+ shared_voice_processing_unit: &mut SharedVoiceProcessingUnitManager,
+ ) -> Result<(device_info, device_info)> {
self.debug_assert_is_on_stream_queue();
let should_use_voice_processing_unit = self.has_input()
- && self.has_output()
- && self
+ && (self
.input_stream_params
.prefs()
.contains(StreamPrefs::VOICE)
- && !self.should_block_vpio_for_device_pair(&self.input_device, &self.output_device);
+ || self.should_force_vpio_for_input_device(&self.input_device))
+ && !self.should_block_vpio_for_device_pair(&self.input_device, &self.output_device)
+ && macos_kernel_major_version() != Ok(MACOS_KERNEL_MAJOR_VERSION_MONTEREY);
let should_use_aggregate_device = {
// It's impossible to create an aggregate device from an aggregate device, and it's
@@ -2843,16 +3314,20 @@ impl<'ctx> CoreStreamData<'ctx> {
// - As last resort, create regular AudioUnits. This is also the normal non-duplex path.
if should_use_voice_processing_unit {
- if let Ok(au) =
- create_voiceprocessing_audiounit(&self.input_device, &self.output_device)
- {
- cubeb_log!("({:p}) Using VoiceProcessingIO AudioUnit", self.stm_ptr);
- self.input_unit = au;
- self.output_unit = au;
+ if let Ok(mut au_handle) = get_voiceprocessing_audiounit(
+ shared_voice_processing_unit,
+ &self.input_device,
+ &self.output_device,
+ ) {
+ self.input_unit = au_handle.as_mut().unit;
+ if self.has_output() {
+ self.output_unit = au_handle.as_mut().unit;
+ }
+ self.voiceprocessing_unit_handle = Some(au_handle);
return Ok((self.input_device.clone(), self.output_device.clone()));
}
cubeb_log!(
- "({:p}) Failed to create VoiceProcessingIO AudioUnit. Trying a regular one.",
+ "({:p}) Failed to get VoiceProcessingIO AudioUnit. Trying a regular one.",
self.stm_ptr
);
}
@@ -2954,7 +3429,10 @@ impl<'ctx> CoreStreamData<'ctx> {
}
#[allow(clippy::cognitive_complexity)] // TODO: Refactoring.
- fn setup(&mut self) -> Result<()> {
+ fn setup(
+ &mut self,
+ shared_voice_processing_unit: &mut SharedVoiceProcessingUnitManager,
+ ) -> Result<()> {
self.debug_assert_is_on_stream_queue();
if self
.input_stream_params
@@ -2970,7 +3448,7 @@ impl<'ctx> CoreStreamData<'ctx> {
}
let same_clock_domain = self.same_clock_domain();
- let (in_dev_info, out_dev_info) = self.create_audiounits()?;
+ let (in_dev_info, out_dev_info) = self.create_audiounits(shared_voice_processing_unit)?;
let using_voice_processing_unit = self.using_voice_processing_unit();
assert!(!self.stm_ptr.is_null());
@@ -3155,6 +3633,25 @@ impl<'ctx> CoreStreamData<'ctx> {
);
}
+ if self.has_input() && !self.has_output() && using_voice_processing_unit {
+ // We must configure the output side of VPIO to match the input side, even if we don't use it.
+ let r = audio_unit_set_property(
+ self.input_unit,
+ kAudioUnitProperty_StreamFormat,
+ kAudioUnitScope_Input,
+ AU_OUT_BUS,
+ &self.input_dev_desc,
+ mem::size_of::<AudioStreamBasicDescription>(),
+ );
+ if r != NO_ERR {
+ cubeb_log!(
+ "AudioUnitSetProperty/output/kAudioUnitProperty_StreamFormat rv={}",
+ r
+ );
+ return Err(Error::error());
+ }
+ }
+
if self.has_output() {
assert!(!self.output_unit.is_null());
@@ -3463,15 +3960,31 @@ impl<'ctx> CoreStreamData<'ctx> {
// NOTE: On MacOS 14 the ducking happens on creation of the VPIO AudioUnit.
// On MacOS 10.15 it happens on both creation and initialization, which
// is why we defer the unducking until now.
- let r = audio_device_duck(self.output_device.id, 1.0, ptr::null_mut(), 0.5);
- if r != NO_ERR {
- cubeb_log!(
- "({:p}) Failed to undo ducking of voiceprocessing on output device {}. Proceeding... Error: {}",
- self.stm_ptr,
- self.output_device.id,
- r
- );
- }
+ #[allow(non_upper_case_globals)]
+ let mut device = match self.output_device.id {
+ kAudioObjectUnknown => None,
+ id => Some(id),
+ };
+ device = device.or_else(|| get_default_device(DeviceType::OUTPUT));
+ match device {
+ None => {
+ cubeb_log!(
+ "({:p}) No output device to undo vpio ducking on",
+ self.stm_ptr
+ );
+ }
+ Some(id) => {
+ let r = audio_device_duck(id, 1.0, ptr::null_mut(), 0.5);
+ if r != NO_ERR {
+ cubeb_log!(
+ "({:p}) Failed to undo ducking of voiceprocessing on output device {}. Proceeding... Error: {}",
+ self.stm_ptr,
+ id,
+ r
+ );
+ }
+ }
+ };
// Always try to remember the applied input mute state. If it cannot be applied
// to the new device pair, we notify the client of an error and it will have to
@@ -3558,10 +4071,16 @@ impl<'ctx> CoreStreamData<'ctx> {
}
if !self.input_unit.is_null() {
- dispose_audio_unit(self.input_unit);
+ if !self.using_voice_processing_unit() {
+ // The VPIO unit is shared and must not be disposed.
+ dispose_audio_unit(self.input_unit);
+ }
self.input_unit = ptr::null_mut();
}
+ // Return the VPIO unit if present.
+ self.voiceprocessing_unit_handle = None;
+
self.resampler.destroy();
self.mixer = None;
self.aggregate_device = None;
@@ -3873,8 +4392,6 @@ struct OutputCallbackTimingData {
// #[repr(C)] is used to prevent any padding from being added in the beginning of the AudioUnitStream.
#[repr(C)]
#[derive(Debug)]
-// Allow exposing this private struct in public interfaces when running tests.
-#[cfg_attr(test, allow(private_in_public))]
struct AudioUnitStream<'ctx> {
context: &'ctx mut AudioUnitContext,
user_ptr: *mut c_void,
@@ -3927,10 +4444,11 @@ impl<'ctx> AudioUnitStream<'ctx> {
});
let (output_callback_timing_data_write, output_callback_timing_data_read) =
output_callback_timing_data.split();
+ let queue = context.serial_queue.clone();
AudioUnitStream {
context,
user_ptr,
- queue: Queue::new(DISPATCH_QUEUE_LABEL),
+ queue,
data_callback,
state_callback,
device_changed_callback: Mutex::new(None),
@@ -4046,10 +4564,12 @@ impl<'ctx> AudioUnitStream<'ctx> {
}
}
- self.core_stream_data.setup().map_err(|e| {
- cubeb_log!("({:p}) Setup failed.", self.core_stream_data.stm_ptr);
- e
- })?;
+ self.core_stream_data
+ .setup(&mut self.context.shared_voice_processing_unit)
+ .map_err(|e| {
+ cubeb_log!("({:p}) Setup failed.", self.core_stream_data.stm_ptr);
+ e
+ })?;
if let Ok(volume) = vol_rv {
set_volume(self.core_stream_data.output_unit, volume);
@@ -4171,7 +4691,7 @@ impl<'ctx> AudioUnitStream<'ctx> {
impl<'ctx> Drop for AudioUnitStream<'ctx> {
fn drop(&mut self) {
// Execute destroy in serial queue to avoid collision with reinit when un/plug devices
- self.queue.clone().run_final(move || {
+ self.queue.clone().run_final(|| {
self.destroy();
self.core_stream_data = CoreStreamData::default();
});
@@ -4184,12 +4704,10 @@ impl<'ctx> StreamOps for AudioUnitStream<'ctx> {
self.draining.store(false, Ordering::SeqCst);
// Execute start in serial queue to avoid racing with destroy or reinit.
- let mut result = Err(Error::error());
- let started = &mut result;
- let stream = &self;
- self.queue.run_sync(move || {
- *started = stream.core_stream_data.start_audiounits();
- });
+ let result = self
+ .queue
+ .run_sync(|| self.core_stream_data.start_audiounits())
+ .unwrap();
result?;
@@ -4205,10 +4723,8 @@ impl<'ctx> StreamOps for AudioUnitStream<'ctx> {
self.stopped.store(true, Ordering::SeqCst);
// Execute stop in serial queue to avoid racing with destroy or reinit.
- let stream = &self;
- self.queue.run_sync(move || {
- stream.core_stream_data.stop_audiounits();
- });
+ self.queue
+ .run_sync(|| self.core_stream_data.stop_audiounits());
self.notify_state_changed(State::Stopped);
@@ -4285,12 +4801,10 @@ impl<'ctx> StreamOps for AudioUnitStream<'ctx> {
}
fn set_volume(&mut self, volume: f32) -> Result<()> {
// Execute set_volume in serial queue to avoid racing with destroy or reinit.
- let mut result = Err(Error::error());
- let set = &mut result;
- let stream = &self;
- self.queue.run_sync(move || {
- *set = set_volume(stream.core_stream_data.output_unit, volume);
- });
+ let result = self
+ .queue
+ .run_sync(|| set_volume(self.core_stream_data.output_unit, volume))
+ .unwrap();
result?;