summaryrefslogtreecommitdiffstats
path: root/third_party/rust/audioipc2-client/src/context.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/audioipc2-client/src/context.rs')
-rw-r--r--third_party/rust/audioipc2-client/src/context.rs387
1 files changed, 387 insertions, 0 deletions
diff --git a/third_party/rust/audioipc2-client/src/context.rs b/third_party/rust/audioipc2-client/src/context.rs
new file mode 100644
index 0000000000..650d4d00a8
--- /dev/null
+++ b/third_party/rust/audioipc2-client/src/context.rs
@@ -0,0 +1,387 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details
+
+use crate::stream;
+use crate::{assert_not_in_callback, run_in_callback};
+use crate::{ClientStream, AUDIOIPC_INIT_PARAMS};
+#[cfg(target_os = "linux")]
+use audio_thread_priority::get_current_thread_info;
+#[cfg(not(target_os = "linux"))]
+use audio_thread_priority::promote_current_thread_to_real_time;
+use audioipc::ipccore::EventLoopHandle;
+use audioipc::{ipccore, rpccore, sys, PlatformHandle};
+use audioipc::{
+ messages, messages::DeviceCollectionReq, messages::DeviceCollectionResp, ClientMessage,
+ ServerMessage,
+};
+use cubeb_backend::{
+ capi_new, ffi, Context, ContextOps, DeviceCollectionRef, DeviceId, DeviceType, Error,
+ InputProcessingParams, Ops, Result, Stream, StreamParams, StreamParamsRef,
+};
+use std::ffi::{CStr, CString};
+use std::os::raw::c_void;
+use std::sync::{Arc, Mutex};
+use std::thread;
+use std::{fmt, mem, ptr};
+
+struct CubebClient;
+
+impl rpccore::Client for CubebClient {
+ type ServerMessage = ServerMessage;
+ type ClientMessage = ClientMessage;
+}
+
+pub const CLIENT_OPS: Ops = capi_new!(ClientContext, ClientStream);
+
+// ClientContext's layout *must* match cubeb.c's `struct cubeb` for the
+// common fields.
+#[repr(C)]
+pub struct ClientContext {
+ _ops: *const Ops,
+ rpc: rpccore::Proxy<ServerMessage, ClientMessage>,
+ rpc_thread: ipccore::EventLoopThread,
+ callback_thread: ipccore::EventLoopThread,
+ backend_id: CString,
+ device_collection_rpc: bool,
+ input_device_callback: Arc<Mutex<DeviceCollectionCallback>>,
+ output_device_callback: Arc<Mutex<DeviceCollectionCallback>>,
+}
+
+impl ClientContext {
+ #[doc(hidden)]
+ pub fn rpc_handle(&self) -> &EventLoopHandle {
+ self.rpc_thread.handle()
+ }
+
+ #[doc(hidden)]
+ pub fn rpc(&self) -> rpccore::Proxy<ServerMessage, ClientMessage> {
+ self.rpc.clone()
+ }
+
+ #[doc(hidden)]
+ pub fn callback_handle(&self) -> &EventLoopHandle {
+ self.callback_thread.handle()
+ }
+}
+
+#[cfg(target_os = "linux")]
+fn promote_thread(rpc: &rpccore::Proxy<ServerMessage, ClientMessage>) {
+ match get_current_thread_info() {
+ Ok(info) => {
+ let bytes = info.serialize();
+ let _ = rpc.call(ServerMessage::PromoteThreadToRealTime(bytes));
+ }
+ Err(_) => {
+ warn!("Could not remotely promote thread to RT.");
+ }
+ }
+}
+
+#[cfg(not(target_os = "linux"))]
+fn promote_thread(_rpc: &rpccore::Proxy<ServerMessage, ClientMessage>) {
+ match promote_current_thread_to_real_time(0, 48000) {
+ Ok(_) => {
+ info!("Audio thread promoted to real-time.");
+ }
+ Err(_) => {
+ warn!("Could not promote thread to real-time.");
+ }
+ }
+}
+
+fn register_thread(callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>) {
+ if let Some(func) = callback {
+ let thr = thread::current();
+ let name = CString::new(thr.name().unwrap()).unwrap();
+ func(name.as_ptr());
+ }
+}
+
+fn unregister_thread(callback: Option<extern "C" fn()>) {
+ if let Some(func) = callback {
+ func();
+ }
+}
+
+fn promote_and_register_thread(
+ rpc: &rpccore::Proxy<ServerMessage, ClientMessage>,
+ callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>,
+) {
+ promote_thread(rpc);
+ register_thread(callback);
+}
+
+#[derive(Default)]
+struct DeviceCollectionCallback {
+ cb: ffi::cubeb_device_collection_changed_callback,
+ user_ptr: usize,
+}
+
+struct DeviceCollectionServer {
+ input_device_callback: Arc<Mutex<DeviceCollectionCallback>>,
+ output_device_callback: Arc<Mutex<DeviceCollectionCallback>>,
+}
+
+impl rpccore::Server for DeviceCollectionServer {
+ type ServerMessage = DeviceCollectionReq;
+ type ClientMessage = DeviceCollectionResp;
+
+ fn process(&mut self, req: Self::ServerMessage) -> Self::ClientMessage {
+ match req {
+ DeviceCollectionReq::DeviceChange(device_type) => {
+ trace!(
+ "ctx_thread: DeviceChange Callback: device_type={}",
+ device_type
+ );
+
+ let devtype = cubeb_backend::DeviceType::from_bits_truncate(device_type);
+
+ let (input_cb, input_user_ptr) = {
+ let dcb = self.input_device_callback.lock().unwrap();
+ (dcb.cb, dcb.user_ptr)
+ };
+ let (output_cb, output_user_ptr) = {
+ let dcb = self.output_device_callback.lock().unwrap();
+ (dcb.cb, dcb.user_ptr)
+ };
+
+ run_in_callback(|| {
+ if devtype.contains(cubeb_backend::DeviceType::INPUT) {
+ unsafe { input_cb.unwrap()(ptr::null_mut(), input_user_ptr as *mut c_void) }
+ }
+ if devtype.contains(cubeb_backend::DeviceType::OUTPUT) {
+ unsafe {
+ output_cb.unwrap()(ptr::null_mut(), output_user_ptr as *mut c_void)
+ }
+ }
+ });
+
+ DeviceCollectionResp::DeviceChange
+ }
+ }
+ }
+}
+
+impl ContextOps for ClientContext {
+ fn init(_context_name: Option<&CStr>) -> Result<Context> {
+ assert_not_in_callback();
+
+ let params = AUDIOIPC_INIT_PARAMS.with(|p| p.replace(None).unwrap());
+ let thread_create_callback = params.thread_create_callback;
+ let thread_destroy_callback = params.thread_destroy_callback;
+
+ let server_connection =
+ unsafe { sys::Pipe::from_raw_handle(PlatformHandle::new(params.server_connection)) };
+
+ let rpc_thread = ipccore::EventLoopThread::new(
+ "AudioIPC Client RPC".to_string(),
+ None,
+ move || register_thread(thread_create_callback),
+ move || unregister_thread(thread_destroy_callback),
+ )
+ .map_err(|_| Error::default())?;
+ let rpc = rpc_thread
+ .handle()
+ .bind_client::<CubebClient>(server_connection)
+ .map_err(|_| Error::default())?;
+ let rpc2 = rpc.clone();
+
+ // Don't let errors bubble from here. Later calls against this context
+ // will return errors the caller expects to handle.
+ let _ = send_recv!(rpc, ClientConnect(std::process::id()) => ClientConnected);
+
+ let backend_id = send_recv!(rpc, ContextGetBackendId => ContextBackendId())
+ .unwrap_or_else(|_| "(remote error)".to_string());
+ let backend_id = CString::new(backend_id).expect("backend_id query failed");
+
+ // TODO: remove params.pool_size from init params.
+ let callback_thread = ipccore::EventLoopThread::new(
+ "AudioIPC Client Callback".to_string(),
+ Some(params.stack_size),
+ move || promote_and_register_thread(&rpc2, thread_create_callback),
+ move || unregister_thread(thread_destroy_callback),
+ )
+ .map_err(|_| Error::default())?;
+
+ let ctx = Box::new(ClientContext {
+ _ops: &CLIENT_OPS as *const _,
+ rpc,
+ rpc_thread,
+ callback_thread,
+ backend_id,
+ device_collection_rpc: false,
+ input_device_callback: Arc::new(Mutex::new(Default::default())),
+ output_device_callback: Arc::new(Mutex::new(Default::default())),
+ });
+ Ok(unsafe { Context::from_ptr(Box::into_raw(ctx) as *mut _) })
+ }
+
+ fn backend_id(&mut self) -> &CStr {
+ assert_not_in_callback();
+ self.backend_id.as_c_str()
+ }
+
+ fn max_channel_count(&mut self) -> Result<u32> {
+ assert_not_in_callback();
+ send_recv!(self.rpc(), ContextGetMaxChannelCount => ContextMaxChannelCount())
+ }
+
+ fn min_latency(&mut self, params: StreamParams) -> Result<u32> {
+ assert_not_in_callback();
+ let params = messages::StreamParams::from(params.as_ref());
+ send_recv!(self.rpc(), ContextGetMinLatency(params) => ContextMinLatency())
+ }
+
+ fn preferred_sample_rate(&mut self) -> Result<u32> {
+ assert_not_in_callback();
+ send_recv!(self.rpc(), ContextGetPreferredSampleRate => ContextPreferredSampleRate())
+ }
+
+ fn supported_input_processing_params(&mut self) -> Result<InputProcessingParams> {
+ assert_not_in_callback();
+ send_recv!(self.rpc(),
+ ContextGetSupportedInputProcessingParams =>
+ ContextSupportedInputProcessingParams())
+ .map(InputProcessingParams::from_bits_truncate)
+ }
+
+ fn enumerate_devices(
+ &mut self,
+ devtype: DeviceType,
+ collection: &DeviceCollectionRef,
+ ) -> Result<()> {
+ assert_not_in_callback();
+ let v: Vec<ffi::cubeb_device_info> = send_recv!(
+ self.rpc(), ContextGetDeviceEnumeration(devtype.bits()) => ContextEnumeratedDevices())?
+ .into_iter()
+ .map(|i| i.into())
+ .collect();
+ let mut vs = v.into_boxed_slice();
+ let coll = unsafe { &mut *collection.as_ptr() };
+ coll.device = vs.as_mut_ptr();
+ coll.count = vs.len();
+ // Giving away the memory owned by vs. Don't free it!
+ // Reclaimed in `device_collection_destroy`.
+ mem::forget(vs);
+ Ok(())
+ }
+
+ fn device_collection_destroy(&mut self, collection: &mut DeviceCollectionRef) -> Result<()> {
+ assert_not_in_callback();
+ unsafe {
+ let coll = &mut *collection.as_ptr();
+ let mut devices = Vec::from_raw_parts(coll.device, coll.count, coll.count);
+ for dev in &mut devices {
+ if !dev.device_id.is_null() {
+ let _ = CString::from_raw(dev.device_id as *mut _);
+ }
+ if !dev.group_id.is_null() {
+ let _ = CString::from_raw(dev.group_id as *mut _);
+ }
+ if !dev.vendor_name.is_null() {
+ let _ = CString::from_raw(dev.vendor_name as *mut _);
+ }
+ if !dev.friendly_name.is_null() {
+ let _ = CString::from_raw(dev.friendly_name as *mut _);
+ }
+ }
+ coll.device = ptr::null_mut();
+ coll.count = 0;
+ Ok(())
+ }
+ }
+
+ fn stream_init(
+ &mut self,
+ stream_name: Option<&CStr>,
+ input_device: DeviceId,
+ input_stream_params: Option<&StreamParamsRef>,
+ output_device: DeviceId,
+ output_stream_params: Option<&StreamParamsRef>,
+ latency_frames: u32,
+ // These params aren't sent to the server
+ data_callback: ffi::cubeb_data_callback,
+ state_callback: ffi::cubeb_state_callback,
+ user_ptr: *mut c_void,
+ ) -> Result<Stream> {
+ assert_not_in_callback();
+
+ let stream_name = stream_name.map(|name| name.to_bytes_with_nul().to_vec());
+
+ let input_stream_params = input_stream_params.map(messages::StreamParams::from);
+ let output_stream_params = output_stream_params.map(messages::StreamParams::from);
+
+ let init_params = messages::StreamInitParams {
+ stream_name,
+ input_device: input_device as usize,
+ input_stream_params,
+ output_device: output_device as usize,
+ output_stream_params,
+ latency_frames,
+ };
+ stream::init(self, init_params, data_callback, state_callback, user_ptr)
+ }
+
+ fn register_device_collection_changed(
+ &mut self,
+ devtype: DeviceType,
+ collection_changed_callback: ffi::cubeb_device_collection_changed_callback,
+ user_ptr: *mut c_void,
+ ) -> Result<()> {
+ assert_not_in_callback();
+
+ if !self.device_collection_rpc {
+ let mut fd = send_recv!(self.rpc(),
+ ContextSetupDeviceCollectionCallback =>
+ ContextSetupDeviceCollectionCallback())?;
+
+ let stream = unsafe { sys::Pipe::from_raw_handle(fd.platform_handle.take_handle()) };
+
+ let server = DeviceCollectionServer {
+ input_device_callback: self.input_device_callback.clone(),
+ output_device_callback: self.output_device_callback.clone(),
+ };
+
+ self.rpc_handle()
+ .bind_server(server, stream)
+ .map_err(|_| Error::default())?;
+ self.device_collection_rpc = true;
+ }
+
+ if devtype.contains(cubeb_backend::DeviceType::INPUT) {
+ let mut cb = self.input_device_callback.lock().unwrap();
+ cb.cb = collection_changed_callback;
+ cb.user_ptr = user_ptr as usize;
+ }
+ if devtype.contains(cubeb_backend::DeviceType::OUTPUT) {
+ let mut cb = self.output_device_callback.lock().unwrap();
+ cb.cb = collection_changed_callback;
+ cb.user_ptr = user_ptr as usize;
+ }
+
+ let enable = collection_changed_callback.is_some();
+ send_recv!(self.rpc(),
+ ContextRegisterDeviceCollectionChanged(devtype.bits(), enable) =>
+ ContextRegisteredDeviceCollectionChanged)
+ }
+}
+
+impl Drop for ClientContext {
+ fn drop(&mut self) {
+ debug!("ClientContext dropped...");
+ let _ = send_recv!(self.rpc(), ClientDisconnect => ClientDisconnected);
+ }
+}
+
+impl fmt::Debug for ClientContext {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("ClientContext")
+ .field("_ops", &self._ops)
+ .field("rpc", &self.rpc)
+ .field("core", &self.rpc_thread)
+ .field("cpu_pool", &"...")
+ .finish()
+ }
+}