diff options
Diffstat (limited to 'third_party/rust/cubeb-coreaudio/src/backend/tests/device_change.rs')
-rw-r--r-- | third_party/rust/cubeb-coreaudio/src/backend/tests/device_change.rs | 885 |
1 files changed, 885 insertions, 0 deletions
diff --git a/third_party/rust/cubeb-coreaudio/src/backend/tests/device_change.rs b/third_party/rust/cubeb-coreaudio/src/backend/tests/device_change.rs new file mode 100644 index 0000000000..c27dada7ad --- /dev/null +++ b/third_party/rust/cubeb-coreaudio/src/backend/tests/device_change.rs @@ -0,0 +1,885 @@ +// NOTICE: +// Avoid running TestDeviceSwitcher with TestDevicePlugger or active full-duplex streams +// sequentially! +// +// The TestDeviceSwitcher cannot work with any test that will create an aggregate device that is +// soon being destroyed. The TestDeviceSwitcher will cache the available devices, upon it's +// created, as the candidates for the default device. Therefore, those created aggregate devices +// may be cached in TestDeviceSwitcher. However, those aggregate devices may be destroyed when +// TestDeviceSwitcher is using them or they are in the cached list of TestDeviceSwitcher. +// +// Running those tests by setting `test-threads=1` doesn't really help (e.g., +// `cargo test test_register_device_changed_callback -- --ignored --nocapture --test-threads=1`). +// The aggregate device won't be destroyed immediately when `kAudioPlugInDestroyAggregateDevice` +// is set. As a result, the following tests requiring changing the devices will be run separately +// in the run_tests.sh script and marked by `ignore` by default. + +use super::utils::{ + get_devices_info_in_scope, test_create_device_change_listener, test_device_in_scope, + test_get_default_device, test_get_devices_in_scope, + test_get_stream_with_default_data_callback_by_type, test_ops_stream_operation, + test_set_default_device, Scope, StreamType, TestDevicePlugger, TestDeviceSwitcher, +}; +use super::*; +use std::sync::{LockResult, MutexGuard, WaitTimeoutResult}; + +// Switch default devices used by the active streams, to test stream reinitialization +// ================================================================================================ +#[ignore] +#[test] +fn test_switch_device() { + test_switch_device_in_scope(Scope::Input); + test_switch_device_in_scope(Scope::Output); +} + +fn test_switch_device_in_scope(scope: Scope) { + println!( + "Switch default device for {:?} while the stream is working.", + scope + ); + + // Do nothing if there is no 2 available devices at least. + let devices = test_get_devices_in_scope(scope.clone()); + if devices.len() < 2 { + println!("Need 2 devices for {:?} at least. Skip.", scope); + return; + } + + let mut device_switcher = TestDeviceSwitcher::new(scope.clone()); + + let notifier = Arc::new(Notifier::new(0)); + let also_notifier = notifier.clone(); + let listener = test_create_device_change_listener(scope.clone(), move |_addresses| { + let mut cnt = notifier.lock().unwrap(); + *cnt += 1; + notifier.notify(cnt); + NO_ERR + }); + listener.start(); + + let changed_watcher = Watcher::new(&also_notifier); + test_get_started_stream_in_scope(scope.clone(), move |_stream| loop { + let mut guard = changed_watcher.lock().unwrap(); + let start_cnt = guard.clone(); + device_switcher.next(); + guard = changed_watcher + .wait_while(guard, |cnt| *cnt == start_cnt) + .unwrap(); + if *guard >= devices.len() { + break; + } + }); +} + +fn test_get_started_stream_in_scope<F>(scope: Scope, operation: F) +where + F: FnOnce(*mut ffi::cubeb_stream), +{ + use std::f32::consts::PI; + const SAMPLE_FREQUENCY: u32 = 48_000; + + // Make sure the parameters meet the requirements of AudioUnitContext::stream_init + // (in the comments). + let mut stream_params = ffi::cubeb_stream_params::default(); + stream_params.format = ffi::CUBEB_SAMPLE_S16NE; + stream_params.rate = SAMPLE_FREQUENCY; + stream_params.prefs = ffi::CUBEB_STREAM_PREF_NONE; + stream_params.channels = 1; + stream_params.layout = ffi::CUBEB_LAYOUT_MONO; + + let (input_params, output_params) = match scope { + Scope::Input => ( + &mut stream_params as *mut ffi::cubeb_stream_params, + ptr::null_mut(), + ), + Scope::Output => ( + ptr::null_mut(), + &mut stream_params as *mut ffi::cubeb_stream_params, + ), + }; + + extern "C" fn state_callback( + stream: *mut ffi::cubeb_stream, + user_ptr: *mut c_void, + state: ffi::cubeb_state, + ) { + assert!(!stream.is_null()); + assert!(!user_ptr.is_null()); + assert_ne!(state, ffi::CUBEB_STATE_ERROR); + } + + extern "C" fn input_data_callback( + stream: *mut ffi::cubeb_stream, + user_ptr: *mut c_void, + input_buffer: *const c_void, + output_buffer: *mut c_void, + nframes: i64, + ) -> i64 { + assert!(!stream.is_null()); + assert!(!user_ptr.is_null()); + assert!(!input_buffer.is_null()); + assert!(output_buffer.is_null()); + nframes + } + + let mut position: i64 = 0; // TODO: Use Atomic instead. + + fn f32_to_i16_sample(x: f32) -> i16 { + (x * f32::from(i16::max_value())) as i16 + } + + extern "C" fn output_data_callback( + stream: *mut ffi::cubeb_stream, + user_ptr: *mut c_void, + input_buffer: *const c_void, + output_buffer: *mut c_void, + nframes: i64, + ) -> i64 { + assert!(!stream.is_null()); + assert!(!user_ptr.is_null()); + assert!(input_buffer.is_null()); + assert!(!output_buffer.is_null()); + + let buffer = unsafe { + let ptr = output_buffer as *mut i16; + let len = nframes as usize; + slice::from_raw_parts_mut(ptr, len) + }; + + let position = unsafe { &mut *(user_ptr as *mut i64) }; + + // Generate tone on the fly. + for data in buffer.iter_mut() { + let t1 = (2.0 * PI * 350.0 * (*position) as f32 / SAMPLE_FREQUENCY as f32).sin(); + let t2 = (2.0 * PI * 440.0 * (*position) as f32 / SAMPLE_FREQUENCY as f32).sin(); + *data = f32_to_i16_sample(0.5 * (t1 + t2)); + *position += 1; + } + + nframes + } + + test_ops_stream_operation( + "stream", + ptr::null_mut(), // Use default input device. + input_params, + ptr::null_mut(), // Use default output device. + output_params, + 4096, // TODO: Get latency by get_min_latency instead ? + match scope { + Scope::Input => Some(input_data_callback), + Scope::Output => Some(output_data_callback), + }, + Some(state_callback), + &mut position as *mut i64 as *mut c_void, + |stream| { + assert_eq!(unsafe { OPS.stream_start.unwrap()(stream) }, ffi::CUBEB_OK); + operation(stream); + assert_eq!(unsafe { OPS.stream_stop.unwrap()(stream) }, ffi::CUBEB_OK); + }, + ); +} + +// Plug and unplug devices, to test device collection changed callback +// ================================================================================================ +#[ignore] +#[test] +fn test_plug_and_unplug_device() { + test_plug_and_unplug_device_in_scope(Scope::Input); + test_plug_and_unplug_device_in_scope(Scope::Output); +} + +fn test_plug_and_unplug_device_in_scope(scope: Scope) { + let default_device = test_get_default_device(scope.clone()); + if default_device.is_none() { + println!("No device for {:?} to test", scope); + return; + } + + println!("Run test for {:?}", scope); + println!("NOTICE: The test will hang if the default input or output is an aggregate device.\nWe will fix this later."); + + let default_device = default_device.unwrap(); + let is_input = test_device_in_scope(default_device, Scope::Input); + let is_output = test_device_in_scope(default_device, Scope::Output); + + let mut context = AudioUnitContext::new(); + + // Register the devices-changed callbacks. + #[derive(Clone, PartialEq)] + struct Counts { + input: u32, + output: u32, + } + impl Counts { + fn new() -> Self { + Self { + input: 0, + output: 0, + } + } + } + let counts = Arc::new(Notifier::new(Counts::new())); + let counts_notifier_ptr = counts.as_ref() as *const Notifier<Counts>; + + assert!(context + .register_device_collection_changed( + DeviceType::INPUT, + Some(input_changed_callback), + counts_notifier_ptr as *mut c_void, + ) + .is_ok()); + + assert!(context + .register_device_collection_changed( + DeviceType::OUTPUT, + Some(output_changed_callback), + counts_notifier_ptr as *mut c_void, + ) + .is_ok()); + + let counts_watcher = Watcher::new(&counts); + + let mut device_plugger = TestDevicePlugger::new(scope).unwrap(); + + { + // Simulate adding devices and monitor the devices-changed callbacks. + let mut counts_guard = counts.lock().unwrap(); + let counts_start = counts_guard.clone(); + + assert!(device_plugger.plug().is_ok()); + + counts_guard = counts_watcher + .wait_while(counts_guard, |counts| { + (is_input && counts.input == counts_start.input) + || (is_output && counts.output == counts_start.output) + }) + .unwrap(); + + // Check changed count. + assert_eq!(counts_guard.input, if is_input { 1 } else { 0 }); + assert_eq!(counts_guard.output, if is_output { 1 } else { 0 }); + } + + { + // Simulate removing devices and monitor the devices-changed callbacks. + let mut counts_guard = counts.lock().unwrap(); + let counts_start = counts_guard.clone(); + + assert!(device_plugger.unplug().is_ok()); + + counts_guard = counts_watcher + .wait_while(counts_guard, |counts| { + (is_input && counts.input == counts_start.input) + || (is_output && counts.output == counts_start.output) + }) + .unwrap(); + + // Check changed count. + assert_eq!(counts_guard.input, if is_input { 2 } else { 0 }); + assert_eq!(counts_guard.output, if is_output { 2 } else { 0 }); + } + + extern "C" fn input_changed_callback(context: *mut ffi::cubeb, data: *mut c_void) { + println!( + "Input device collection @ {:p} is changed. Data @ {:p}", + context, data + ); + let notifier = unsafe { &*(data as *const Notifier<Counts>) }; + { + let mut counts = notifier.lock().unwrap(); + counts.input += 1; + notifier.notify(counts); + } + } + + extern "C" fn output_changed_callback(context: *mut ffi::cubeb, data: *mut c_void) { + println!( + "output device collection @ {:p} is changed. Data @ {:p}", + context, data + ); + let notifier = unsafe { &*(data as *const Notifier<Counts>) }; + { + let mut counts = notifier.lock().unwrap(); + counts.output += 1; + notifier.notify(counts); + } + } + + context.register_device_collection_changed(DeviceType::OUTPUT, None, ptr::null_mut()); + context.register_device_collection_changed(DeviceType::INPUT, None, ptr::null_mut()); +} + +// Switch default devices used by the active streams, to test device changed callback +// ================================================================================================ +#[ignore] +#[test] +fn test_register_device_changed_callback_to_check_default_device_changed_input() { + test_register_device_changed_callback_to_check_default_device_changed(StreamType::INPUT); +} + +#[ignore] +#[test] +fn test_register_device_changed_callback_to_check_default_device_changed_output() { + test_register_device_changed_callback_to_check_default_device_changed(StreamType::OUTPUT); +} + +#[ignore] +#[test] +fn test_register_device_changed_callback_to_check_default_device_changed_duplex() { + test_register_device_changed_callback_to_check_default_device_changed(StreamType::DUPLEX); +} + +fn test_register_device_changed_callback_to_check_default_device_changed(stm_type: StreamType) { + println!("NOTICE: The test will hang if the default input or output is an aggregate device.\nWe will fix this later."); + + let inputs = if stm_type.contains(StreamType::INPUT) { + let devices = test_get_devices_in_scope(Scope::Input).len(); + if devices >= 2 { + Some(devices) + } else { + None + } + } else { + None + }; + + let outputs = if stm_type.contains(StreamType::OUTPUT) { + let devices = test_get_devices_in_scope(Scope::Output).len(); + if devices >= 2 { + Some(devices) + } else { + None + } + } else { + None + }; + + if inputs.is_none() && outputs.is_none() { + println!("No enough devices to run the test!"); + return; + } + + let changed_count = Arc::new(Notifier::new(0u32)); + let notifier_ptr = changed_count.as_ref() as *const Notifier<u32>; + + test_get_stream_with_device_changed_callback( + "stream: test callback for default device changed", + stm_type, + None, // Use default input device. + None, // Use default output device. + notifier_ptr as *mut c_void, + state_callback, + device_changed_callback, + |stream| { + // If the duplex stream uses different input and output device, + // an aggregate device will be created and it will work for this duplex stream. + // This aggregate device will be added into the device list, but it won't + // be assigned to the default device, since the device list for setting + // default device is cached upon {input, output}_device_switcher is initialized. + + let changed_watcher = Watcher::new(&changed_count); + + if let Some(devices) = inputs { + let mut device_switcher = TestDeviceSwitcher::new(Scope::Input); + for _ in 0..devices { + // While the stream is re-initializing for the default device switch, + // switching for the default device again will be ignored. + while stream.switching_device.load(atomic::Ordering::SeqCst) { + std::hint::spin_loop() + } + let guard = changed_watcher.lock().unwrap(); + let start_cnt = guard.clone(); + device_switcher.next(); + changed_watcher + .wait_while(guard, |cnt| *cnt == start_cnt) + .unwrap(); + } + } + + if let Some(devices) = outputs { + let mut device_switcher = TestDeviceSwitcher::new(Scope::Output); + for _ in 0..devices { + // While the stream is re-initializing for the default device switch, + // switching for the default device again will be ignored. + while stream.switching_device.load(atomic::Ordering::SeqCst) { + std::hint::spin_loop() + } + let guard = changed_watcher.lock().unwrap(); + let start_cnt = guard.clone(); + device_switcher.next(); + changed_watcher + .wait_while(guard, |cnt| *cnt == start_cnt) + .unwrap(); + } + } + }, + ); + + extern "C" fn state_callback( + stream: *mut ffi::cubeb_stream, + _user_ptr: *mut c_void, + state: ffi::cubeb_state, + ) { + assert!(!stream.is_null()); + assert_ne!(state, ffi::CUBEB_STATE_ERROR); + } + + extern "C" fn device_changed_callback(data: *mut c_void) { + println!("Device change callback. data @ {:p}", data); + let notifier = unsafe { &*(data as *const Notifier<u32>) }; + let mut count_guard = notifier.lock().unwrap(); + *count_guard += 1; + notifier.notify(count_guard); + } +} + +// Unplug the devices used by the active streams, to test +// 1) device changed callback, or state callback +// 2) stream reinitialization that may race with stream destroying +// ================================================================================================ + +// Input-only stream +// ----------------- + +// Unplug the non-default input device for an input stream +// ------------------------------------------------------------------------------------------------ + +#[ignore] +#[test] +fn test_destroy_input_stream_after_unplugging_a_nondefault_input_device() { + // The stream can be destroyed before running device-changed event handler + test_unplug_a_device_on_an_active_stream(StreamType::INPUT, Scope::Input, false, 0); +} + +#[ignore] +#[test] +fn test_suspend_input_stream_by_unplugging_a_nondefault_input_device() { + // Expect to get an error state callback by device-changed event handler + test_unplug_a_device_on_an_active_stream(StreamType::INPUT, Scope::Input, false, 2000); +} + +// Unplug the default input device for an input stream +// ------------------------------------------------------------------------------------------------ +#[ignore] +#[test] +fn test_destroy_input_stream_after_unplugging_a_default_input_device() { + // Expect to get an device-changed callback by device-changed event handler, + // which will reinitialize the stream behind the scenes, at the same when + // the stream is being destroyed + test_unplug_a_device_on_an_active_stream(StreamType::INPUT, Scope::Input, true, 0); +} + +#[ignore] +#[test] +fn test_reinit_input_stream_by_unplugging_a_default_input_device() { + // Expect to get an device-changed callback by device-changed event handler, + // which will reinitialize the stream behind the scenes + test_unplug_a_device_on_an_active_stream(StreamType::INPUT, Scope::Input, true, 2000); +} + +// Output-only stream +// ------------------ + +// Unplug the non-default output device for an output stream +// ------------------------------------------------------------------------------------------------ +#[ignore] +#[test] +fn test_destroy_output_stream_after_unplugging_a_nondefault_output_device() { + test_unplug_a_device_on_an_active_stream(StreamType::OUTPUT, Scope::Output, false, 0); +} + +#[ignore] +#[test] +fn test_suspend_output_stream_by_unplugging_a_nondefault_output_device() { + test_unplug_a_device_on_an_active_stream(StreamType::OUTPUT, Scope::Output, false, 2000); +} + +// Unplug the default output device for an output stream +// ------------------------------------------------------------------------------------------------ + +#[ignore] +#[test] +fn test_destroy_output_stream_after_unplugging_a_default_output_device() { + // Expect to get an device-changed callback by device-changed event handler, + // which will reinitialize the stream behind the scenes, at the same when + // the stream is being destroyed + test_unplug_a_device_on_an_active_stream(StreamType::OUTPUT, Scope::Output, true, 0); +} + +#[ignore] +#[test] +fn test_reinit_output_stream_by_unplugging_a_default_output_device() { + // Expect to get an device-changed callback by device-changed event handler, + // which will reinitialize the stream behind the scenes + test_unplug_a_device_on_an_active_stream(StreamType::OUTPUT, Scope::Output, true, 2000); +} + +// Duplex stream +// ------------- + +// Unplug the non-default input device for a duplex stream +// ------------------------------------------------------------------------------------------------ + +#[ignore] +#[test] +fn test_destroy_duplex_stream_after_unplugging_a_nondefault_input_device() { + // The stream can be destroyed before running device-changed event handler + test_unplug_a_device_on_an_active_stream(StreamType::DUPLEX, Scope::Input, false, 0); +} + +#[ignore] +#[test] +fn test_suspend_duplex_stream_by_unplugging_a_nondefault_input_device() { + // Expect to get an error state callback by device-changed event handler + test_unplug_a_device_on_an_active_stream(StreamType::DUPLEX, Scope::Input, false, 2000); +} + +// Unplug the non-default output device for a duplex stream +// ------------------------------------------------------------------------------------------------ + +#[ignore] +#[test] +fn test_destroy_duplex_stream_after_unplugging_a_nondefault_output_device() { + test_unplug_a_device_on_an_active_stream(StreamType::DUPLEX, Scope::Output, false, 0); +} + +#[ignore] +#[test] +fn test_suspend_duplex_stream_by_unplugging_a_nondefault_output_device() { + test_unplug_a_device_on_an_active_stream(StreamType::DUPLEX, Scope::Output, false, 2000); +} + +// Unplug the non-default in-out device for a duplex stream +// ------------------------------------------------------------------------------------------------ +// TODO: Implement an in-out TestDevicePlugger + +// Unplug the default input device for a duplex stream +// ------------------------------------------------------------------------------------------------ + +#[ignore] +#[test] +fn test_destroy_duplex_stream_after_unplugging_a_default_input_device() { + // Expect to get an device-changed callback by device-changed event handler, + // which will reinitialize the stream behind the scenes, at the same when + // the stream is being destroyed + test_unplug_a_device_on_an_active_stream(StreamType::DUPLEX, Scope::Input, true, 0); +} + +#[ignore] +#[test] +fn test_reinit_duplex_stream_by_unplugging_a_default_input_device() { + // Expect to get an device-changed callback by device-changed event handler, + // which will reinitialize the stream behind the scenes + test_unplug_a_device_on_an_active_stream(StreamType::DUPLEX, Scope::Input, true, 2000); +} + +// Unplug the default ouput device for a duplex stream +// ------------------------------------------------------------------------------------------------ + +#[ignore] +#[test] +fn test_destroy_duplex_stream_after_unplugging_a_default_output_device() { + // Expect to get an device-changed callback by device-changed event handler, + // which will reinitialize the stream behind the scenes, at the same when + // the stream is being destroyed + test_unplug_a_device_on_an_active_stream(StreamType::DUPLEX, Scope::Output, true, 0); +} + +#[ignore] +#[test] +fn test_reinit_duplex_stream_by_unplugging_a_default_output_device() { + // Expect to get an device-changed callback by device-changed event handler, + // which will reinitialize the stream behind the scenes + test_unplug_a_device_on_an_active_stream(StreamType::DUPLEX, Scope::Output, true, 2000); +} + +fn test_unplug_a_device_on_an_active_stream( + stream_type: StreamType, + device_scope: Scope, + set_device_to_default: bool, + wait_up_to_ms: u64, +) { + let has_input = test_get_default_device(Scope::Input).is_some(); + let has_output = test_get_default_device(Scope::Output).is_some(); + + if stream_type.contains(StreamType::INPUT) && !has_input { + println!("No input device for input or duplex stream."); + return; + } + + if stream_type.contains(StreamType::OUTPUT) && !has_output { + println!("No output device for output or duplex stream."); + return; + } + + let default_device_before_plugging = test_get_default_device(device_scope.clone()).unwrap(); + println!( + "Before plugging, default {:?} device is {}", + device_scope, default_device_before_plugging + ); + + let mut plugger = TestDevicePlugger::new(device_scope.clone()).unwrap(); + assert!(plugger.plug().is_ok()); + assert_ne!(plugger.get_device_id(), kAudioObjectUnknown); + println!( + "Create plugger device: {} for {:?}", + plugger.get_device_id(), + device_scope + ); + + let default_device_after_plugging = test_get_default_device(device_scope.clone()).unwrap(); + println!( + "After plugging, default {:?} device is {}", + device_scope, default_device_after_plugging + ); + + // The new device, plugger, is possible to be set to the default device. + // Before running the test, we need to set the default device to the correct one. + if set_device_to_default { + // plugger should be the default device for the test. + // If it's not, then set it to the default device. + if default_device_after_plugging != plugger.get_device_id() { + let prev_def_dev = + test_set_default_device(plugger.get_device_id(), device_scope.clone()).unwrap(); + assert_eq!(prev_def_dev, default_device_after_plugging); + } + } else { + // plugger should NOT be the default device for the test. + // If it is, reset the default device to another one. + if default_device_after_plugging == plugger.get_device_id() { + let prev_def_dev = + test_set_default_device(default_device_before_plugging, device_scope.clone()) + .unwrap(); + assert_eq!(prev_def_dev, default_device_after_plugging); + } + } + + // Ignore the return devices' info since we only need to print them. + let _ = get_devices_info_in_scope(device_scope.clone()); + println!( + "Current default {:?} device is {}", + device_scope, + test_get_default_device(device_scope.clone()).unwrap() + ); + + let (input_device, output_device) = match device_scope { + Scope::Input => ( + if set_device_to_default { + None // default input device. + } else { + Some(plugger.get_device_id()) + }, + None, + ), + Scope::Output => ( + None, + if set_device_to_default { + None // default output device. + } else { + Some(plugger.get_device_id()) + }, + ), + }; + + #[derive(Clone, PartialEq)] + struct Data { + changed_count: u32, + states: Vec<ffi::cubeb_state>, + } + + impl Data { + fn new() -> Self { + Self { + changed_count: 0, + states: vec![], + } + } + } + + let notifier = Arc::new(Notifier::new(Data::new())); + let notifier_ptr = notifier.as_ref() as *const Notifier<Data>; + + test_get_stream_with_device_changed_callback( + "stream: test stream reinit/destroy after unplugging a device", + stream_type, + input_device, + output_device, + notifier_ptr as *mut c_void, + state_callback, + device_changed_callback, + |stream| { + stream.start(); + + let changed_watcher = Watcher::new(¬ifier); + let mut data_guard = notifier.lock().unwrap(); + assert_eq!(data_guard.states.last().unwrap(), &ffi::CUBEB_STATE_STARTED); + + println!( + "Stream runs on the device {} for {:?}", + plugger.get_device_id(), + device_scope + ); + + let dev = plugger.get_device_id(); + let start_changed_count = data_guard.changed_count.clone(); + + assert!(plugger.unplug().is_ok()); + + if set_device_to_default { + // The stream will be reinitialized if it follows the default input or output device. + println!("Waiting for default device to change and reinit"); + data_guard = changed_watcher + .wait_while(data_guard, |data| { + data.changed_count == start_changed_count + || data.states.last().unwrap_or(&ffi::CUBEB_STATE_ERROR) + != &ffi::CUBEB_STATE_STARTED + }) + .unwrap(); + } else if wait_up_to_ms > 0 { + // stream can be dropped immediately before device-changed callback + // so we only check the states if we wait for it explicitly. + println!("Waiting for non-default device to enter error state"); + let (new_guard, timeout_res) = changed_watcher + .wait_timeout_while(data_guard, Duration::from_millis(wait_up_to_ms), |data| { + data.states.last().unwrap_or(&ffi::CUBEB_STATE_STARTED) + != &ffi::CUBEB_STATE_ERROR + }) + .unwrap(); + assert!(!timeout_res.timed_out()); + data_guard = new_guard; + } + + println!( + "Device {} for {:?} has been unplugged. The default {:?} device now is {}", + dev, + device_scope, + device_scope, + test_get_default_device(device_scope.clone()).unwrap() + ); + + println!("The stream is going to be destroyed soon"); + }, + ); + + extern "C" fn state_callback( + stream: *mut ffi::cubeb_stream, + user_ptr: *mut c_void, + state: ffi::cubeb_state, + ) { + println!("Device change callback. user_ptr @ {:p}", user_ptr); + assert!(!stream.is_null()); + println!( + "state: {}", + match state { + ffi::CUBEB_STATE_STARTED => "started", + ffi::CUBEB_STATE_STOPPED => "stopped", + ffi::CUBEB_STATE_DRAINED => "drained", + ffi::CUBEB_STATE_ERROR => "error", + _ => "unknown", + } + ); + let notifier = unsafe { &mut *(user_ptr as *mut Notifier<Data>) }; + let mut data_guard = notifier.lock().unwrap(); + data_guard.states.push(state); + notifier.notify(data_guard); + } + + extern "C" fn device_changed_callback(user_ptr: *mut c_void) { + println!("Device change callback. user_ptr @ {:p}", user_ptr); + let notifier = unsafe { &mut *(user_ptr as *mut Notifier<Data>) }; + let mut data_guard = notifier.lock().unwrap(); + data_guard.changed_count += 1; + notifier.notify(data_guard); + } +} + +struct Notifier<T> { + value: Mutex<T>, + cvar: Condvar, +} + +impl<T> Notifier<T> { + fn new(value: T) -> Self { + Self { + value: Mutex::new(value), + cvar: Condvar::new(), + } + } + + fn lock(&self) -> LockResult<MutexGuard<'_, T>> { + self.value.lock() + } + + fn notify(&self, _guard: MutexGuard<'_, T>) { + self.cvar.notify_all(); + } +} + +struct Watcher<T: Clone + PartialEq> { + notifier: Arc<Notifier<T>>, +} + +impl<T: Clone + PartialEq> Watcher<T> { + fn new(value: &Arc<Notifier<T>>) -> Self { + Self { + notifier: Arc::clone(value), + } + } + + fn lock(&self) -> LockResult<MutexGuard<'_, T>> { + self.notifier.lock() + } + + fn wait_while<'a, F>( + &self, + guard: MutexGuard<'a, T>, + condition: F, + ) -> LockResult<MutexGuard<'a, T>> + where + F: FnMut(&mut T) -> bool, + { + self.notifier.cvar.wait_while(guard, condition) + } + + fn wait_timeout_while<'a, F>( + &self, + guard: MutexGuard<'a, T>, + dur: Duration, + condition: F, + ) -> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)> + where + F: FnMut(&mut T) -> bool, + { + self.notifier.cvar.wait_timeout_while(guard, dur, condition) + } +} + +fn test_get_stream_with_device_changed_callback<F>( + name: &'static str, + stm_type: StreamType, + input_device: Option<AudioObjectID>, + output_device: Option<AudioObjectID>, + data: *mut c_void, + state_callback: extern "C" fn(*mut ffi::cubeb_stream, *mut c_void, ffi::cubeb_state), + device_changed_callback: extern "C" fn(*mut c_void), + operation: F, +) where + F: FnOnce(&mut AudioUnitStream), +{ + test_get_stream_with_default_data_callback_by_type( + name, + stm_type, + input_device, + output_device, + state_callback, + data, + |stream| { + assert!(stream + .register_device_changed_callback(Some(device_changed_callback)) + .is_ok()); + operation(stream); + assert!(stream.register_device_changed_callback(None).is_ok()); + }, + ); +} |