// Copyright © 2017 Mozilla Foundation // // This program is made available under an ISC-style license. See the // accompanying file LICENSE for details use crate::stream; use crate::{assert_not_in_callback, run_in_callback}; use crate::{ClientStream, AUDIOIPC_INIT_PARAMS}; #[cfg(target_os = "linux")] use audio_thread_priority::get_current_thread_info; #[cfg(not(target_os = "linux"))] use audio_thread_priority::promote_current_thread_to_real_time; use audioipc::ipccore::EventLoopHandle; use audioipc::{ipccore, rpccore, sys, PlatformHandle}; use audioipc::{ messages, messages::DeviceCollectionReq, messages::DeviceCollectionResp, ClientMessage, ServerMessage, }; use cubeb_backend::{ capi_new, ffi, Context, ContextOps, DeviceCollectionRef, DeviceId, DeviceType, Error, Ops, Result, Stream, StreamParams, StreamParamsRef, }; use std::ffi::{CStr, CString}; use std::os::raw::c_void; use std::sync::{Arc, Mutex}; use std::thread; use std::{fmt, mem, ptr}; struct CubebClient; impl rpccore::Client for CubebClient { type ServerMessage = ServerMessage; type ClientMessage = ClientMessage; } pub const CLIENT_OPS: Ops = capi_new!(ClientContext, ClientStream); // ClientContext's layout *must* match cubeb.c's `struct cubeb` for the // common fields. #[repr(C)] pub struct ClientContext { _ops: *const Ops, rpc: rpccore::Proxy, rpc_thread: ipccore::EventLoopThread, callback_thread: ipccore::EventLoopThread, backend_id: CString, device_collection_rpc: bool, input_device_callback: Arc>, output_device_callback: Arc>, } impl ClientContext { #[doc(hidden)] pub fn rpc_handle(&self) -> &EventLoopHandle { self.rpc_thread.handle() } #[doc(hidden)] pub fn rpc(&self) -> Result> { self.rpc.try_clone().map_err(|_| Error::default()) } #[doc(hidden)] pub fn callback_handle(&self) -> &EventLoopHandle { self.callback_thread.handle() } } #[cfg(target_os = "linux")] fn promote_thread(rpc: &rpccore::Proxy) { match get_current_thread_info() { Ok(info) => { let bytes = info.serialize(); let _ = rpc.call(ServerMessage::PromoteThreadToRealTime(bytes)); } Err(_) => { warn!("Could not remotely promote thread to RT."); } } } #[cfg(not(target_os = "linux"))] fn promote_thread(_rpc: &rpccore::Proxy) { match promote_current_thread_to_real_time(256, 48000) { Ok(_) => { info!("Audio thread promoted to real-time."); } Err(_) => { warn!("Could not promote thread to real-time."); } } } fn register_thread(callback: Option) { if let Some(func) = callback { let thr = thread::current(); let name = CString::new(thr.name().unwrap()).unwrap(); func(name.as_ptr()); } } fn unregister_thread(callback: Option) { if let Some(func) = callback { func(); } } fn promote_and_register_thread( rpc: &rpccore::Proxy, callback: Option, ) { promote_thread(rpc); register_thread(callback); } #[derive(Default)] struct DeviceCollectionCallback { cb: ffi::cubeb_device_collection_changed_callback, user_ptr: usize, } struct DeviceCollectionServer { input_device_callback: Arc>, output_device_callback: Arc>, } impl rpccore::Server for DeviceCollectionServer { type ServerMessage = DeviceCollectionReq; type ClientMessage = DeviceCollectionResp; fn process(&mut self, req: Self::ServerMessage) -> Self::ClientMessage { match req { DeviceCollectionReq::DeviceChange(device_type) => { trace!( "ctx_thread: DeviceChange Callback: device_type={}", device_type ); let devtype = cubeb_backend::DeviceType::from_bits_truncate(device_type); let (input_cb, input_user_ptr) = { let dcb = self.input_device_callback.lock().unwrap(); (dcb.cb, dcb.user_ptr) }; let (output_cb, output_user_ptr) = { let dcb = self.output_device_callback.lock().unwrap(); (dcb.cb, dcb.user_ptr) }; run_in_callback(|| { if devtype.contains(cubeb_backend::DeviceType::INPUT) { unsafe { input_cb.unwrap()(ptr::null_mut(), input_user_ptr as *mut c_void) } } if devtype.contains(cubeb_backend::DeviceType::OUTPUT) { unsafe { output_cb.unwrap()(ptr::null_mut(), output_user_ptr as *mut c_void) } } }); DeviceCollectionResp::DeviceChange } } } } impl ContextOps for ClientContext { fn init(_context_name: Option<&CStr>) -> Result { assert_not_in_callback(); let params = AUDIOIPC_INIT_PARAMS.with(|p| p.replace(None).unwrap()); let thread_create_callback = params.thread_create_callback; let thread_destroy_callback = params.thread_destroy_callback; let server_connection = unsafe { sys::Pipe::from_raw_handle(PlatformHandle::new(params.server_connection)) }; let rpc_thread = ipccore::EventLoopThread::new( "AudioIPC Client RPC".to_string(), None, move || register_thread(thread_create_callback), move || unregister_thread(thread_destroy_callback), ) .map_err(|_| Error::default())?; let rpc = rpc_thread .handle() .bind_client::(server_connection) .map_err(|_| Error::default())?; let rpc2 = rpc.try_clone().map_err(|_| Error::default())?; // Don't let errors bubble from here. Later calls against this context // will return errors the caller expects to handle. let _ = send_recv!(rpc, ClientConnect(std::process::id()) => ClientConnected); let backend_id = send_recv!(rpc, ContextGetBackendId => ContextBackendId()) .unwrap_or_else(|_| "(remote error)".to_string()); let backend_id = CString::new(backend_id).expect("backend_id query failed"); // TODO: remove params.pool_size from init params. let callback_thread = ipccore::EventLoopThread::new( "AudioIPC Client Callback".to_string(), Some(params.stack_size), move || promote_and_register_thread(&rpc2, thread_create_callback), move || unregister_thread(thread_destroy_callback), ) .map_err(|_| Error::default())?; let ctx = Box::new(ClientContext { _ops: &CLIENT_OPS as *const _, rpc, rpc_thread, callback_thread, backend_id, device_collection_rpc: false, input_device_callback: Arc::new(Mutex::new(Default::default())), output_device_callback: Arc::new(Mutex::new(Default::default())), }); Ok(unsafe { Context::from_ptr(Box::into_raw(ctx) as *mut _) }) } fn backend_id(&mut self) -> &CStr { assert_not_in_callback(); self.backend_id.as_c_str() } fn max_channel_count(&mut self) -> Result { assert_not_in_callback(); send_recv!(self.rpc()?, ContextGetMaxChannelCount => ContextMaxChannelCount()) } fn min_latency(&mut self, params: StreamParams) -> Result { assert_not_in_callback(); let params = messages::StreamParams::from(params.as_ref()); send_recv!(self.rpc()?, ContextGetMinLatency(params) => ContextMinLatency()) } fn preferred_sample_rate(&mut self) -> Result { assert_not_in_callback(); send_recv!(self.rpc()?, ContextGetPreferredSampleRate => ContextPreferredSampleRate()) } fn enumerate_devices( &mut self, devtype: DeviceType, collection: &DeviceCollectionRef, ) -> Result<()> { assert_not_in_callback(); let v: Vec = send_recv!( self.rpc()?, ContextGetDeviceEnumeration(devtype.bits()) => ContextEnumeratedDevices())? .into_iter() .map(|i| i.into()) .collect(); let mut vs = v.into_boxed_slice(); let coll = unsafe { &mut *collection.as_ptr() }; coll.device = vs.as_mut_ptr(); coll.count = vs.len(); // Giving away the memory owned by vs. Don't free it! // Reclaimed in `device_collection_destroy`. mem::forget(vs); Ok(()) } fn device_collection_destroy(&mut self, collection: &mut DeviceCollectionRef) -> Result<()> { assert_not_in_callback(); unsafe { let coll = &mut *collection.as_ptr(); let mut devices = Vec::from_raw_parts( coll.device as *mut ffi::cubeb_device_info, coll.count, coll.count, ); for dev in &mut devices { if !dev.device_id.is_null() { let _ = CString::from_raw(dev.device_id as *mut _); } if !dev.group_id.is_null() { let _ = CString::from_raw(dev.group_id as *mut _); } if !dev.vendor_name.is_null() { let _ = CString::from_raw(dev.vendor_name as *mut _); } if !dev.friendly_name.is_null() { let _ = CString::from_raw(dev.friendly_name as *mut _); } } coll.device = ptr::null_mut(); coll.count = 0; Ok(()) } } fn stream_init( &mut self, stream_name: Option<&CStr>, input_device: DeviceId, input_stream_params: Option<&StreamParamsRef>, output_device: DeviceId, output_stream_params: Option<&StreamParamsRef>, latency_frames: u32, // These params aren't sent to the server data_callback: ffi::cubeb_data_callback, state_callback: ffi::cubeb_state_callback, user_ptr: *mut c_void, ) -> Result { assert_not_in_callback(); let stream_name = stream_name.map(|name| name.to_bytes_with_nul().to_vec()); let input_stream_params = input_stream_params.map(messages::StreamParams::from); let output_stream_params = output_stream_params.map(messages::StreamParams::from); let init_params = messages::StreamInitParams { stream_name, input_device: input_device as usize, input_stream_params, output_device: output_device as usize, output_stream_params, latency_frames, }; stream::init(self, init_params, data_callback, state_callback, user_ptr) } fn register_device_collection_changed( &mut self, devtype: DeviceType, collection_changed_callback: ffi::cubeb_device_collection_changed_callback, user_ptr: *mut c_void, ) -> Result<()> { assert_not_in_callback(); if !self.device_collection_rpc { let mut fd = send_recv!(self.rpc()?, ContextSetupDeviceCollectionCallback => ContextSetupDeviceCollectionCallback())?; let stream = unsafe { sys::Pipe::from_raw_handle(fd.platform_handle.take_handle()) }; let server = DeviceCollectionServer { input_device_callback: self.input_device_callback.clone(), output_device_callback: self.output_device_callback.clone(), }; self.rpc_handle() .bind_server(server, stream) .map_err(|_| Error::default())?; self.device_collection_rpc = true; } if devtype.contains(cubeb_backend::DeviceType::INPUT) { let mut cb = self.input_device_callback.lock().unwrap(); cb.cb = collection_changed_callback; cb.user_ptr = user_ptr as usize; } if devtype.contains(cubeb_backend::DeviceType::OUTPUT) { let mut cb = self.output_device_callback.lock().unwrap(); cb.cb = collection_changed_callback; cb.user_ptr = user_ptr as usize; } let enable = collection_changed_callback.is_some(); send_recv!(self.rpc()?, ContextRegisterDeviceCollectionChanged(devtype.bits(), enable) => ContextRegisteredDeviceCollectionChanged) } } impl Drop for ClientContext { fn drop(&mut self) { debug!("ClientContext dropped..."); let _ = self .rpc() .and_then(|rpc| send_recv!(rpc, ClientDisconnect => ClientDisconnected)); } } impl fmt::Debug for ClientContext { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ClientContext") .field("_ops", &self._ops) .field("rpc", &self.rpc) .field("core", &self.rpc_thread) .field("cpu_pool", &"...") .finish() } }