// Copyright © 2017 Mozilla Foundation // // This program is made available under an ISC-style license. See the // accompanying file LICENSE for details #[cfg(target_os = "linux")] use audio_thread_priority::{promote_thread_to_real_time, RtPriorityThreadInfo}; use audioipc::messages::SerializableHandle; use audioipc::messages::{ CallbackReq, CallbackResp, ClientMessage, Device, DeviceCollectionReq, DeviceCollectionResp, DeviceInfo, RegisterDeviceCollectionChanged, ServerMessage, StreamCreate, StreamCreateParams, StreamInitParams, StreamParams, }; use audioipc::shm::SharedMem; use audioipc::{ipccore, rpccore, sys, PlatformHandle}; use cubeb_core as cubeb; use cubeb_core::ffi; use std::convert::{From, TryInto}; use std::ffi::CStr; use std::mem::size_of; use std::os::raw::{c_long, c_void}; use std::rc::Rc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::{cell::RefCell, sync::Mutex}; use std::{panic, slice}; use crate::errors::*; fn error(error: cubeb::Error) -> ClientMessage { ClientMessage::Error(error.raw_code()) } struct CubebDeviceCollectionManager { servers: Mutex, cubeb::DeviceType)>>, } impl CubebDeviceCollectionManager { fn new() -> CubebDeviceCollectionManager { CubebDeviceCollectionManager { servers: Mutex::new(Vec::new()), } } fn register( &self, context: &cubeb::Context, server: &Rc, devtype: cubeb::DeviceType, ) -> cubeb::Result<()> { let mut servers = self.servers.lock().unwrap(); if servers.is_empty() { self.internal_register(context, true)?; } servers.push((server.clone(), devtype)); Ok(()) } fn unregister( &self, context: &cubeb::Context, server: &Rc, devtype: cubeb::DeviceType, ) -> cubeb::Result<()> { let mut servers = self.servers.lock().unwrap(); servers.retain(|(s, d)| !Rc::ptr_eq(s, server) || d != &devtype); if servers.is_empty() { self.internal_register(context, false)?; } Ok(()) } fn internal_register(&self, context: &cubeb::Context, enable: bool) -> cubeb::Result<()> { for &(dir, cb) in &[ ( cubeb::DeviceType::INPUT, device_collection_changed_input_cb_c as _, ), ( cubeb::DeviceType::OUTPUT, device_collection_changed_output_cb_c as _, ), ] { unsafe { context.register_device_collection_changed( dir, if enable { Some(cb) } else { None }, if enable { self as *const CubebDeviceCollectionManager as *mut c_void } else { std::ptr::null_mut() }, )?; } } Ok(()) } unsafe fn device_collection_changed_callback(&self, device_type: ffi::cubeb_device_type) { let servers = self.servers.lock().unwrap(); servers.iter().for_each(|(s, d)| { if d.contains(cubeb::DeviceType::from_bits_truncate(device_type)) { s.device_collection_changed_callback(device_type) } }); } } impl Drop for CubebDeviceCollectionManager { fn drop(&mut self) { assert!(self.servers.lock().unwrap().is_empty()); } } struct DevIdMap { devices: Vec, } // A cubeb_devid is an opaque type which may be implemented with a stable // pointer in a cubeb backend. cubeb_devids received remotely must be // validated before use, so DevIdMap provides a simple 1:1 mapping between a // cubeb_devid and an IPC-transportable value suitable for use as a unique // handle. impl DevIdMap { fn new() -> DevIdMap { let mut d = DevIdMap { devices: Vec::with_capacity(32), }; // A null cubeb_devid is used for selecting the default device. // Pre-populate the mapping with 0 -> 0 to handle nulls. d.devices.push(0); d } // Given a cubeb_devid, return a unique stable value suitable for use // over IPC. fn make_handle(&mut self, devid: usize) -> usize { if let Some(i) = self.devices.iter().position(|&d| d == devid) { return i; } self.devices.push(devid); self.devices.len() - 1 } // Given a handle produced by `make_handle`, return the associated // cubeb_devid. Invalid handles result in a panic. fn handle_to_id(&self, handle: usize) -> usize { self.devices[handle] } } struct CubebContextState { // `manager` must be dropped before the `context` is destroyed. manager: CubebDeviceCollectionManager, context: cubeb::Result, } thread_local!(static CONTEXT_KEY: RefCell> = RefCell::new(None)); fn cubeb_init_from_context_params() -> cubeb::Result { let params = super::G_CUBEB_CONTEXT_PARAMS.lock().unwrap(); let context_name = Some(params.context_name.as_c_str()); let backend_name = params.backend_name.as_deref(); let r = cubeb::Context::init(context_name, backend_name); r.map_err(|e| { info!("cubeb::Context::init failed r={:?}", e); e }) } fn with_local_context(f: F) -> T where F: FnOnce(&cubeb::Result, &mut CubebDeviceCollectionManager) -> T, { CONTEXT_KEY.with(|k| { let mut state = k.borrow_mut(); if state.is_none() { *state = Some(CubebContextState { manager: CubebDeviceCollectionManager::new(), context: cubeb_init_from_context_params(), }); } let CubebContextState { manager, context } = state.as_mut().unwrap(); // Always reattempt to initialize cubeb, OS config may have changed. if context.is_err() { *context = cubeb_init_from_context_params(); } f(context, manager) }) } struct DeviceCollectionClient; impl rpccore::Client for DeviceCollectionClient { type ServerMessage = DeviceCollectionReq; type ClientMessage = DeviceCollectionResp; } struct CallbackClient; impl rpccore::Client for CallbackClient { type ServerMessage = CallbackReq; type ClientMessage = CallbackResp; } struct ServerStreamCallbacks { /// Size of input frame in bytes input_frame_size: u16, /// Size of output frame in bytes output_frame_size: u16, /// Shared memory buffer for transporting audio data to/from client shm: SharedMem, /// RPC interface for data_callback (on OS audio thread) to server callback thread data_callback_rpc: rpccore::Proxy, /// RPC interface for state_callback (on any thread) to server callback thread state_callback_rpc: rpccore::Proxy, /// RPC interface for device_change_callback (on any thread) to server callback thread device_change_callback_rpc: rpccore::Proxy, } impl ServerStreamCallbacks { fn data_callback(&mut self, input: &[u8], output: &mut [u8], nframes: isize) -> isize { trace!( "Stream data callback: {} {} {}", nframes, input.len(), output.len() ); if self.input_frame_size != 0 { if input.len() > self.shm.get_size() { debug!( "bad input size: input={} shm={}", input.len(), self.shm.get_size() ); return cubeb::ffi::CUBEB_ERROR.try_into().unwrap(); } unsafe { self.shm .get_mut_slice(input.len()) .unwrap() .copy_from_slice(input); } } if self.output_frame_size != 0 && output.len() > self.shm.get_size() { debug!( "bad output size: output={} shm={}", output.len(), self.shm.get_size() ); return cubeb::ffi::CUBEB_ERROR.try_into().unwrap(); } let r = self.data_callback_rpc.call(CallbackReq::Data { nframes, input_frame_size: self.input_frame_size as usize, output_frame_size: self.output_frame_size as usize, }); match r { Ok(CallbackResp::Data(frames)) => { if frames >= 0 && self.output_frame_size != 0 { let nbytes = frames as usize * self.output_frame_size as usize; unsafe { output[..nbytes].copy_from_slice(self.shm.get_slice(nbytes).unwrap()); } } frames } _ => { debug!("Unexpected message {:?} during data_callback", r); cubeb::ffi::CUBEB_ERROR.try_into().unwrap() } } } fn state_callback(&self, state: cubeb::State) { trace!("Stream state callback: {:?}", state); let r = self .state_callback_rpc .call(CallbackReq::State(state.into())); match r { Ok(CallbackResp::State) => {} _ => { debug!("Unexpected message {:?} during state callback", r); } } } fn device_change_callback(&self) { trace!("Stream device change callback"); let r = self .device_change_callback_rpc .call(CallbackReq::DeviceChange); match r { Ok(CallbackResp::DeviceChange) => {} _ => { debug!("Unexpected message {:?} during device change callback", r); } } } } static SHM_ID: AtomicUsize = AtomicUsize::new(0); // Generate a temporary shm_id fragment that is unique to the process. This // path is used temporarily to create a shm segment, which is then // immediately deleted from the filesystem while retaining handles to the // shm to be shared between the server and client. fn get_shm_id() -> String { format!( "cubeb-shm-{}-{}", std::process::id(), SHM_ID.fetch_add(1, Ordering::SeqCst) ) } struct ServerStream { stream: Option, cbs: Box, client_pipe: Option, } impl Drop for ServerStream { fn drop(&mut self) { // `stream` *must* be dropped before `cbs`. drop(self.stream.take()); } } struct DeviceCollectionChangeCallback { rpc: rpccore::Proxy, } impl DeviceCollectionChangeCallback { fn device_collection_changed_callback(&self, device_type: ffi::cubeb_device_type) { // TODO: Assert device_type is in devtype. debug!( "Sending device collection ({:?}) changed event", device_type ); let _ = self .rpc .call(DeviceCollectionReq::DeviceChange(device_type)); } } pub struct CubebServer { callback_thread: ipccore::EventLoopHandle, device_collection_thread: ipccore::EventLoopHandle, streams: slab::Slab, remote_pid: Option, device_collection_change_callbacks: Option>, devidmap: DevIdMap, shm_area_size: usize, } impl Drop for CubebServer { fn drop(&mut self) { if let Some(device_collection_change_callbacks) = &self.device_collection_change_callbacks { debug!("CubebServer: dropped with device_collection_change_callbacks registered"); CONTEXT_KEY.with(|k| { let mut state = k.borrow_mut(); if let Some(CubebContextState { manager, context: Ok(context), }) = state.as_mut() { for devtype in [cubeb::DeviceType::INPUT, cubeb::DeviceType::OUTPUT] { let r = manager.unregister( context, device_collection_change_callbacks, devtype, ); if r.is_err() { debug!("CubebServer: unregister failed: {:?}", r); } } } }) } } } #[allow(unknown_lints)] // non_send_fields_in_send_ty is Nightly-only as of 2021-11-29. #[allow(clippy::non_send_fields_in_send_ty)] // XXX: required for server setup, verify this is safe. unsafe impl Send for CubebServer {} impl rpccore::Server for CubebServer { type ServerMessage = ServerMessage; type ClientMessage = ClientMessage; fn process(&mut self, req: Self::ServerMessage) -> Self::ClientMessage { if let ServerMessage::ClientConnect(pid) = req { self.remote_pid = Some(pid); } with_local_context(|context, manager| match *context { Err(_) => error(cubeb::Error::error()), Ok(ref context) => self.process_msg(context, manager, &req), }) } } // Debugging for BMO 1594216/1612044. macro_rules! try_stream { ($self:expr, $stm_tok:expr) => { if $self.streams.contains($stm_tok) { $self.streams[$stm_tok] .stream .as_mut() .expect("uninitialized stream") } else { error!( "{}:{}:{} - Stream({}): invalid token", file!(), line!(), column!(), $stm_tok ); return error(cubeb::Error::invalid_parameter()); } }; } impl CubebServer { pub fn new( callback_thread: ipccore::EventLoopHandle, device_collection_thread: ipccore::EventLoopHandle, shm_area_size: usize, ) -> Self { CubebServer { callback_thread, device_collection_thread, streams: slab::Slab::::new(), remote_pid: None, device_collection_change_callbacks: None, devidmap: DevIdMap::new(), shm_area_size, } } // Process a request coming from the client. fn process_msg( &mut self, context: &cubeb::Context, manager: &mut CubebDeviceCollectionManager, msg: &ServerMessage, ) -> ClientMessage { let resp: ClientMessage = match *msg { ServerMessage::ClientConnect(_) => { // remote_pid is set before cubeb initialization, just verify here. assert!(self.remote_pid.is_some()); ClientMessage::ClientConnected } ServerMessage::ClientDisconnect => { // TODO: //self.connection.client_disconnect(); ClientMessage::ClientDisconnected } ServerMessage::ContextGetBackendId => { ClientMessage::ContextBackendId(context.backend_id().to_string()) } ServerMessage::ContextGetMaxChannelCount => context .max_channel_count() .map(ClientMessage::ContextMaxChannelCount) .unwrap_or_else(error), ServerMessage::ContextGetMinLatency(ref params) => { let format = cubeb::SampleFormat::from(params.format); let layout = cubeb::ChannelLayout::from(params.layout); let params = cubeb::StreamParamsBuilder::new() .format(format) .rate(params.rate) .channels(params.channels) .layout(layout) .take(); context .min_latency(¶ms) .map(ClientMessage::ContextMinLatency) .unwrap_or_else(error) } ServerMessage::ContextGetPreferredSampleRate => context .preferred_sample_rate() .map(ClientMessage::ContextPreferredSampleRate) .unwrap_or_else(error), ServerMessage::ContextGetDeviceEnumeration(device_type) => context .enumerate_devices(cubeb::DeviceType::from_bits_truncate(device_type)) .map(|devices| { let v: Vec = devices .iter() .map(|i| { let mut tmp: DeviceInfo = i.as_ref().into(); // Replace each cubeb_devid with a unique handle suitable for IPC. tmp.devid = self.devidmap.make_handle(tmp.devid); tmp }) .collect(); ClientMessage::ContextEnumeratedDevices(v) }) .unwrap_or_else(error), ServerMessage::StreamCreate(ref params) => self .process_stream_create(params) .unwrap_or_else(|_| error(cubeb::Error::error())), ServerMessage::StreamInit(stm_tok, ref params) => self .process_stream_init(context, stm_tok, params) .unwrap_or_else(|_| error(cubeb::Error::error())), ServerMessage::StreamDestroy(stm_tok) => { if self.streams.contains(stm_tok) { debug!("Unregistering stream {:?}", stm_tok); self.streams.remove(stm_tok); } else { // Debugging for BMO 1594216/1612044. error!("StreamDestroy({}): invalid token", stm_tok); return error(cubeb::Error::invalid_parameter()); } ClientMessage::StreamDestroyed } ServerMessage::StreamStart(stm_tok) => try_stream!(self, stm_tok) .start() .map(|_| ClientMessage::StreamStarted) .unwrap_or_else(error), ServerMessage::StreamStop(stm_tok) => try_stream!(self, stm_tok) .stop() .map(|_| ClientMessage::StreamStopped) .unwrap_or_else(error), ServerMessage::StreamGetPosition(stm_tok) => try_stream!(self, stm_tok) .position() .map(ClientMessage::StreamPosition) .unwrap_or_else(error), ServerMessage::StreamGetLatency(stm_tok) => try_stream!(self, stm_tok) .latency() .map(ClientMessage::StreamLatency) .unwrap_or_else(error), ServerMessage::StreamGetInputLatency(stm_tok) => try_stream!(self, stm_tok) .input_latency() .map(ClientMessage::StreamInputLatency) .unwrap_or_else(error), ServerMessage::StreamSetVolume(stm_tok, volume) => try_stream!(self, stm_tok) .set_volume(volume) .map(|_| ClientMessage::StreamVolumeSet) .unwrap_or_else(error), ServerMessage::StreamSetName(stm_tok, ref name) => try_stream!(self, stm_tok) .set_name(name) .map(|_| ClientMessage::StreamNameSet) .unwrap_or_else(error), ServerMessage::StreamGetCurrentDevice(stm_tok) => try_stream!(self, stm_tok) .current_device() .map(|device| ClientMessage::StreamCurrentDevice(Device::from(device))) .unwrap_or_else(error), ServerMessage::StreamRegisterDeviceChangeCallback(stm_tok, enable) => { try_stream!(self, stm_tok) .register_device_changed_callback(if enable { Some(device_change_cb_c) } else { None }) .map(|_| ClientMessage::StreamRegisterDeviceChangeCallback) .unwrap_or_else(error) } ServerMessage::ContextSetupDeviceCollectionCallback => { let (server_pipe, client_pipe) = match sys::make_pipe_pair() { Ok((server_pipe, client_pipe)) => (server_pipe, client_pipe), Err(e) => { debug!( "ContextSetupDeviceCollectionCallback - make_pipe_pair failed: {:?}", e ); return error(cubeb::Error::error()); } }; // TODO: this should bind the client_pipe and send the server_pipe to the remote, but // additional work is required as it's not possible to convert a Windows sys::Pipe into a raw handle. // TODO: Use the rpc_thread instead of an extra device_collection_thread, but a reentrant bind_client // is required to support that. let rpc = match self .device_collection_thread .bind_client::(server_pipe) { Ok(rpc) => rpc, Err(e) => { debug!( "ContextSetupDeviceCollectionCallback - bind_client: {:?}", e ); return error(cubeb::Error::error()); } }; self.device_collection_change_callbacks = Some(Rc::new(DeviceCollectionChangeCallback { rpc })); let fd = RegisterDeviceCollectionChanged { platform_handle: SerializableHandle::new(client_pipe, self.remote_pid.unwrap()), }; ClientMessage::ContextSetupDeviceCollectionCallback(fd) } ServerMessage::ContextRegisterDeviceCollectionChanged(device_type, enable) => self .process_register_device_collection_changed( context, manager, cubeb::DeviceType::from_bits_truncate(device_type), enable, ) .unwrap_or_else(error), #[cfg(target_os = "linux")] ServerMessage::PromoteThreadToRealTime(thread_info) => { let info = RtPriorityThreadInfo::deserialize(thread_info); match promote_thread_to_real_time(info, 256, 48000) { Ok(_) => { info!("Promotion of content process thread to real-time OK"); } Err(_) => { warn!("Promotion of content process thread to real-time error"); } } ClientMessage::ThreadPromoted } }; trace!("process_msg: req={:?}, resp={:?}", msg, resp); resp } fn process_register_device_collection_changed( &mut self, context: &cubeb::Context, manager: &mut CubebDeviceCollectionManager, devtype: cubeb::DeviceType, enable: bool, ) -> cubeb::Result { if devtype == cubeb::DeviceType::UNKNOWN { return Err(cubeb::Error::invalid_parameter()); } assert!(self.device_collection_change_callbacks.is_some()); let cbs = self.device_collection_change_callbacks.as_ref().unwrap(); if enable { manager.register(context, cbs, devtype) } else { manager.unregister(context, cbs, devtype) } .map(|_| ClientMessage::ContextRegisteredDeviceCollectionChanged) } // Stream create is special, so it's been separated from process_msg. fn process_stream_create(&mut self, params: &StreamCreateParams) -> Result { fn frame_size_in_bytes(params: Option<&StreamParams>) -> u16 { params .map(|p| { let format = p.format.into(); let sample_size = match format { cubeb::SampleFormat::S16LE | cubeb::SampleFormat::S16BE | cubeb::SampleFormat::S16NE => 2, cubeb::SampleFormat::Float32LE | cubeb::SampleFormat::Float32BE | cubeb::SampleFormat::Float32NE => 4, }; let channel_count = p.channels as u16; sample_size * channel_count }) .unwrap_or(0u16) } // Create the callback handling struct which is attached the cubeb stream. let input_frame_size = frame_size_in_bytes(params.input_stream_params.as_ref()); let output_frame_size = frame_size_in_bytes(params.output_stream_params.as_ref()); // Estimate a safe shmem size for this stream configuration. If the server was configured with a fixed // shm_area_size override, use that instead. // TODO: Add a new cubeb API to query the precise buffer size required for a given stream config. // https://github.com/mozilla/audioipc-2/issues/124 let shm_area_size = if self.shm_area_size == 0 { let frame_size = output_frame_size.max(input_frame_size) as u32; let in_rate = params.input_stream_params.map(|p| p.rate).unwrap_or(0); let out_rate = params.output_stream_params.map(|p| p.rate).unwrap_or(0); let rate = out_rate.max(in_rate); // 1s of audio, rounded up to the nearest 64kB. // Stream latency is capped at 1s in process_stream_init. (((rate * frame_size) + 0xffff) & !0xffff) as usize } else { self.shm_area_size }; debug!("shm_area_size = {}", shm_area_size); let shm = SharedMem::new(&get_shm_id(), shm_area_size)?; let shm_handle = unsafe { shm.make_handle()? }; let (server_pipe, client_pipe) = sys::make_pipe_pair()?; // TODO: this should bind the client_pipe and send the server_pipe to the remote, but // additional work is required as it's not possible to convert a Windows sys::Pipe into a raw handle. let rpc = self .callback_thread .bind_client::(server_pipe)?; let cbs = Box::new(ServerStreamCallbacks { input_frame_size, output_frame_size, shm, state_callback_rpc: rpc.try_clone()?, device_change_callback_rpc: rpc.try_clone()?, data_callback_rpc: rpc, }); let entry = self.streams.vacant_entry(); let key = entry.key(); debug!("Registering stream {:?}", key); entry.insert(ServerStream { stream: None, cbs, client_pipe: Some(client_pipe), }); Ok(ClientMessage::StreamCreated(StreamCreate { token: key, shm_handle: SerializableHandle::new(shm_handle, self.remote_pid.unwrap()), shm_area_size, })) } // Stream init is special, so it's been separated from process_msg. fn process_stream_init( &mut self, context: &cubeb::Context, stm_tok: usize, params: &StreamInitParams, ) -> Result { // Create cubeb stream from params let stream_name = params .stream_name .as_ref() .and_then(|name| CStr::from_bytes_with_nul(name).ok()); // Map IPC handle back to cubeb_devid. let input_device = self.devidmap.handle_to_id(params.input_device) as *const _; let input_stream_params = params.input_stream_params.as_ref().map(|isp| unsafe { cubeb::StreamParamsRef::from_ptr(isp as *const StreamParams as *mut _) }); // Map IPC handle back to cubeb_devid. let output_device = self.devidmap.handle_to_id(params.output_device) as *const _; let output_stream_params = params.output_stream_params.as_ref().map(|osp| unsafe { cubeb::StreamParamsRef::from_ptr(osp as *const StreamParams as *mut _) }); // TODO: Manage stream latency requests with respect to the RT deadlines the callback_thread was configured for. fn round_up_pow2(v: u32) -> u32 { debug_assert!(v >= 1); 1 << (32 - (v - 1).leading_zeros()) } let rate = params .output_stream_params .map(|p| p.rate) .unwrap_or_else(|| params.input_stream_params.map(|p| p.rate).unwrap()); // Note: minimum latency supported by AudioIPC is currently ~5ms. This restriction may be reduced by later IPC improvements. let min_latency = round_up_pow2(5 * rate / 1000); // Note: maximum latency is restricted by the SharedMem size. let max_latency = rate; let latency = params.latency_frames.max(min_latency).min(max_latency); trace!( "stream rate={} latency requested={} calculated={}", rate, params.latency_frames, latency ); let server_stream = &mut self.streams[stm_tok]; assert!(size_of::>() == size_of::()); let user_ptr = server_stream.cbs.as_ref() as *const ServerStreamCallbacks as *mut c_void; let stream = unsafe { let stream = context.stream_init( stream_name, input_device, input_stream_params, output_device, output_stream_params, latency, Some(data_cb_c), Some(state_cb_c), user_ptr, ); match stream { Ok(stream) => stream, Err(e) => { debug!("Unregistering stream {:?} (stream error {:?})", stm_tok, e); self.streams.remove(stm_tok); return Err(e.into()); } } }; server_stream.stream = Some(stream); let client_pipe = server_stream .client_pipe .take() .expect("invalid state after StreamCreated"); Ok(ClientMessage::StreamInitialized(SerializableHandle::new( client_pipe, self.remote_pid.unwrap(), ))) } } // C callable callbacks unsafe extern "C" fn data_cb_c( _: *mut ffi::cubeb_stream, user_ptr: *mut c_void, input_buffer: *const c_void, output_buffer: *mut c_void, nframes: c_long, ) -> c_long { let ok = panic::catch_unwind(|| { let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks); let input = if input_buffer.is_null() { &[] } else { let nbytes = nframes * c_long::from(cbs.input_frame_size); slice::from_raw_parts(input_buffer as *const u8, nbytes as usize) }; let output: &mut [u8] = if output_buffer.is_null() { &mut [] } else { let nbytes = nframes * c_long::from(cbs.output_frame_size); slice::from_raw_parts_mut(output_buffer as *mut u8, nbytes as usize) }; cbs.data_callback(input, output, nframes as isize) as c_long }); ok.unwrap_or(cubeb::ffi::CUBEB_ERROR as c_long) } unsafe extern "C" fn state_cb_c( _: *mut ffi::cubeb_stream, user_ptr: *mut c_void, state: ffi::cubeb_state, ) { let ok = panic::catch_unwind(|| { let state = cubeb::State::from(state); let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks); cbs.state_callback(state); }); ok.expect("State callback panicked"); } unsafe extern "C" fn device_change_cb_c(user_ptr: *mut c_void) { let ok = panic::catch_unwind(|| { let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks); cbs.device_change_callback(); }); ok.expect("Device change callback panicked"); } unsafe extern "C" fn device_collection_changed_input_cb_c( _: *mut ffi::cubeb, user_ptr: *mut c_void, ) { let ok = panic::catch_unwind(|| { let manager = &mut *(user_ptr as *mut CubebDeviceCollectionManager); manager.device_collection_changed_callback(ffi::CUBEB_DEVICE_TYPE_INPUT); }); ok.expect("Collection changed (input) callback panicked"); } unsafe extern "C" fn device_collection_changed_output_cb_c( _: *mut ffi::cubeb, user_ptr: *mut c_void, ) { let ok = panic::catch_unwind(|| { let manager = &mut *(user_ptr as *mut CubebDeviceCollectionManager); manager.device_collection_changed_callback(ffi::CUBEB_DEVICE_TYPE_OUTPUT); }); ok.expect("Collection changed (output) callback panicked"); }