diff options
Diffstat (limited to 'third_party/rust/audioipc-server')
-rw-r--r-- | third_party/rust/audioipc-server/.cargo-checksum.json | 1 | ||||
-rw-r--r-- | third_party/rust/audioipc-server/Cargo.toml | 28 | ||||
-rw-r--r-- | third_party/rust/audioipc-server/cbindgen.toml | 28 | ||||
-rw-r--r-- | third_party/rust/audioipc-server/src/lib.rs | 208 | ||||
-rw-r--r-- | third_party/rust/audioipc-server/src/server.rs | 904 |
5 files changed, 1169 insertions, 0 deletions
diff --git a/third_party/rust/audioipc-server/.cargo-checksum.json b/third_party/rust/audioipc-server/.cargo-checksum.json new file mode 100644 index 0000000000..3f4b7a62fa --- /dev/null +++ b/third_party/rust/audioipc-server/.cargo-checksum.json @@ -0,0 +1 @@ +{"files":{"Cargo.toml":"a90f1f22f12b7e82aa0940766dee91dbf27901c21680930d38d9b1940df247b7","cbindgen.toml":"bd89c5a9f52395b1c703ff04d1c0019dc3c92b691d571ae503c4b85753a44a39","src/lib.rs":"bb467df305bdb1403ffffd7960c5e55c14813d5098f0d88051d0c7542ed059dd","src/server.rs":"1fa365b9329d1873fb51893872a1e99ac86087428ca3bd6959611f3bd6d9549a"},"package":null}
\ No newline at end of file diff --git a/third_party/rust/audioipc-server/Cargo.toml b/third_party/rust/audioipc-server/Cargo.toml new file mode 100644 index 0000000000..ed99b400c4 --- /dev/null +++ b/third_party/rust/audioipc-server/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "audioipc-server" +version = "0.2.3" +authors = [ + "Matthew Gregan <kinetik@flim.org>", + "Dan Glastonbury <dan.glastonbury@gmail.com>" + ] +license = "ISC" +description = "Remote cubeb server" +edition = "2018" + +[dependencies] +audioipc = { path = "../audioipc" } +cubeb-core = "0.10.0" +futures = "0.1.29" +once_cell = "1.2.0" +log = "0.4" +slab = "0.4" +tokio = { version="0.1", default-features=false, features = ["rt-full"] } + +[dependencies.error-chain] +version = "0.12.0" +default-features = false + +[dependencies.audio_thread_priority] +version = "0.26.1" +default-features = false +features = ["winapi"] diff --git a/third_party/rust/audioipc-server/cbindgen.toml b/third_party/rust/audioipc-server/cbindgen.toml new file mode 100644 index 0000000000..9a7c204fe4 --- /dev/null +++ b/third_party/rust/audioipc-server/cbindgen.toml @@ -0,0 +1,28 @@ +header = """/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */""" +autogen_warning = """/* DO NOT MODIFY THIS MANUALLY! This file was generated using cbindgen. */""" +include_version = true +braces = "SameLine" +line_length = 100 +tab_width = 2 +language = "C++" +namespaces = ["mozilla", "audioipc"] + +[export] +item_types = ["globals", "enums", "structs", "unions", "typedefs", "opaque", "functions", "constants"] + +[parse] +parse_deps = true +include = ['audioipc'] + +[fn] +args = "Vertical" +rename_args = "GeckoCase" + +[struct] +rename_fields = "GeckoCase" + +[defines] +"windows" = "XP_WIN" +"unix" = "XP_UNIX" diff --git a/third_party/rust/audioipc-server/src/lib.rs b/third_party/rust/audioipc-server/src/lib.rs new file mode 100644 index 0000000000..a6ef02f87a --- /dev/null +++ b/third_party/rust/audioipc-server/src/lib.rs @@ -0,0 +1,208 @@ +// Copyright © 2017 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details +#![warn(unused_extern_crates)] + +#[macro_use] +extern crate error_chain; +#[macro_use] +extern crate log; + +use audio_thread_priority::promote_current_thread_to_real_time; +use audioipc::core; +use audioipc::framing::framed; +use audioipc::rpc; +use audioipc::{MessageStream, PlatformHandle, PlatformHandleType}; +use futures::sync::oneshot; +use futures::Future; +use once_cell::sync::Lazy; +use std::ffi::{CStr, CString}; +use std::os::raw::c_void; +use std::ptr; +use std::sync::Mutex; +use tokio::reactor; + +mod server; + +struct CubebContextParams { + context_name: CString, + backend_name: Option<CString>, +} + +static G_CUBEB_CONTEXT_PARAMS: Lazy<Mutex<CubebContextParams>> = Lazy::new(|| { + Mutex::new(CubebContextParams { + context_name: CString::new("AudioIPC Server").unwrap(), + backend_name: None, + }) +}); + +#[allow(deprecated)] +pub mod errors { + #![allow(clippy::upper_case_acronyms)] + error_chain! { + links { + AudioIPC(::audioipc::errors::Error, ::audioipc::errors::ErrorKind); + } + foreign_links { + Cubeb(cubeb_core::Error); + Io(::std::io::Error); + Canceled(::futures::sync::oneshot::Canceled); + } + } +} + +use crate::errors::*; + +struct ServerWrapper { + core_thread: core::CoreThread, + callback_thread: core::CoreThread, +} + +fn register_thread(callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>) { + if let Some(func) = callback { + let thr = std::thread::current(); + let name = CString::new(thr.name().unwrap()).unwrap(); + func(name.as_ptr()); + } +} + +fn unregister_thread(callback: Option<extern "C" fn()>) { + if let Some(func) = callback { + func(); + } +} + +fn init_threads( + thread_create_callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>, + thread_destroy_callback: Option<extern "C" fn()>, +) -> Result<ServerWrapper> { + trace!("Starting up cubeb audio server event loop thread..."); + + let callback_thread = core::spawn_thread( + "AudioIPC Callback RPC", + move || { + match promote_current_thread_to_real_time(0, 48000) { + Ok(_) => {} + Err(_) => { + debug!("Failed to promote audio callback thread to real-time."); + } + } + register_thread(thread_create_callback); + trace!("Starting up cubeb audio callback event loop thread..."); + Ok(()) + }, + move || { + unregister_thread(thread_destroy_callback); + }, + ) + .map_err(|e| { + debug!( + "Failed to start cubeb audio callback event loop thread: {:?}", + e + ); + e + })?; + + let core_thread = core::spawn_thread( + "AudioIPC Server RPC", + move || { + register_thread(thread_create_callback); + audioipc::server_platform_init(); + Ok(()) + }, + move || { + unregister_thread(thread_destroy_callback); + }, + ) + .map_err(|e| { + debug!("Failed to cubeb audio core event loop thread: {:?}", e); + e + })?; + + Ok(ServerWrapper { + core_thread, + callback_thread, + }) +} + +#[repr(C)] +#[derive(Clone, Copy, Debug)] +pub struct AudioIpcServerInitParams { + // Fields only need to be public for ipctest. + pub thread_create_callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>, + pub thread_destroy_callback: Option<extern "C" fn()>, +} + +#[allow(clippy::missing_safety_doc)] +#[no_mangle] +pub unsafe extern "C" fn audioipc_server_start( + context_name: *const std::os::raw::c_char, + backend_name: *const std::os::raw::c_char, + init_params: *const AudioIpcServerInitParams, +) -> *mut c_void { + assert!(!init_params.is_null()); + let mut params = G_CUBEB_CONTEXT_PARAMS.lock().unwrap(); + if !context_name.is_null() { + params.context_name = CStr::from_ptr(context_name).to_owned(); + } + if !backend_name.is_null() { + let backend_string = CStr::from_ptr(backend_name).to_owned(); + params.backend_name = Some(backend_string); + } + match init_threads( + (*init_params).thread_create_callback, + (*init_params).thread_destroy_callback, + ) { + Ok(server) => Box::into_raw(Box::new(server)) as *mut _, + Err(_) => ptr::null_mut() as *mut _, + } +} + +// A `shm_area_size` of 0 allows the server to calculate an appropriate shm size for each stream. +// A non-zero `shm_area_size` forces all allocations to the specified size. +#[no_mangle] +pub extern "C" fn audioipc_server_new_client( + p: *mut c_void, + shm_area_size: usize, +) -> PlatformHandleType { + let (wait_tx, wait_rx) = oneshot::channel(); + let wrapper: &ServerWrapper = unsafe { &*(p as *mut _) }; + + let callback_thread_handle = wrapper.callback_thread.handle(); + + // We create a connected pair of anonymous IPC endpoints. One side + // is registered with the reactor core, the other side is returned + // to the caller. + MessageStream::anonymous_ipc_pair() + .map(|(ipc_server, ipc_client)| { + // Spawn closure to run on same thread as reactor::Core + // via remote handle. + wrapper + .core_thread + .handle() + .spawn(futures::future::lazy(move || { + trace!("Incoming connection"); + let handle = reactor::Handle::default(); + ipc_server.into_tokio_ipc(&handle) + .map(|sock| { + let transport = framed(sock, Default::default()); + rpc::bind_server(transport, server::CubebServer::new(callback_thread_handle, shm_area_size)); + }).map_err(|_| ()) + // Notify waiting thread that server has been registered. + .and_then(|_| wait_tx.send(())) + })) + .expect("Failed to spawn CubebServer"); + // Wait for notification that server has been registered + // with reactor::Core. + let _ = wait_rx.wait(); + unsafe { PlatformHandle::from(ipc_client).into_raw() } + }) + .unwrap_or(audioipc::INVALID_HANDLE_VALUE) +} + +#[no_mangle] +pub extern "C" fn audioipc_server_stop(p: *mut c_void) { + let wrapper = unsafe { Box::<ServerWrapper>::from_raw(p as *mut _) }; + drop(wrapper); +} diff --git a/third_party/rust/audioipc-server/src/server.rs b/third_party/rust/audioipc-server/src/server.rs new file mode 100644 index 0000000000..c4b1f62327 --- /dev/null +++ b/third_party/rust/audioipc-server/src/server.rs @@ -0,0 +1,904 @@ +// Copyright © 2017 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details + +#[cfg(target_os = "linux")] +use audio_thread_priority::{promote_thread_to_real_time, RtPriorityThreadInfo}; +use audioipc::framing::{framed, Framed}; +use audioipc::messages::{ + CallbackReq, CallbackResp, ClientMessage, Device, DeviceCollectionReq, DeviceCollectionResp, + DeviceInfo, RegisterDeviceCollectionChanged, ServerMessage, StreamCreate, StreamCreateParams, + StreamInitParams, StreamParams, +}; +use audioipc::rpc; +use audioipc::shm::SharedMem; +use audioipc::{codec::LengthDelimitedCodec, messages::SerializableHandle}; +use audioipc::{MessageStream, PlatformHandle}; +use cubeb_core as cubeb; +use cubeb_core::ffi; +use futures::future::{self, FutureResult}; +use futures::sync::oneshot; +use futures::Future; +use std::convert::From; +use std::ffi::CStr; +use std::mem::size_of; +use std::os::raw::{c_long, c_void}; +use std::rc::Rc; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::{cell::RefCell, sync::Mutex}; +use std::{panic, slice}; +use tokio::reactor; +use tokio::runtime::current_thread; + +use crate::errors::*; + +fn error(error: cubeb::Error) -> ClientMessage { + ClientMessage::Error(error.raw_code()) +} + +struct CubebDeviceCollectionManager { + servers: Mutex<Vec<Rc<RefCell<CubebServerCallbacks>>>>, +} + +impl CubebDeviceCollectionManager { + fn new() -> CubebDeviceCollectionManager { + CubebDeviceCollectionManager { + servers: Mutex::new(Vec::new()), + } + } + + fn register( + &mut self, + context: &cubeb::Context, + server: &Rc<RefCell<CubebServerCallbacks>>, + devtype: cubeb::DeviceType, + ) -> cubeb::Result<()> { + let mut servers = self.servers.lock().unwrap(); + if servers.is_empty() { + self.internal_register(context, true)?; + } + server.borrow_mut().devtype.insert(devtype); + if !servers.iter().any(|s| Rc::ptr_eq(s, server)) { + servers.push(server.clone()); + } + Ok(()) + } + + fn unregister( + &mut self, + context: &cubeb::Context, + server: &Rc<RefCell<CubebServerCallbacks>>, + devtype: cubeb::DeviceType, + ) -> cubeb::Result<()> { + let mut servers = self.servers.lock().unwrap(); + server.borrow_mut().devtype.remove(devtype); + if server.borrow().devtype.is_empty() { + servers.retain(|s| !Rc::ptr_eq(s, server)); + } + if servers.is_empty() { + self.internal_register(context, false)?; + } + Ok(()) + } + + fn internal_register(&self, context: &cubeb::Context, enable: bool) -> cubeb::Result<()> { + let user_ptr = if enable { + self as *const CubebDeviceCollectionManager as *mut c_void + } else { + std::ptr::null_mut() + }; + for &(dir, cb) in &[ + ( + cubeb::DeviceType::INPUT, + device_collection_changed_input_cb_c as _, + ), + ( + cubeb::DeviceType::OUTPUT, + device_collection_changed_output_cb_c as _, + ), + ] { + unsafe { + context.register_device_collection_changed( + dir, + if enable { Some(cb) } else { None }, + user_ptr, + )?; + } + } + Ok(()) + } + + // Warning: this is called from an internal cubeb thread, so we must not mutate unprotected shared state. + unsafe fn device_collection_changed_callback(&self, device_type: ffi::cubeb_device_type) { + let servers = self.servers.lock().unwrap(); + servers.iter().for_each(|server| { + if server + .borrow() + .devtype + .contains(cubeb::DeviceType::from_bits_truncate(device_type)) + { + server + .borrow_mut() + .device_collection_changed_callback(device_type) + } + }); + } +} + +struct DevIdMap { + devices: Vec<usize>, +} + +// A cubeb_devid is an opaque type which may be implemented with a stable +// pointer in a cubeb backend. cubeb_devids received remotely must be +// validated before use, so DevIdMap provides a simple 1:1 mapping between a +// cubeb_devid and an IPC-transportable value suitable for use as a unique +// handle. +impl DevIdMap { + fn new() -> DevIdMap { + let mut d = DevIdMap { + devices: Vec::with_capacity(32), + }; + // A null cubeb_devid is used for selecting the default device. + // Pre-populate the mapping with 0 -> 0 to handle nulls. + d.devices.push(0); + d + } + + // Given a cubeb_devid, return a unique stable value suitable for use + // over IPC. + fn make_handle(&mut self, devid: usize) -> usize { + if let Some(i) = self.devices.iter().position(|&d| d == devid) { + return i; + } + self.devices.push(devid); + self.devices.len() - 1 + } + + // Given a handle produced by `make_handle`, return the associated + // cubeb_devid. Invalid handles result in a panic. + fn handle_to_id(&self, handle: usize) -> usize { + self.devices[handle] + } +} + +struct CubebContextState { + context: cubeb::Result<cubeb::Context>, + manager: CubebDeviceCollectionManager, +} + +thread_local!(static CONTEXT_KEY: RefCell<Option<CubebContextState>> = RefCell::new(None)); + +fn cubeb_init_from_context_params() -> cubeb::Result<cubeb::Context> { + let params = super::G_CUBEB_CONTEXT_PARAMS.lock().unwrap(); + let context_name = Some(params.context_name.as_c_str()); + let backend_name = params.backend_name.as_deref(); + let r = cubeb::Context::init(context_name, backend_name); + r.map_err(|e| { + info!("cubeb::Context::init failed r={:?}", e); + e + }) +} + +fn with_local_context<T, F>(f: F) -> T +where + F: FnOnce(&cubeb::Result<cubeb::Context>, &mut CubebDeviceCollectionManager) -> T, +{ + CONTEXT_KEY.with(|k| { + let mut state = k.borrow_mut(); + if state.is_none() { + *state = Some(CubebContextState { + context: cubeb_init_from_context_params(), + manager: CubebDeviceCollectionManager::new(), + }); + } + let CubebContextState { context, manager } = state.as_mut().unwrap(); + // Always reattempt to initialize cubeb, OS config may have changed. + if context.is_err() { + *context = cubeb_init_from_context_params(); + } + f(context, manager) + }) +} + +struct DeviceCollectionClient; + +impl rpc::Client for DeviceCollectionClient { + type Request = DeviceCollectionReq; + type Response = DeviceCollectionResp; + type Transport = + Framed<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Request, Self::Response>>; +} + +struct CallbackClient; + +impl rpc::Client for CallbackClient { + type Request = CallbackReq; + type Response = CallbackResp; + type Transport = + Framed<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Request, Self::Response>>; +} + +struct ServerStreamCallbacks { + /// Size of input frame in bytes + input_frame_size: u16, + /// Size of output frame in bytes + output_frame_size: u16, + /// Shared memory buffer for transporting audio data to/from client + shm: SharedMem, + /// RPC interface to callback server running in client + rpc: rpc::ClientProxy<CallbackReq, CallbackResp>, +} + +impl ServerStreamCallbacks { + fn data_callback(&mut self, input: &[u8], output: &mut [u8], nframes: isize) -> isize { + trace!( + "Stream data callback: {} {} {}", + nframes, + input.len(), + output.len() + ); + + unsafe { + if self.input_frame_size != 0 { + self.shm + .get_mut_slice(input.len()) + .unwrap() + .copy_from_slice(input); + } + } + + let r = self + .rpc + .call(CallbackReq::Data { + nframes, + input_frame_size: self.input_frame_size as usize, + output_frame_size: self.output_frame_size as usize, + }) + .wait(); + + match r { + Ok(CallbackResp::Data(frames)) => { + if frames >= 0 { + let nbytes = frames as usize * self.output_frame_size as usize; + trace!("Reslice output to {}", nbytes); + unsafe { + if self.output_frame_size != 0 { + output[..nbytes].copy_from_slice(self.shm.get_slice(nbytes).unwrap()); + } + } + } + frames + } + _ => { + debug!("Unexpected message {:?} during data_callback", r); + // TODO: Return a CUBEB_ERROR result here once + // https://github.com/kinetiknz/cubeb/issues/553 is + // fixed. + 0 + } + } + } + + fn state_callback(&mut self, state: cubeb::State) { + trace!("Stream state callback: {:?}", state); + let r = self.rpc.call(CallbackReq::State(state.into())).wait(); + match r { + Ok(CallbackResp::State) => {} + _ => { + debug!("Unexpected message {:?} during state callback", r); + } + } + } + + fn device_change_callback(&mut self) { + trace!("Stream device change callback"); + let r = self.rpc.call(CallbackReq::DeviceChange).wait(); + match r { + Ok(CallbackResp::DeviceChange) => {} + _ => { + debug!("Unexpected message {:?} during device change callback", r); + } + } + } +} + +static SHM_ID: AtomicUsize = AtomicUsize::new(0); + +// Generate a temporary shm_id fragment that is unique to the process. This +// path is used temporarily to create a shm segment, which is then +// immediately deleted from the filesystem while retaining handles to the +// shm to be shared between the server and client. +fn get_shm_id() -> String { + format!( + "cubeb-shm-{}-{}", + std::process::id(), + SHM_ID.fetch_add(1, Ordering::SeqCst) + ) +} + +struct ServerStream { + stream: Option<cubeb::Stream>, + cbs: Box<ServerStreamCallbacks>, + shm_setup: Option<rpc::Response<CallbackResp>>, +} + +impl Drop for ServerStream { + fn drop(&mut self) { + // `stream` *must* be dropped before `cbs`. + drop(self.stream.take()); + } +} + +struct CubebServerCallbacks { + rpc: rpc::ClientProxy<DeviceCollectionReq, DeviceCollectionResp>, + devtype: cubeb::DeviceType, +} + +impl CubebServerCallbacks { + fn device_collection_changed_callback(&mut self, device_type: ffi::cubeb_device_type) { + // TODO: Assert device_type is in devtype. + debug!( + "Sending device collection ({:?}) changed event", + device_type + ); + let _ = self + .rpc + .call(DeviceCollectionReq::DeviceChange(device_type)) + .wait(); + } +} + +pub struct CubebServer { + callback_thread: current_thread::Handle, + streams: slab::Slab<ServerStream>, + remote_pid: Option<u32>, + cbs: Option<Rc<RefCell<CubebServerCallbacks>>>, + devidmap: DevIdMap, + shm_area_size: usize, +} + +impl rpc::Server for CubebServer { + type Request = ServerMessage; + type Response = ClientMessage; + type Future = FutureResult<Self::Response, ()>; + type Transport = + Framed<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Response, Self::Request>>; + + fn process(&mut self, req: Self::Request) -> Self::Future { + if let ServerMessage::ClientConnect(pid) = req { + self.remote_pid = Some(pid); + } + let resp = with_local_context(|context, manager| match *context { + Err(_) => error(cubeb::Error::error()), + Ok(ref context) => self.process_msg(context, manager, &req), + }); + future::ok(resp) + } +} + +// Debugging for BMO 1594216/1612044. +macro_rules! try_stream { + ($self:expr, $stm_tok:expr) => { + if $self.streams.contains($stm_tok) { + $self.streams[$stm_tok] + .stream + .as_mut() + .expect("uninitialized stream") + } else { + error!( + "{}:{}:{} - Stream({}): invalid token", + file!(), + line!(), + column!(), + $stm_tok + ); + return error(cubeb::Error::invalid_parameter()); + } + }; +} + +impl CubebServer { + pub fn new(callback_thread_handle: current_thread::Handle, shm_area_size: usize) -> Self { + CubebServer { + callback_thread: callback_thread_handle, + streams: slab::Slab::<ServerStream>::new(), + remote_pid: None, + cbs: None, + devidmap: DevIdMap::new(), + shm_area_size, + } + } + + // Process a request coming from the client. + fn process_msg( + &mut self, + context: &cubeb::Context, + manager: &mut CubebDeviceCollectionManager, + msg: &ServerMessage, + ) -> ClientMessage { + let resp: ClientMessage = match *msg { + ServerMessage::ClientConnect(_) => { + // remote_pid is set before cubeb initialization, just verify here. + assert!(self.remote_pid.is_some()); + ClientMessage::ClientConnected + } + + ServerMessage::ClientDisconnect => { + // TODO: + //self.connection.client_disconnect(); + ClientMessage::ClientDisconnected + } + + ServerMessage::ContextGetBackendId => { + ClientMessage::ContextBackendId(context.backend_id().to_string()) + } + + ServerMessage::ContextGetMaxChannelCount => context + .max_channel_count() + .map(ClientMessage::ContextMaxChannelCount) + .unwrap_or_else(error), + + ServerMessage::ContextGetMinLatency(ref params) => { + let format = cubeb::SampleFormat::from(params.format); + let layout = cubeb::ChannelLayout::from(params.layout); + + let params = cubeb::StreamParamsBuilder::new() + .format(format) + .rate(params.rate) + .channels(params.channels) + .layout(layout) + .take(); + + context + .min_latency(¶ms) + .map(ClientMessage::ContextMinLatency) + .unwrap_or_else(error) + } + + ServerMessage::ContextGetPreferredSampleRate => context + .preferred_sample_rate() + .map(ClientMessage::ContextPreferredSampleRate) + .unwrap_or_else(error), + + ServerMessage::ContextGetDeviceEnumeration(device_type) => context + .enumerate_devices(cubeb::DeviceType::from_bits_truncate(device_type)) + .map(|devices| { + let v: Vec<DeviceInfo> = devices + .iter() + .map(|i| { + let mut tmp: DeviceInfo = i.as_ref().into(); + // Replace each cubeb_devid with a unique handle suitable for IPC. + tmp.devid = self.devidmap.make_handle(tmp.devid); + tmp + }) + .collect(); + ClientMessage::ContextEnumeratedDevices(v) + }) + .unwrap_or_else(error), + + ServerMessage::StreamCreate(ref params) => self + .process_stream_create(params) + .unwrap_or_else(|_| error(cubeb::Error::error())), + + ServerMessage::StreamInit(stm_tok, ref params) => self + .process_stream_init(context, stm_tok, params) + .unwrap_or_else(|_| error(cubeb::Error::error())), + + ServerMessage::StreamDestroy(stm_tok) => { + if self.streams.contains(stm_tok) { + debug!("Unregistering stream {:?}", stm_tok); + self.streams.remove(stm_tok); + } else { + // Debugging for BMO 1594216/1612044. + error!("StreamDestroy({}): invalid token", stm_tok); + return error(cubeb::Error::invalid_parameter()); + } + ClientMessage::StreamDestroyed + } + + ServerMessage::StreamStart(stm_tok) => try_stream!(self, stm_tok) + .start() + .map(|_| ClientMessage::StreamStarted) + .unwrap_or_else(error), + + ServerMessage::StreamStop(stm_tok) => try_stream!(self, stm_tok) + .stop() + .map(|_| ClientMessage::StreamStopped) + .unwrap_or_else(error), + + ServerMessage::StreamGetPosition(stm_tok) => try_stream!(self, stm_tok) + .position() + .map(ClientMessage::StreamPosition) + .unwrap_or_else(error), + + ServerMessage::StreamGetLatency(stm_tok) => try_stream!(self, stm_tok) + .latency() + .map(ClientMessage::StreamLatency) + .unwrap_or_else(error), + + ServerMessage::StreamGetInputLatency(stm_tok) => try_stream!(self, stm_tok) + .input_latency() + .map(ClientMessage::StreamInputLatency) + .unwrap_or_else(error), + + ServerMessage::StreamSetVolume(stm_tok, volume) => try_stream!(self, stm_tok) + .set_volume(volume) + .map(|_| ClientMessage::StreamVolumeSet) + .unwrap_or_else(error), + + ServerMessage::StreamSetName(stm_tok, ref name) => try_stream!(self, stm_tok) + .set_name(name) + .map(|_| ClientMessage::StreamNameSet) + .unwrap_or_else(error), + + ServerMessage::StreamGetCurrentDevice(stm_tok) => try_stream!(self, stm_tok) + .current_device() + .map(|device| ClientMessage::StreamCurrentDevice(Device::from(device))) + .unwrap_or_else(error), + + ServerMessage::StreamRegisterDeviceChangeCallback(stm_tok, enable) => { + try_stream!(self, stm_tok) + .register_device_changed_callback(if enable { + Some(device_change_cb_c) + } else { + None + }) + .map(|_| ClientMessage::StreamRegisterDeviceChangeCallback) + .unwrap_or_else(error) + } + + ServerMessage::ContextSetupDeviceCollectionCallback => { + if let Ok((ipc_server, ipc_client)) = MessageStream::anonymous_ipc_pair() { + debug!( + "Created device collection RPC pair: {:?}-{:?}", + ipc_server, ipc_client + ); + + // This code is currently running on the Client/Server RPC + // handling thread. We need to move the registration of the + // bind_client to the callback RPC handling thread. This is + // done by spawning a future on `handle`. + let (tx, rx) = oneshot::channel(); + self.callback_thread + .spawn(futures::future::lazy(move || { + let handle = reactor::Handle::default(); + let stream = ipc_server.into_tokio_ipc(&handle).unwrap(); + let transport = framed(stream, Default::default()); + let rpc = rpc::bind_client::<DeviceCollectionClient>(transport); + drop(tx.send(rpc)); + Ok(()) + })) + .expect("Failed to spawn DeviceCollectionClient"); + + if let Ok(rpc) = rx.wait() { + self.cbs = Some(Rc::new(RefCell::new(CubebServerCallbacks { + rpc, + devtype: cubeb::DeviceType::empty(), + }))); + let fd = RegisterDeviceCollectionChanged { + platform_handle: SerializableHandle::new( + PlatformHandle::from(ipc_client), + self.remote_pid.unwrap(), + ), + }; + + ClientMessage::ContextSetupDeviceCollectionCallback(fd) + } else { + warn!("Failed to setup RPC client"); + error(cubeb::Error::error()) + } + } else { + warn!("Failed to create RPC pair"); + error(cubeb::Error::error()) + } + } + + ServerMessage::ContextRegisterDeviceCollectionChanged(device_type, enable) => self + .process_register_device_collection_changed( + context, + manager, + cubeb::DeviceType::from_bits_truncate(device_type), + enable, + ) + .unwrap_or_else(error), + + #[cfg(target_os = "linux")] + ServerMessage::PromoteThreadToRealTime(thread_info) => { + let info = RtPriorityThreadInfo::deserialize(thread_info); + match promote_thread_to_real_time(info, 0, 48000) { + Ok(_) => { + info!("Promotion of content process thread to real-time OK"); + } + Err(_) => { + warn!("Promotion of content process thread to real-time error"); + } + } + ClientMessage::ThreadPromoted + } + }; + + trace!("process_msg: req={:?}, resp={:?}", msg, resp); + + resp + } + + fn process_register_device_collection_changed( + &mut self, + context: &cubeb::Context, + manager: &mut CubebDeviceCollectionManager, + devtype: cubeb::DeviceType, + enable: bool, + ) -> cubeb::Result<ClientMessage> { + if devtype == cubeb::DeviceType::UNKNOWN { + return Err(cubeb::Error::invalid_parameter()); + } + + assert!(self.cbs.is_some()); + let cbs = self.cbs.as_ref().unwrap(); + + if enable { + manager.register(context, cbs, devtype) + } else { + manager.unregister(context, cbs, devtype) + } + .map(|_| ClientMessage::ContextRegisteredDeviceCollectionChanged) + } + + // Stream create is special, so it's been separated from process_msg. + fn process_stream_create(&mut self, params: &StreamCreateParams) -> Result<ClientMessage> { + fn frame_size_in_bytes(params: Option<&StreamParams>) -> u16 { + params + .map(|p| { + let format = p.format.into(); + let sample_size = match format { + cubeb::SampleFormat::S16LE + | cubeb::SampleFormat::S16BE + | cubeb::SampleFormat::S16NE => 2, + cubeb::SampleFormat::Float32LE + | cubeb::SampleFormat::Float32BE + | cubeb::SampleFormat::Float32NE => 4, + }; + let channel_count = p.channels as u16; + sample_size * channel_count + }) + .unwrap_or(0u16) + } + + // Create the callback handling struct which is attached the cubeb stream. + let input_frame_size = frame_size_in_bytes(params.input_stream_params.as_ref()); + let output_frame_size = frame_size_in_bytes(params.output_stream_params.as_ref()); + + let (ipc_server, ipc_client) = MessageStream::anonymous_ipc_pair()?; + debug!("Created callback pair: {:?}-{:?}", ipc_server, ipc_client); + + // Estimate a safe shmem size for this stream configuration. If the server was configured with a fixed + // shm_area_size override, use that instead. + // TODO: Add a new cubeb API to query the precise buffer size required for a given stream config. + // https://github.com/mozilla/audioipc-2/issues/124 + let shm_area_size = if self.shm_area_size == 0 { + let frame_size = output_frame_size.max(input_frame_size) as u32; + let in_rate = params.input_stream_params.map(|p| p.rate).unwrap_or(0); + let out_rate = params.output_stream_params.map(|p| p.rate).unwrap_or(0); + let rate = out_rate.max(in_rate); + // 1s of audio, rounded up to the nearest 64kB. + (((rate * frame_size) + 0xffff) & !0xffff) as usize + } else { + self.shm_area_size + }; + debug!("shm_area_size = {}", shm_area_size); + + let shm = SharedMem::new(&get_shm_id(), shm_area_size)?; + + // This code is currently running on the Client/Server RPC + // handling thread. We need to move the registration of the + // bind_client to the callback RPC handling thread. This is + // done by spawning a future on `handle`. + let (tx, rx) = oneshot::channel(); + self.callback_thread + .spawn(futures::future::lazy(move || { + let handle = reactor::Handle::default(); + let stream = ipc_server.into_tokio_ipc(&handle).unwrap(); + let transport = framed(stream, Default::default()); + let rpc = rpc::bind_client::<CallbackClient>(transport); + drop(tx.send(rpc)); + Ok(()) + })) + .expect("Failed to spawn CallbackClient"); + + let rpc = match rx.wait() { + Ok(rpc) => rpc, + Err(_) => bail!("Failed to create callback rpc."), + }; + + let shm_handle = unsafe { shm.make_handle().unwrap() }; + let shm_setup = Some(rpc.call(CallbackReq::SharedMem( + SerializableHandle::new(shm_handle, self.remote_pid.unwrap()), + shm_area_size, + ))); + + let cbs = Box::new(ServerStreamCallbacks { + input_frame_size, + output_frame_size, + shm, + rpc, + }); + + let entry = self.streams.vacant_entry(); + let key = entry.key(); + debug!("Registering stream {:?}", key); + + entry.insert(ServerStream { + stream: None, + shm_setup, + cbs, + }); + + Ok(ClientMessage::StreamCreated(StreamCreate { + token: key, + platform_handle: SerializableHandle::new( + PlatformHandle::from(ipc_client), + self.remote_pid.unwrap(), + ), + })) + } + + // Stream init is special, so it's been separated from process_msg. + fn process_stream_init( + &mut self, + context: &cubeb::Context, + stm_tok: usize, + params: &StreamInitParams, + ) -> Result<ClientMessage> { + // Create cubeb stream from params + let stream_name = params + .stream_name + .as_ref() + .and_then(|name| CStr::from_bytes_with_nul(name).ok()); + + // Map IPC handle back to cubeb_devid. + let input_device = self.devidmap.handle_to_id(params.input_device) as *const _; + let input_stream_params = params.input_stream_params.as_ref().map(|isp| unsafe { + cubeb::StreamParamsRef::from_ptr(isp as *const StreamParams as *mut _) + }); + + // Map IPC handle back to cubeb_devid. + let output_device = self.devidmap.handle_to_id(params.output_device) as *const _; + let output_stream_params = params.output_stream_params.as_ref().map(|osp| unsafe { + cubeb::StreamParamsRef::from_ptr(osp as *const StreamParams as *mut _) + }); + + let latency = params.latency_frames; + + let server_stream = &mut self.streams[stm_tok]; + assert!(size_of::<Box<ServerStreamCallbacks>>() == size_of::<usize>()); + let user_ptr = server_stream.cbs.as_ref() as *const ServerStreamCallbacks as *mut c_void; + + // SharedMem setup message should've been processed by client by now. + match server_stream.shm_setup.take().wait() { + Ok(Some(CallbackResp::SharedMem)) => {} + Ok(Some(CallbackResp::Error(e))) => { + // If the client replied with an error (e.g. client OOM), log error and fail stream init. + debug!( + "Shmem setup for stream {:?} failed (raw error {:?})", + stm_tok, e + ); + return Ok(ClientMessage::Error(e)); + } + Ok(r) => { + debug!( + "Shmem setup for stream {:?} failed (unexpected response {:?})", + stm_tok, r + ); + return Ok(error(cubeb::Error::error())); + } + Err(e) => { + // If the client errored before responding, log error and fail stream init. + debug!( + "Shmem setup for stream {:?} failed (error {:?})", + stm_tok, e + ); + return Err(e.into()); + } + } + + let stream = unsafe { + let stream = context.stream_init( + stream_name, + input_device, + input_stream_params, + output_device, + output_stream_params, + latency, + Some(data_cb_c), + Some(state_cb_c), + user_ptr, + ); + match stream { + Ok(stream) => stream, + Err(e) => { + debug!("Unregistering stream {:?} (stream error {:?})", stm_tok, e); + self.streams.remove(stm_tok); + return Err(e.into()); + } + } + }; + + server_stream.stream = Some(stream); + + Ok(ClientMessage::StreamInitialized) + } +} + +// C callable callbacks +unsafe extern "C" fn data_cb_c( + _: *mut ffi::cubeb_stream, + user_ptr: *mut c_void, + input_buffer: *const c_void, + output_buffer: *mut c_void, + nframes: c_long, +) -> c_long { + let ok = panic::catch_unwind(|| { + let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks); + let input = if input_buffer.is_null() { + &[] + } else { + let nbytes = nframes * c_long::from(cbs.input_frame_size); + slice::from_raw_parts(input_buffer as *const u8, nbytes as usize) + }; + let output: &mut [u8] = if output_buffer.is_null() { + &mut [] + } else { + let nbytes = nframes * c_long::from(cbs.output_frame_size); + slice::from_raw_parts_mut(output_buffer as *mut u8, nbytes as usize) + }; + cbs.data_callback(input, output, nframes as isize) as c_long + }); + // TODO: Return a CUBEB_ERROR result here once + // https://github.com/kinetiknz/cubeb/issues/553 is fixed. + ok.unwrap_or(0) +} + +unsafe extern "C" fn state_cb_c( + _: *mut ffi::cubeb_stream, + user_ptr: *mut c_void, + state: ffi::cubeb_state, +) { + let ok = panic::catch_unwind(|| { + let state = cubeb::State::from(state); + let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks); + cbs.state_callback(state); + }); + ok.expect("State callback panicked"); +} + +unsafe extern "C" fn device_change_cb_c(user_ptr: *mut c_void) { + let ok = panic::catch_unwind(|| { + let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks); + cbs.device_change_callback(); + }); + ok.expect("Device change callback panicked"); +} + +unsafe extern "C" fn device_collection_changed_input_cb_c( + _: *mut ffi::cubeb, + user_ptr: *mut c_void, +) { + let ok = panic::catch_unwind(|| { + let manager = &mut *(user_ptr as *mut CubebDeviceCollectionManager); + manager.device_collection_changed_callback(ffi::CUBEB_DEVICE_TYPE_INPUT); + }); + ok.expect("Collection changed (input) callback panicked"); +} + +unsafe extern "C" fn device_collection_changed_output_cb_c( + _: *mut ffi::cubeb, + user_ptr: *mut c_void, +) { + let ok = panic::catch_unwind(|| { + let manager = &mut *(user_ptr as *mut CubebDeviceCollectionManager); + manager.device_collection_changed_callback(ffi::CUBEB_DEVICE_TYPE_OUTPUT); + }); + ok.expect("Collection changed (output) callback panicked"); +} |