diff options
Diffstat (limited to 'third_party/rust/audioipc2-client/src/context.rs')
-rw-r--r-- | third_party/rust/audioipc2-client/src/context.rs | 385 |
1 files changed, 385 insertions, 0 deletions
diff --git a/third_party/rust/audioipc2-client/src/context.rs b/third_party/rust/audioipc2-client/src/context.rs new file mode 100644 index 0000000000..c255815ac8 --- /dev/null +++ b/third_party/rust/audioipc2-client/src/context.rs @@ -0,0 +1,385 @@ +// Copyright © 2017 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details + +use crate::stream; +use crate::{assert_not_in_callback, run_in_callback}; +use crate::{ClientStream, AUDIOIPC_INIT_PARAMS}; +#[cfg(target_os = "linux")] +use audio_thread_priority::get_current_thread_info; +#[cfg(not(target_os = "linux"))] +use audio_thread_priority::promote_current_thread_to_real_time; +use audioipc::ipccore::EventLoopHandle; +use audioipc::{ipccore, rpccore, sys, PlatformHandle}; +use audioipc::{ + messages, messages::DeviceCollectionReq, messages::DeviceCollectionResp, ClientMessage, + ServerMessage, +}; +use cubeb_backend::{ + capi_new, ffi, Context, ContextOps, DeviceCollectionRef, DeviceId, DeviceType, Error, Ops, + Result, Stream, StreamParams, StreamParamsRef, +}; +use std::ffi::{CStr, CString}; +use std::os::raw::c_void; +use std::sync::{Arc, Mutex}; +use std::thread; +use std::{fmt, mem, ptr}; + +struct CubebClient; + +impl rpccore::Client for CubebClient { + type ServerMessage = ServerMessage; + type ClientMessage = ClientMessage; +} + +pub const CLIENT_OPS: Ops = capi_new!(ClientContext, ClientStream); + +// ClientContext's layout *must* match cubeb.c's `struct cubeb` for the +// common fields. +#[repr(C)] +pub struct ClientContext { + _ops: *const Ops, + rpc: rpccore::Proxy<ServerMessage, ClientMessage>, + rpc_thread: ipccore::EventLoopThread, + callback_thread: ipccore::EventLoopThread, + backend_id: CString, + device_collection_rpc: bool, + input_device_callback: Arc<Mutex<DeviceCollectionCallback>>, + output_device_callback: Arc<Mutex<DeviceCollectionCallback>>, +} + +impl ClientContext { + #[doc(hidden)] + pub fn rpc_handle(&self) -> &EventLoopHandle { + self.rpc_thread.handle() + } + + #[doc(hidden)] + pub fn rpc(&self) -> Result<rpccore::Proxy<ServerMessage, ClientMessage>> { + self.rpc.try_clone().map_err(|_| Error::default()) + } + + #[doc(hidden)] + pub fn callback_handle(&self) -> &EventLoopHandle { + self.callback_thread.handle() + } +} + +#[cfg(target_os = "linux")] +fn promote_thread(rpc: &rpccore::Proxy<ServerMessage, ClientMessage>) { + match get_current_thread_info() { + Ok(info) => { + let bytes = info.serialize(); + let _ = rpc.call(ServerMessage::PromoteThreadToRealTime(bytes)); + } + Err(_) => { + warn!("Could not remotely promote thread to RT."); + } + } +} + +#[cfg(not(target_os = "linux"))] +fn promote_thread(_rpc: &rpccore::Proxy<ServerMessage, ClientMessage>) { + match promote_current_thread_to_real_time(256, 48000) { + Ok(_) => { + info!("Audio thread promoted to real-time."); + } + Err(_) => { + warn!("Could not promote thread to real-time."); + } + } +} + +fn register_thread(callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>) { + if let Some(func) = callback { + let thr = thread::current(); + let name = CString::new(thr.name().unwrap()).unwrap(); + func(name.as_ptr()); + } +} + +fn unregister_thread(callback: Option<extern "C" fn()>) { + if let Some(func) = callback { + func(); + } +} + +fn promote_and_register_thread( + rpc: &rpccore::Proxy<ServerMessage, ClientMessage>, + callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>, +) { + promote_thread(rpc); + register_thread(callback); +} + +#[derive(Default)] +struct DeviceCollectionCallback { + cb: ffi::cubeb_device_collection_changed_callback, + user_ptr: usize, +} + +struct DeviceCollectionServer { + input_device_callback: Arc<Mutex<DeviceCollectionCallback>>, + output_device_callback: Arc<Mutex<DeviceCollectionCallback>>, +} + +impl rpccore::Server for DeviceCollectionServer { + type ServerMessage = DeviceCollectionReq; + type ClientMessage = DeviceCollectionResp; + + fn process(&mut self, req: Self::ServerMessage) -> Self::ClientMessage { + match req { + DeviceCollectionReq::DeviceChange(device_type) => { + trace!( + "ctx_thread: DeviceChange Callback: device_type={}", + device_type + ); + + let devtype = cubeb_backend::DeviceType::from_bits_truncate(device_type); + + let (input_cb, input_user_ptr) = { + let dcb = self.input_device_callback.lock().unwrap(); + (dcb.cb, dcb.user_ptr) + }; + let (output_cb, output_user_ptr) = { + let dcb = self.output_device_callback.lock().unwrap(); + (dcb.cb, dcb.user_ptr) + }; + + run_in_callback(|| { + if devtype.contains(cubeb_backend::DeviceType::INPUT) { + unsafe { input_cb.unwrap()(ptr::null_mut(), input_user_ptr as *mut c_void) } + } + if devtype.contains(cubeb_backend::DeviceType::OUTPUT) { + unsafe { + output_cb.unwrap()(ptr::null_mut(), output_user_ptr as *mut c_void) + } + } + }); + + DeviceCollectionResp::DeviceChange + } + } + } +} + +impl ContextOps for ClientContext { + fn init(_context_name: Option<&CStr>) -> Result<Context> { + assert_not_in_callback(); + + let params = AUDIOIPC_INIT_PARAMS.with(|p| p.replace(None).unwrap()); + let thread_create_callback = params.thread_create_callback; + let thread_destroy_callback = params.thread_destroy_callback; + + let server_connection = + unsafe { sys::Pipe::from_raw_handle(PlatformHandle::new(params.server_connection)) }; + + let rpc_thread = ipccore::EventLoopThread::new( + "AudioIPC Client RPC".to_string(), + None, + move || register_thread(thread_create_callback), + move || unregister_thread(thread_destroy_callback), + ) + .map_err(|_| Error::default())?; + let rpc = rpc_thread + .handle() + .bind_client::<CubebClient>(server_connection) + .map_err(|_| Error::default())?; + let rpc2 = rpc.try_clone().map_err(|_| Error::default())?; + + // Don't let errors bubble from here. Later calls against this context + // will return errors the caller expects to handle. + let _ = send_recv!(rpc, ClientConnect(std::process::id()) => ClientConnected); + + let backend_id = send_recv!(rpc, ContextGetBackendId => ContextBackendId()) + .unwrap_or_else(|_| "(remote error)".to_string()); + let backend_id = CString::new(backend_id).expect("backend_id query failed"); + + // TODO: remove params.pool_size from init params. + let callback_thread = ipccore::EventLoopThread::new( + "AudioIPC Client Callback".to_string(), + Some(params.stack_size), + move || promote_and_register_thread(&rpc2, thread_create_callback), + move || unregister_thread(thread_destroy_callback), + ) + .map_err(|_| Error::default())?; + + let ctx = Box::new(ClientContext { + _ops: &CLIENT_OPS as *const _, + rpc, + rpc_thread, + callback_thread, + backend_id, + device_collection_rpc: false, + input_device_callback: Arc::new(Mutex::new(Default::default())), + output_device_callback: Arc::new(Mutex::new(Default::default())), + }); + Ok(unsafe { Context::from_ptr(Box::into_raw(ctx) as *mut _) }) + } + + fn backend_id(&mut self) -> &CStr { + assert_not_in_callback(); + self.backend_id.as_c_str() + } + + fn max_channel_count(&mut self) -> Result<u32> { + assert_not_in_callback(); + send_recv!(self.rpc()?, ContextGetMaxChannelCount => ContextMaxChannelCount()) + } + + fn min_latency(&mut self, params: StreamParams) -> Result<u32> { + assert_not_in_callback(); + let params = messages::StreamParams::from(params.as_ref()); + send_recv!(self.rpc()?, ContextGetMinLatency(params) => ContextMinLatency()) + } + + fn preferred_sample_rate(&mut self) -> Result<u32> { + assert_not_in_callback(); + send_recv!(self.rpc()?, ContextGetPreferredSampleRate => ContextPreferredSampleRate()) + } + + fn enumerate_devices( + &mut self, + devtype: DeviceType, + collection: &DeviceCollectionRef, + ) -> Result<()> { + assert_not_in_callback(); + let v: Vec<ffi::cubeb_device_info> = send_recv!( + self.rpc()?, ContextGetDeviceEnumeration(devtype.bits()) => ContextEnumeratedDevices())? + .into_iter() + .map(|i| i.into()) + .collect(); + let mut vs = v.into_boxed_slice(); + let coll = unsafe { &mut *collection.as_ptr() }; + coll.device = vs.as_mut_ptr(); + coll.count = vs.len(); + // Giving away the memory owned by vs. Don't free it! + // Reclaimed in `device_collection_destroy`. + mem::forget(vs); + Ok(()) + } + + fn device_collection_destroy(&mut self, collection: &mut DeviceCollectionRef) -> Result<()> { + assert_not_in_callback(); + unsafe { + let coll = &mut *collection.as_ptr(); + let mut devices = Vec::from_raw_parts( + coll.device as *mut ffi::cubeb_device_info, + coll.count, + coll.count, + ); + for dev in &mut devices { + if !dev.device_id.is_null() { + let _ = CString::from_raw(dev.device_id as *mut _); + } + if !dev.group_id.is_null() { + let _ = CString::from_raw(dev.group_id as *mut _); + } + if !dev.vendor_name.is_null() { + let _ = CString::from_raw(dev.vendor_name as *mut _); + } + if !dev.friendly_name.is_null() { + let _ = CString::from_raw(dev.friendly_name as *mut _); + } + } + coll.device = ptr::null_mut(); + coll.count = 0; + Ok(()) + } + } + + fn stream_init( + &mut self, + stream_name: Option<&CStr>, + input_device: DeviceId, + input_stream_params: Option<&StreamParamsRef>, + output_device: DeviceId, + output_stream_params: Option<&StreamParamsRef>, + latency_frames: u32, + // These params aren't sent to the server + data_callback: ffi::cubeb_data_callback, + state_callback: ffi::cubeb_state_callback, + user_ptr: *mut c_void, + ) -> Result<Stream> { + assert_not_in_callback(); + + let stream_name = stream_name.map(|name| name.to_bytes_with_nul().to_vec()); + + let input_stream_params = input_stream_params.map(messages::StreamParams::from); + let output_stream_params = output_stream_params.map(messages::StreamParams::from); + + let init_params = messages::StreamInitParams { + stream_name, + input_device: input_device as usize, + input_stream_params, + output_device: output_device as usize, + output_stream_params, + latency_frames, + }; + stream::init(self, init_params, data_callback, state_callback, user_ptr) + } + + fn register_device_collection_changed( + &mut self, + devtype: DeviceType, + collection_changed_callback: ffi::cubeb_device_collection_changed_callback, + user_ptr: *mut c_void, + ) -> Result<()> { + assert_not_in_callback(); + + if !self.device_collection_rpc { + let mut fd = send_recv!(self.rpc()?, + ContextSetupDeviceCollectionCallback => + ContextSetupDeviceCollectionCallback())?; + + let stream = unsafe { sys::Pipe::from_raw_handle(fd.platform_handle.take_handle()) }; + + let server = DeviceCollectionServer { + input_device_callback: self.input_device_callback.clone(), + output_device_callback: self.output_device_callback.clone(), + }; + + self.rpc_handle() + .bind_server(server, stream) + .map_err(|_| Error::default())?; + self.device_collection_rpc = true; + } + + if devtype.contains(cubeb_backend::DeviceType::INPUT) { + let mut cb = self.input_device_callback.lock().unwrap(); + cb.cb = collection_changed_callback; + cb.user_ptr = user_ptr as usize; + } + if devtype.contains(cubeb_backend::DeviceType::OUTPUT) { + let mut cb = self.output_device_callback.lock().unwrap(); + cb.cb = collection_changed_callback; + cb.user_ptr = user_ptr as usize; + } + + let enable = collection_changed_callback.is_some(); + send_recv!(self.rpc()?, + ContextRegisterDeviceCollectionChanged(devtype.bits(), enable) => + ContextRegisteredDeviceCollectionChanged) + } +} + +impl Drop for ClientContext { + fn drop(&mut self) { + debug!("ClientContext dropped..."); + let _ = self + .rpc() + .and_then(|rpc| send_recv!(rpc, ClientDisconnect => ClientDisconnected)); + } +} + +impl fmt::Debug for ClientContext { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ClientContext") + .field("_ops", &self._ops) + .field("rpc", &self.rpc) + .field("core", &self.rpc_thread) + .field("cpu_pool", &"...") + .finish() + } +} |