diff options
Diffstat (limited to 'third_party/rust/audioipc2-client')
-rw-r--r-- | third_party/rust/audioipc2-client/.cargo-checksum.json | 1 | ||||
-rw-r--r-- | third_party/rust/audioipc2-client/Cargo.toml | 33 | ||||
-rw-r--r-- | third_party/rust/audioipc2-client/cbindgen.toml | 28 | ||||
-rw-r--r-- | third_party/rust/audioipc2-client/src/context.rs | 387 | ||||
-rw-r--r-- | third_party/rust/audioipc2-client/src/lib.rs | 81 | ||||
-rw-r--r-- | third_party/rust/audioipc2-client/src/send_recv.rs | 73 | ||||
-rw-r--r-- | third_party/rust/audioipc2-client/src/stream.rs | 351 |
7 files changed, 954 insertions, 0 deletions
diff --git a/third_party/rust/audioipc2-client/.cargo-checksum.json b/third_party/rust/audioipc2-client/.cargo-checksum.json new file mode 100644 index 0000000000..00f6ff0c0e --- /dev/null +++ b/third_party/rust/audioipc2-client/.cargo-checksum.json @@ -0,0 +1 @@ +{"files":{"Cargo.toml":"f4852af58bcfd24732469ffbdddfc9404bc0d8c1d2cef279f6a6b6ddd3a080a8","cbindgen.toml":"fb6abe1671497f432a06e40b1db7ed7cd2cceecbd9a2382193ad7534e8855e34","src/context.rs":"a0559e92b554ef3156ab2bf2f1424555c8ef4a7977b9f43ac8500a9f399f8d99","src/lib.rs":"c87d9d57a16a9286cde730978db692df0fbc70cc69dd4f4677198d6843031fd8","src/send_recv.rs":"859abe75b521eb4297c84b30423814b5b87f3c7741ad16fe72189212e123e1ac","src/stream.rs":"90dc6a85552f3569ab1847de4247a46bcff2f5aef0c4d43fa2376589df015b25"},"package":null}
\ No newline at end of file diff --git a/third_party/rust/audioipc2-client/Cargo.toml b/third_party/rust/audioipc2-client/Cargo.toml new file mode 100644 index 0000000000..2002062c27 --- /dev/null +++ b/third_party/rust/audioipc2-client/Cargo.toml @@ -0,0 +1,33 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g., crates.io) dependencies. +# +# If you are reading this file be aware that the original Cargo.toml +# will likely look very different (and much more reasonable). +# See Cargo.toml.orig for the original contents. + +[package] +edition = "2018" +name = "audioipc2-client" +version = "0.6.0" +authors = [ + "Matthew Gregan <kinetik@flim.org>", + "Dan Glastonbury <dan.glastonbury@gmail.com>", +] +description = "Cubeb Backend for talking to remote cubeb server." +license = "ISC" + +[dependencies] +cubeb-backend = "0.12" +log = "0.4" + +[dependencies.audio_thread_priority] +version = "0.31" +default-features = false + +[dependencies.audioipc] +path = "../audioipc" +package = "audioipc2" diff --git a/third_party/rust/audioipc2-client/cbindgen.toml b/third_party/rust/audioipc2-client/cbindgen.toml new file mode 100644 index 0000000000..b3267889c6 --- /dev/null +++ b/third_party/rust/audioipc2-client/cbindgen.toml @@ -0,0 +1,28 @@ +header = """/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */""" +autogen_warning = """/* DO NOT MODIFY THIS MANUALLY! This file was generated using cbindgen. */""" +include_version = true +braces = "SameLine" +line_length = 100 +tab_width = 2 +language = "C++" +namespaces = ["mozilla", "audioipc2"] + +[export] +item_types = ["globals", "enums", "structs", "unions", "typedefs", "opaque", "functions", "constants"] + +[parse] +parse_deps = true +include = ["audioipc2"] + +[fn] +args = "Vertical" +rename_args = "GeckoCase" + +[struct] +rename_fields = "GeckoCase" + +[defines] +"windows" = "XP_WIN" +"unix" = "XP_UNIX" diff --git a/third_party/rust/audioipc2-client/src/context.rs b/third_party/rust/audioipc2-client/src/context.rs new file mode 100644 index 0000000000..650d4d00a8 --- /dev/null +++ b/third_party/rust/audioipc2-client/src/context.rs @@ -0,0 +1,387 @@ +// Copyright © 2017 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details + +use crate::stream; +use crate::{assert_not_in_callback, run_in_callback}; +use crate::{ClientStream, AUDIOIPC_INIT_PARAMS}; +#[cfg(target_os = "linux")] +use audio_thread_priority::get_current_thread_info; +#[cfg(not(target_os = "linux"))] +use audio_thread_priority::promote_current_thread_to_real_time; +use audioipc::ipccore::EventLoopHandle; +use audioipc::{ipccore, rpccore, sys, PlatformHandle}; +use audioipc::{ + messages, messages::DeviceCollectionReq, messages::DeviceCollectionResp, ClientMessage, + ServerMessage, +}; +use cubeb_backend::{ + capi_new, ffi, Context, ContextOps, DeviceCollectionRef, DeviceId, DeviceType, Error, + InputProcessingParams, Ops, Result, Stream, StreamParams, StreamParamsRef, +}; +use std::ffi::{CStr, CString}; +use std::os::raw::c_void; +use std::sync::{Arc, Mutex}; +use std::thread; +use std::{fmt, mem, ptr}; + +struct CubebClient; + +impl rpccore::Client for CubebClient { + type ServerMessage = ServerMessage; + type ClientMessage = ClientMessage; +} + +pub const CLIENT_OPS: Ops = capi_new!(ClientContext, ClientStream); + +// ClientContext's layout *must* match cubeb.c's `struct cubeb` for the +// common fields. +#[repr(C)] +pub struct ClientContext { + _ops: *const Ops, + rpc: rpccore::Proxy<ServerMessage, ClientMessage>, + rpc_thread: ipccore::EventLoopThread, + callback_thread: ipccore::EventLoopThread, + backend_id: CString, + device_collection_rpc: bool, + input_device_callback: Arc<Mutex<DeviceCollectionCallback>>, + output_device_callback: Arc<Mutex<DeviceCollectionCallback>>, +} + +impl ClientContext { + #[doc(hidden)] + pub fn rpc_handle(&self) -> &EventLoopHandle { + self.rpc_thread.handle() + } + + #[doc(hidden)] + pub fn rpc(&self) -> rpccore::Proxy<ServerMessage, ClientMessage> { + self.rpc.clone() + } + + #[doc(hidden)] + pub fn callback_handle(&self) -> &EventLoopHandle { + self.callback_thread.handle() + } +} + +#[cfg(target_os = "linux")] +fn promote_thread(rpc: &rpccore::Proxy<ServerMessage, ClientMessage>) { + match get_current_thread_info() { + Ok(info) => { + let bytes = info.serialize(); + let _ = rpc.call(ServerMessage::PromoteThreadToRealTime(bytes)); + } + Err(_) => { + warn!("Could not remotely promote thread to RT."); + } + } +} + +#[cfg(not(target_os = "linux"))] +fn promote_thread(_rpc: &rpccore::Proxy<ServerMessage, ClientMessage>) { + match promote_current_thread_to_real_time(0, 48000) { + Ok(_) => { + info!("Audio thread promoted to real-time."); + } + Err(_) => { + warn!("Could not promote thread to real-time."); + } + } +} + +fn register_thread(callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>) { + if let Some(func) = callback { + let thr = thread::current(); + let name = CString::new(thr.name().unwrap()).unwrap(); + func(name.as_ptr()); + } +} + +fn unregister_thread(callback: Option<extern "C" fn()>) { + if let Some(func) = callback { + func(); + } +} + +fn promote_and_register_thread( + rpc: &rpccore::Proxy<ServerMessage, ClientMessage>, + callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>, +) { + promote_thread(rpc); + register_thread(callback); +} + +#[derive(Default)] +struct DeviceCollectionCallback { + cb: ffi::cubeb_device_collection_changed_callback, + user_ptr: usize, +} + +struct DeviceCollectionServer { + input_device_callback: Arc<Mutex<DeviceCollectionCallback>>, + output_device_callback: Arc<Mutex<DeviceCollectionCallback>>, +} + +impl rpccore::Server for DeviceCollectionServer { + type ServerMessage = DeviceCollectionReq; + type ClientMessage = DeviceCollectionResp; + + fn process(&mut self, req: Self::ServerMessage) -> Self::ClientMessage { + match req { + DeviceCollectionReq::DeviceChange(device_type) => { + trace!( + "ctx_thread: DeviceChange Callback: device_type={}", + device_type + ); + + let devtype = cubeb_backend::DeviceType::from_bits_truncate(device_type); + + let (input_cb, input_user_ptr) = { + let dcb = self.input_device_callback.lock().unwrap(); + (dcb.cb, dcb.user_ptr) + }; + let (output_cb, output_user_ptr) = { + let dcb = self.output_device_callback.lock().unwrap(); + (dcb.cb, dcb.user_ptr) + }; + + run_in_callback(|| { + if devtype.contains(cubeb_backend::DeviceType::INPUT) { + unsafe { input_cb.unwrap()(ptr::null_mut(), input_user_ptr as *mut c_void) } + } + if devtype.contains(cubeb_backend::DeviceType::OUTPUT) { + unsafe { + output_cb.unwrap()(ptr::null_mut(), output_user_ptr as *mut c_void) + } + } + }); + + DeviceCollectionResp::DeviceChange + } + } + } +} + +impl ContextOps for ClientContext { + fn init(_context_name: Option<&CStr>) -> Result<Context> { + assert_not_in_callback(); + + let params = AUDIOIPC_INIT_PARAMS.with(|p| p.replace(None).unwrap()); + let thread_create_callback = params.thread_create_callback; + let thread_destroy_callback = params.thread_destroy_callback; + + let server_connection = + unsafe { sys::Pipe::from_raw_handle(PlatformHandle::new(params.server_connection)) }; + + let rpc_thread = ipccore::EventLoopThread::new( + "AudioIPC Client RPC".to_string(), + None, + move || register_thread(thread_create_callback), + move || unregister_thread(thread_destroy_callback), + ) + .map_err(|_| Error::default())?; + let rpc = rpc_thread + .handle() + .bind_client::<CubebClient>(server_connection) + .map_err(|_| Error::default())?; + let rpc2 = rpc.clone(); + + // Don't let errors bubble from here. Later calls against this context + // will return errors the caller expects to handle. + let _ = send_recv!(rpc, ClientConnect(std::process::id()) => ClientConnected); + + let backend_id = send_recv!(rpc, ContextGetBackendId => ContextBackendId()) + .unwrap_or_else(|_| "(remote error)".to_string()); + let backend_id = CString::new(backend_id).expect("backend_id query failed"); + + // TODO: remove params.pool_size from init params. + let callback_thread = ipccore::EventLoopThread::new( + "AudioIPC Client Callback".to_string(), + Some(params.stack_size), + move || promote_and_register_thread(&rpc2, thread_create_callback), + move || unregister_thread(thread_destroy_callback), + ) + .map_err(|_| Error::default())?; + + let ctx = Box::new(ClientContext { + _ops: &CLIENT_OPS as *const _, + rpc, + rpc_thread, + callback_thread, + backend_id, + device_collection_rpc: false, + input_device_callback: Arc::new(Mutex::new(Default::default())), + output_device_callback: Arc::new(Mutex::new(Default::default())), + }); + Ok(unsafe { Context::from_ptr(Box::into_raw(ctx) as *mut _) }) + } + + fn backend_id(&mut self) -> &CStr { + assert_not_in_callback(); + self.backend_id.as_c_str() + } + + fn max_channel_count(&mut self) -> Result<u32> { + assert_not_in_callback(); + send_recv!(self.rpc(), ContextGetMaxChannelCount => ContextMaxChannelCount()) + } + + fn min_latency(&mut self, params: StreamParams) -> Result<u32> { + assert_not_in_callback(); + let params = messages::StreamParams::from(params.as_ref()); + send_recv!(self.rpc(), ContextGetMinLatency(params) => ContextMinLatency()) + } + + fn preferred_sample_rate(&mut self) -> Result<u32> { + assert_not_in_callback(); + send_recv!(self.rpc(), ContextGetPreferredSampleRate => ContextPreferredSampleRate()) + } + + fn supported_input_processing_params(&mut self) -> Result<InputProcessingParams> { + assert_not_in_callback(); + send_recv!(self.rpc(), + ContextGetSupportedInputProcessingParams => + ContextSupportedInputProcessingParams()) + .map(InputProcessingParams::from_bits_truncate) + } + + fn enumerate_devices( + &mut self, + devtype: DeviceType, + collection: &DeviceCollectionRef, + ) -> Result<()> { + assert_not_in_callback(); + let v: Vec<ffi::cubeb_device_info> = send_recv!( + self.rpc(), ContextGetDeviceEnumeration(devtype.bits()) => ContextEnumeratedDevices())? + .into_iter() + .map(|i| i.into()) + .collect(); + let mut vs = v.into_boxed_slice(); + let coll = unsafe { &mut *collection.as_ptr() }; + coll.device = vs.as_mut_ptr(); + coll.count = vs.len(); + // Giving away the memory owned by vs. Don't free it! + // Reclaimed in `device_collection_destroy`. + mem::forget(vs); + Ok(()) + } + + fn device_collection_destroy(&mut self, collection: &mut DeviceCollectionRef) -> Result<()> { + assert_not_in_callback(); + unsafe { + let coll = &mut *collection.as_ptr(); + let mut devices = Vec::from_raw_parts(coll.device, coll.count, coll.count); + for dev in &mut devices { + if !dev.device_id.is_null() { + let _ = CString::from_raw(dev.device_id as *mut _); + } + if !dev.group_id.is_null() { + let _ = CString::from_raw(dev.group_id as *mut _); + } + if !dev.vendor_name.is_null() { + let _ = CString::from_raw(dev.vendor_name as *mut _); + } + if !dev.friendly_name.is_null() { + let _ = CString::from_raw(dev.friendly_name as *mut _); + } + } + coll.device = ptr::null_mut(); + coll.count = 0; + Ok(()) + } + } + + fn stream_init( + &mut self, + stream_name: Option<&CStr>, + input_device: DeviceId, + input_stream_params: Option<&StreamParamsRef>, + output_device: DeviceId, + output_stream_params: Option<&StreamParamsRef>, + latency_frames: u32, + // These params aren't sent to the server + data_callback: ffi::cubeb_data_callback, + state_callback: ffi::cubeb_state_callback, + user_ptr: *mut c_void, + ) -> Result<Stream> { + assert_not_in_callback(); + + let stream_name = stream_name.map(|name| name.to_bytes_with_nul().to_vec()); + + let input_stream_params = input_stream_params.map(messages::StreamParams::from); + let output_stream_params = output_stream_params.map(messages::StreamParams::from); + + let init_params = messages::StreamInitParams { + stream_name, + input_device: input_device as usize, + input_stream_params, + output_device: output_device as usize, + output_stream_params, + latency_frames, + }; + stream::init(self, init_params, data_callback, state_callback, user_ptr) + } + + fn register_device_collection_changed( + &mut self, + devtype: DeviceType, + collection_changed_callback: ffi::cubeb_device_collection_changed_callback, + user_ptr: *mut c_void, + ) -> Result<()> { + assert_not_in_callback(); + + if !self.device_collection_rpc { + let mut fd = send_recv!(self.rpc(), + ContextSetupDeviceCollectionCallback => + ContextSetupDeviceCollectionCallback())?; + + let stream = unsafe { sys::Pipe::from_raw_handle(fd.platform_handle.take_handle()) }; + + let server = DeviceCollectionServer { + input_device_callback: self.input_device_callback.clone(), + output_device_callback: self.output_device_callback.clone(), + }; + + self.rpc_handle() + .bind_server(server, stream) + .map_err(|_| Error::default())?; + self.device_collection_rpc = true; + } + + if devtype.contains(cubeb_backend::DeviceType::INPUT) { + let mut cb = self.input_device_callback.lock().unwrap(); + cb.cb = collection_changed_callback; + cb.user_ptr = user_ptr as usize; + } + if devtype.contains(cubeb_backend::DeviceType::OUTPUT) { + let mut cb = self.output_device_callback.lock().unwrap(); + cb.cb = collection_changed_callback; + cb.user_ptr = user_ptr as usize; + } + + let enable = collection_changed_callback.is_some(); + send_recv!(self.rpc(), + ContextRegisterDeviceCollectionChanged(devtype.bits(), enable) => + ContextRegisteredDeviceCollectionChanged) + } +} + +impl Drop for ClientContext { + fn drop(&mut self) { + debug!("ClientContext dropped..."); + let _ = send_recv!(self.rpc(), ClientDisconnect => ClientDisconnected); + } +} + +impl fmt::Debug for ClientContext { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ClientContext") + .field("_ops", &self._ops) + .field("rpc", &self.rpc) + .field("core", &self.rpc_thread) + .field("cpu_pool", &"...") + .finish() + } +} diff --git a/third_party/rust/audioipc2-client/src/lib.rs b/third_party/rust/audioipc2-client/src/lib.rs new file mode 100644 index 0000000000..e834fbac09 --- /dev/null +++ b/third_party/rust/audioipc2-client/src/lib.rs @@ -0,0 +1,81 @@ +// Copyright © 2017 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details. +#![warn(unused_extern_crates)] + +#[macro_use] +extern crate log; + +#[macro_use] +mod send_recv; +mod context; +mod stream; + +use crate::context::ClientContext; +use crate::stream::ClientStream; +use audioipc::PlatformHandleType; +use cubeb_backend::{capi, ffi}; +use std::os::raw::{c_char, c_int}; + +thread_local!(static IN_CALLBACK: std::cell::RefCell<bool> = const { std::cell::RefCell::new(false) }); +thread_local!(static AUDIOIPC_INIT_PARAMS: std::cell::RefCell<Option<AudioIpcInitParams>> = const { std::cell::RefCell::new(None) }); + +#[repr(C)] +#[derive(Clone, Copy, Debug)] +pub struct AudioIpcInitParams { + // Fields only need to be public for ipctest. + pub server_connection: PlatformHandleType, + pub pool_size: usize, + pub stack_size: usize, + pub thread_create_callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>, + pub thread_destroy_callback: Option<extern "C" fn()>, +} + +unsafe impl Send for AudioIpcInitParams {} + +fn set_in_callback(in_callback: bool) { + IN_CALLBACK.with(|b| { + assert_eq!(*b.borrow(), !in_callback); + *b.borrow_mut() = in_callback; + }); +} + +fn run_in_callback<F, R>(f: F) -> R +where + F: FnOnce() -> R, +{ + set_in_callback(true); + + let r = f(); + + set_in_callback(false); + + r +} + +fn assert_not_in_callback() { + IN_CALLBACK.with(|b| { + assert!(!*b.borrow()); + }); +} + +#[allow(clippy::missing_safety_doc)] +#[no_mangle] +/// Entry point from C code. +pub unsafe extern "C" fn audioipc2_client_init( + c: *mut *mut ffi::cubeb, + context_name: *const c_char, + init_params: *const AudioIpcInitParams, +) -> c_int { + if init_params.is_null() { + return cubeb_backend::ffi::CUBEB_ERROR; + } + + let init_params = &*init_params; + + AUDIOIPC_INIT_PARAMS.with(|p| { + *p.borrow_mut() = Some(*init_params); + }); + capi::capi_init::<ClientContext>(c, context_name) +} diff --git a/third_party/rust/audioipc2-client/src/send_recv.rs b/third_party/rust/audioipc2-client/src/send_recv.rs new file mode 100644 index 0000000000..1134c99a49 --- /dev/null +++ b/third_party/rust/audioipc2-client/src/send_recv.rs @@ -0,0 +1,73 @@ +// Copyright © 2017 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details. + +use cubeb_backend::Error; +use std::os::raw::c_int; + +#[doc(hidden)] +pub fn _err<E>(e: E) -> Error +where + E: Into<Option<c_int>>, +{ + match e.into() { + Some(e) => Error::from_raw(e), + None => Error::error(), + } +} + +#[macro_export] +macro_rules! send_recv { + ($rpc:expr, $smsg:ident => $rmsg:ident) => {{ + let resp = send_recv!(__send $rpc, $smsg); + send_recv!(__recv resp, $rmsg) + }}; + ($rpc:expr, $smsg:ident => $rmsg:ident()) => {{ + let resp = send_recv!(__send $rpc, $smsg); + send_recv!(__recv resp, $rmsg __result) + }}; + ($rpc:expr, $smsg:ident($($a:expr),*) => $rmsg:ident) => {{ + let resp = send_recv!(__send $rpc, $smsg, $($a),*); + send_recv!(__recv resp, $rmsg) + }}; + ($rpc:expr, $smsg:ident($($a:expr),*) => $rmsg:ident()) => {{ + let resp = send_recv!(__send $rpc, $smsg, $($a),*); + send_recv!(__recv resp, $rmsg __result) + }}; + // + (__send $rpc:expr, $smsg:ident) => ({ + $rpc.call(ServerMessage::$smsg) + }); + (__send $rpc:expr, $smsg:ident, $($a:expr),*) => ({ + $rpc.call(ServerMessage::$smsg($($a),*)) + }); + (__recv $resp:expr, $rmsg:ident) => ({ + match $resp { + Ok(ClientMessage::$rmsg) => Ok(()), + Ok(ClientMessage::Error(e)) => Err($crate::send_recv::_err(e)), + Ok(m) => { + debug!("received wrong message - got={:?}", m); + Err($crate::send_recv::_err(None)) + }, + Err(e) => { + debug!("received error from rpc - got={:?}", e); + Err($crate::send_recv::_err(None)) + }, + } + }); + (__recv $resp:expr, $rmsg:ident __result) => ({ + match $resp { + Ok(ClientMessage::$rmsg(v)) => Ok(v), + Ok(ClientMessage::Error(e)) => Err($crate::send_recv::_err(e)), + Ok(m) => { + debug!("received wrong message - got={:?}", m); + Err($crate::send_recv::_err(None)) + }, + Err(e) => { + debug!("received error - got={:?}", e); + Err($crate::send_recv::_err(None)) + }, + } + }) +} diff --git a/third_party/rust/audioipc2-client/src/stream.rs b/third_party/rust/audioipc2-client/src/stream.rs new file mode 100644 index 0000000000..11e7c85412 --- /dev/null +++ b/third_party/rust/audioipc2-client/src/stream.rs @@ -0,0 +1,351 @@ +// Copyright © 2017 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details + +use crate::ClientContext; +use crate::{assert_not_in_callback, run_in_callback}; +use audioipc::messages::StreamCreateParams; +use audioipc::messages::{self, CallbackReq, CallbackResp, ClientMessage, ServerMessage}; +use audioipc::shm::SharedMem; +use audioipc::{rpccore, sys}; +use cubeb_backend::{ffi, DeviceRef, Error, InputProcessingParams, Result, Stream, StreamOps}; +use std::ffi::{CStr, CString}; +use std::os::raw::c_void; +use std::ptr; +use std::sync::mpsc; +use std::sync::{Arc, Mutex}; + +pub struct Device(ffi::cubeb_device); + +impl Drop for Device { + fn drop(&mut self) { + unsafe { + if !self.0.input_name.is_null() { + let _ = CString::from_raw(self.0.input_name as *mut _); + } + if !self.0.output_name.is_null() { + let _ = CString::from_raw(self.0.output_name as *mut _); + } + } + } +} + +// ClientStream's layout *must* match cubeb.c's `struct cubeb_stream` for the +// common fields. +#[repr(C)] +#[derive(Debug)] +pub struct ClientStream<'ctx> { + // This must be a reference to Context for cubeb, cubeb accesses + // stream methods via stream->context->ops + context: &'ctx ClientContext, + user_ptr: *mut c_void, + token: usize, + device_change_cb: Arc<Mutex<ffi::cubeb_device_changed_callback>>, + // Signals ClientStream that CallbackServer has dropped. + shutdown_rx: mpsc::Receiver<()>, +} + +struct CallbackServer { + shm: SharedMem, + duplex_input: Option<Vec<u8>>, + data_cb: ffi::cubeb_data_callback, + state_cb: ffi::cubeb_state_callback, + user_ptr: usize, + device_change_cb: Arc<Mutex<ffi::cubeb_device_changed_callback>>, + // Signals ClientStream that CallbackServer has dropped. + _shutdown_tx: mpsc::Sender<()>, +} + +impl rpccore::Server for CallbackServer { + type ServerMessage = CallbackReq; + type ClientMessage = CallbackResp; + + fn process(&mut self, req: Self::ServerMessage) -> Self::ClientMessage { + match req { + CallbackReq::Data { + nframes, + input_frame_size, + output_frame_size, + } => { + trace!( + "stream_thread: Data Callback: nframes={} input_fs={} output_fs={}", + nframes, + input_frame_size, + output_frame_size, + ); + + let input_nbytes = nframes as usize * input_frame_size; + let output_nbytes = nframes as usize * output_frame_size; + + // Input and output reuse the same shmem backing. Unfortunately, cubeb's data_callback isn't + // specified in such a way that would require the callee to consume all of the input before + // writing to the output (i.e., it is passed as two pointers that aren't expected to alias). + // That means we need to copy the input here. + if let Some(buf) = &mut self.duplex_input { + assert!(input_nbytes > 0); + assert!(buf.capacity() >= input_nbytes); + unsafe { + let input = self.shm.get_slice(input_nbytes).unwrap(); + ptr::copy_nonoverlapping(input.as_ptr(), buf.as_mut_ptr(), input.len()); + } + } + + run_in_callback(|| { + let nframes = unsafe { + let input_ptr = if input_frame_size > 0 { + if let Some(buf) = &mut self.duplex_input { + buf.as_ptr() + } else { + self.shm.get_slice(input_nbytes).unwrap().as_ptr() + } + } else { + ptr::null() + }; + let output_ptr = if output_frame_size > 0 { + self.shm.get_mut_slice(output_nbytes).unwrap().as_mut_ptr() + } else { + ptr::null_mut() + }; + + self.data_cb.unwrap()( + ptr::null_mut(), // https://github.com/kinetiknz/cubeb/issues/518 + self.user_ptr as *mut c_void, + input_ptr as *const _, + output_ptr as *mut _, + nframes as _, + ) + }; + + CallbackResp::Data(nframes as isize) + }) + } + CallbackReq::State(state) => { + trace!("stream_thread: State Callback: {:?}", state); + run_in_callback(|| unsafe { + self.state_cb.unwrap()(ptr::null_mut(), self.user_ptr as *mut _, state); + }); + + CallbackResp::State + } + CallbackReq::DeviceChange => { + run_in_callback(|| { + let cb = *self.device_change_cb.lock().unwrap(); + if let Some(cb) = cb { + unsafe { + cb(self.user_ptr as *mut _); + } + } else { + warn!("DeviceChange received with null callback"); + } + }); + + CallbackResp::DeviceChange + } + } + } +} + +impl<'ctx> ClientStream<'ctx> { + fn init( + ctx: &'ctx ClientContext, + init_params: messages::StreamInitParams, + data_callback: ffi::cubeb_data_callback, + state_callback: ffi::cubeb_state_callback, + user_ptr: *mut c_void, + ) -> Result<Stream> { + assert_not_in_callback(); + + let rpc = ctx.rpc(); + let create_params = StreamCreateParams { + input_stream_params: init_params.input_stream_params, + output_stream_params: init_params.output_stream_params, + }; + let mut data = send_recv!(rpc, StreamCreate(create_params) => StreamCreated())?; + + debug!( + "token = {}, handle = {:?} area_size = {:?}", + data.token, data.shm_handle, data.shm_area_size + ); + + let shm = + match unsafe { SharedMem::from(data.shm_handle.take_handle(), data.shm_area_size) } { + Ok(shm) => shm, + Err(e) => { + warn!( + "SharedMem client mapping failed (size={}, err={:?})", + data.shm_area_size, e + ); + return Err(Error::default()); + } + }; + + let duplex_input = if let (Some(_), Some(_)) = ( + init_params.input_stream_params, + init_params.output_stream_params, + ) { + let mut duplex_input = Vec::new(); + match duplex_input.try_reserve_exact(data.shm_area_size) { + Ok(()) => Some(duplex_input), + Err(e) => { + warn!( + "duplex_input allocation failed (size={}, err={:?})", + data.shm_area_size, e + ); + return Err(Error::default()); + } + } + } else { + None + }; + + let mut stream = + send_recv!(rpc, StreamInit(data.token, init_params) => StreamInitialized())?; + let stream = unsafe { sys::Pipe::from_raw_handle(stream.take_handle()) }; + + let user_data = user_ptr as usize; + + let null_cb: ffi::cubeb_device_changed_callback = None; + let device_change_cb = Arc::new(Mutex::new(null_cb)); + + let (_shutdown_tx, shutdown_rx) = mpsc::channel(); + + let server = CallbackServer { + shm, + duplex_input, + data_cb: data_callback, + state_cb: state_callback, + user_ptr: user_data, + device_change_cb: device_change_cb.clone(), + _shutdown_tx, + }; + + ctx.callback_handle() + .bind_server(server, stream) + .map_err(|_| Error::default())?; + + let stream = Box::into_raw(Box::new(ClientStream { + context: ctx, + user_ptr, + token: data.token, + device_change_cb, + shutdown_rx, + })); + Ok(unsafe { Stream::from_ptr(stream as *mut _) }) + } +} + +impl Drop for ClientStream<'_> { + fn drop(&mut self) { + debug!("ClientStream drop"); + let _ = send_recv!(self.context.rpc(), StreamDestroy(self.token) => StreamDestroyed); + debug!("ClientStream drop - stream destroyed"); + // Wait for CallbackServer to shutdown. The remote server drops the RPC + // connection during StreamDestroy, which will cause CallbackServer to drop + // once the connection close is detected. Dropping CallbackServer will + // cause the shutdown channel to error on recv, which we rely on to + // synchronize with CallbackServer dropping. + let _ = self.shutdown_rx.recv(); + debug!("ClientStream dropped"); + } +} + +impl StreamOps for ClientStream<'_> { + fn start(&mut self) -> Result<()> { + assert_not_in_callback(); + let rpc = self.context.rpc(); + send_recv!(rpc, StreamStart(self.token) => StreamStarted) + } + + fn stop(&mut self) -> Result<()> { + assert_not_in_callback(); + let rpc = self.context.rpc(); + send_recv!(rpc, StreamStop(self.token) => StreamStopped) + } + + fn position(&mut self) -> Result<u64> { + assert_not_in_callback(); + let rpc = self.context.rpc(); + send_recv!(rpc, StreamGetPosition(self.token) => StreamPosition()) + } + + fn latency(&mut self) -> Result<u32> { + assert_not_in_callback(); + let rpc = self.context.rpc(); + send_recv!(rpc, StreamGetLatency(self.token) => StreamLatency()) + } + + fn input_latency(&mut self) -> Result<u32> { + assert_not_in_callback(); + let rpc = self.context.rpc(); + send_recv!(rpc, StreamGetInputLatency(self.token) => StreamInputLatency()) + } + + fn set_volume(&mut self, volume: f32) -> Result<()> { + assert_not_in_callback(); + let rpc = self.context.rpc(); + send_recv!(rpc, StreamSetVolume(self.token, volume) => StreamVolumeSet) + } + + fn set_name(&mut self, name: &CStr) -> Result<()> { + assert_not_in_callback(); + let rpc = self.context.rpc(); + send_recv!(rpc, StreamSetName(self.token, name.to_owned()) => StreamNameSet) + } + + fn current_device(&mut self) -> Result<&DeviceRef> { + assert_not_in_callback(); + let rpc = self.context.rpc(); + match send_recv!(rpc, StreamGetCurrentDevice(self.token) => StreamCurrentDevice()) { + Ok(d) => Ok(unsafe { DeviceRef::from_ptr(Box::into_raw(Box::new(d.into()))) }), + Err(e) => Err(e), + } + } + + fn set_input_mute(&mut self, mute: bool) -> Result<()> { + assert_not_in_callback(); + let rpc = self.context.rpc(); + send_recv!(rpc, StreamSetInputMute(self.token, mute) => StreamInputMuteSet) + } + + fn set_input_processing_params(&mut self, params: InputProcessingParams) -> Result<()> { + assert_not_in_callback(); + let rpc = self.context.rpc(); + send_recv!(rpc, StreamSetInputProcessingParams(self.token, params.bits()) => StreamInputProcessingParamsSet) + } + + fn device_destroy(&mut self, device: &DeviceRef) -> Result<()> { + assert_not_in_callback(); + if device.as_ptr().is_null() { + Err(Error::error()) + } else { + unsafe { + let _: Box<Device> = Box::from_raw(device.as_ptr() as *mut _); + } + Ok(()) + } + } + + fn register_device_changed_callback( + &mut self, + device_changed_callback: ffi::cubeb_device_changed_callback, + ) -> Result<()> { + assert_not_in_callback(); + let rpc = self.context.rpc(); + let enable = device_changed_callback.is_some(); + *self.device_change_cb.lock().unwrap() = device_changed_callback; + send_recv!(rpc, StreamRegisterDeviceChangeCallback(self.token, enable) => StreamRegisterDeviceChangeCallback) + } +} + +pub fn init( + ctx: &ClientContext, + init_params: messages::StreamInitParams, + data_callback: ffi::cubeb_data_callback, + state_callback: ffi::cubeb_state_callback, + user_ptr: *mut c_void, +) -> Result<Stream> { + let stm = ClientStream::init(ctx, init_params, data_callback, state_callback, user_ptr)?; + debug_assert_eq!(stm.user_ptr(), user_ptr); + Ok(stm) +} |