summaryrefslogtreecommitdiffstats
path: root/third_party/rust/audioipc-server/src
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/audioipc-server/src')
-rw-r--r--third_party/rust/audioipc-server/src/lib.rs208
-rw-r--r--third_party/rust/audioipc-server/src/server.rs904
2 files changed, 1112 insertions, 0 deletions
diff --git a/third_party/rust/audioipc-server/src/lib.rs b/third_party/rust/audioipc-server/src/lib.rs
new file mode 100644
index 0000000000..a6ef02f87a
--- /dev/null
+++ b/third_party/rust/audioipc-server/src/lib.rs
@@ -0,0 +1,208 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details
+#![warn(unused_extern_crates)]
+
+#[macro_use]
+extern crate error_chain;
+#[macro_use]
+extern crate log;
+
+use audio_thread_priority::promote_current_thread_to_real_time;
+use audioipc::core;
+use audioipc::framing::framed;
+use audioipc::rpc;
+use audioipc::{MessageStream, PlatformHandle, PlatformHandleType};
+use futures::sync::oneshot;
+use futures::Future;
+use once_cell::sync::Lazy;
+use std::ffi::{CStr, CString};
+use std::os::raw::c_void;
+use std::ptr;
+use std::sync::Mutex;
+use tokio::reactor;
+
+mod server;
+
+struct CubebContextParams {
+ context_name: CString,
+ backend_name: Option<CString>,
+}
+
+static G_CUBEB_CONTEXT_PARAMS: Lazy<Mutex<CubebContextParams>> = Lazy::new(|| {
+ Mutex::new(CubebContextParams {
+ context_name: CString::new("AudioIPC Server").unwrap(),
+ backend_name: None,
+ })
+});
+
+#[allow(deprecated)]
+pub mod errors {
+ #![allow(clippy::upper_case_acronyms)]
+ error_chain! {
+ links {
+ AudioIPC(::audioipc::errors::Error, ::audioipc::errors::ErrorKind);
+ }
+ foreign_links {
+ Cubeb(cubeb_core::Error);
+ Io(::std::io::Error);
+ Canceled(::futures::sync::oneshot::Canceled);
+ }
+ }
+}
+
+use crate::errors::*;
+
+struct ServerWrapper {
+ core_thread: core::CoreThread,
+ callback_thread: core::CoreThread,
+}
+
+fn register_thread(callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>) {
+ if let Some(func) = callback {
+ let thr = std::thread::current();
+ let name = CString::new(thr.name().unwrap()).unwrap();
+ func(name.as_ptr());
+ }
+}
+
+fn unregister_thread(callback: Option<extern "C" fn()>) {
+ if let Some(func) = callback {
+ func();
+ }
+}
+
+fn init_threads(
+ thread_create_callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>,
+ thread_destroy_callback: Option<extern "C" fn()>,
+) -> Result<ServerWrapper> {
+ trace!("Starting up cubeb audio server event loop thread...");
+
+ let callback_thread = core::spawn_thread(
+ "AudioIPC Callback RPC",
+ move || {
+ match promote_current_thread_to_real_time(0, 48000) {
+ Ok(_) => {}
+ Err(_) => {
+ debug!("Failed to promote audio callback thread to real-time.");
+ }
+ }
+ register_thread(thread_create_callback);
+ trace!("Starting up cubeb audio callback event loop thread...");
+ Ok(())
+ },
+ move || {
+ unregister_thread(thread_destroy_callback);
+ },
+ )
+ .map_err(|e| {
+ debug!(
+ "Failed to start cubeb audio callback event loop thread: {:?}",
+ e
+ );
+ e
+ })?;
+
+ let core_thread = core::spawn_thread(
+ "AudioIPC Server RPC",
+ move || {
+ register_thread(thread_create_callback);
+ audioipc::server_platform_init();
+ Ok(())
+ },
+ move || {
+ unregister_thread(thread_destroy_callback);
+ },
+ )
+ .map_err(|e| {
+ debug!("Failed to cubeb audio core event loop thread: {:?}", e);
+ e
+ })?;
+
+ Ok(ServerWrapper {
+ core_thread,
+ callback_thread,
+ })
+}
+
+#[repr(C)]
+#[derive(Clone, Copy, Debug)]
+pub struct AudioIpcServerInitParams {
+ // Fields only need to be public for ipctest.
+ pub thread_create_callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>,
+ pub thread_destroy_callback: Option<extern "C" fn()>,
+}
+
+#[allow(clippy::missing_safety_doc)]
+#[no_mangle]
+pub unsafe extern "C" fn audioipc_server_start(
+ context_name: *const std::os::raw::c_char,
+ backend_name: *const std::os::raw::c_char,
+ init_params: *const AudioIpcServerInitParams,
+) -> *mut c_void {
+ assert!(!init_params.is_null());
+ let mut params = G_CUBEB_CONTEXT_PARAMS.lock().unwrap();
+ if !context_name.is_null() {
+ params.context_name = CStr::from_ptr(context_name).to_owned();
+ }
+ if !backend_name.is_null() {
+ let backend_string = CStr::from_ptr(backend_name).to_owned();
+ params.backend_name = Some(backend_string);
+ }
+ match init_threads(
+ (*init_params).thread_create_callback,
+ (*init_params).thread_destroy_callback,
+ ) {
+ Ok(server) => Box::into_raw(Box::new(server)) as *mut _,
+ Err(_) => ptr::null_mut() as *mut _,
+ }
+}
+
+// A `shm_area_size` of 0 allows the server to calculate an appropriate shm size for each stream.
+// A non-zero `shm_area_size` forces all allocations to the specified size.
+#[no_mangle]
+pub extern "C" fn audioipc_server_new_client(
+ p: *mut c_void,
+ shm_area_size: usize,
+) -> PlatformHandleType {
+ let (wait_tx, wait_rx) = oneshot::channel();
+ let wrapper: &ServerWrapper = unsafe { &*(p as *mut _) };
+
+ let callback_thread_handle = wrapper.callback_thread.handle();
+
+ // We create a connected pair of anonymous IPC endpoints. One side
+ // is registered with the reactor core, the other side is returned
+ // to the caller.
+ MessageStream::anonymous_ipc_pair()
+ .map(|(ipc_server, ipc_client)| {
+ // Spawn closure to run on same thread as reactor::Core
+ // via remote handle.
+ wrapper
+ .core_thread
+ .handle()
+ .spawn(futures::future::lazy(move || {
+ trace!("Incoming connection");
+ let handle = reactor::Handle::default();
+ ipc_server.into_tokio_ipc(&handle)
+ .map(|sock| {
+ let transport = framed(sock, Default::default());
+ rpc::bind_server(transport, server::CubebServer::new(callback_thread_handle, shm_area_size));
+ }).map_err(|_| ())
+ // Notify waiting thread that server has been registered.
+ .and_then(|_| wait_tx.send(()))
+ }))
+ .expect("Failed to spawn CubebServer");
+ // Wait for notification that server has been registered
+ // with reactor::Core.
+ let _ = wait_rx.wait();
+ unsafe { PlatformHandle::from(ipc_client).into_raw() }
+ })
+ .unwrap_or(audioipc::INVALID_HANDLE_VALUE)
+}
+
+#[no_mangle]
+pub extern "C" fn audioipc_server_stop(p: *mut c_void) {
+ let wrapper = unsafe { Box::<ServerWrapper>::from_raw(p as *mut _) };
+ drop(wrapper);
+}
diff --git a/third_party/rust/audioipc-server/src/server.rs b/third_party/rust/audioipc-server/src/server.rs
new file mode 100644
index 0000000000..c4b1f62327
--- /dev/null
+++ b/third_party/rust/audioipc-server/src/server.rs
@@ -0,0 +1,904 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details
+
+#[cfg(target_os = "linux")]
+use audio_thread_priority::{promote_thread_to_real_time, RtPriorityThreadInfo};
+use audioipc::framing::{framed, Framed};
+use audioipc::messages::{
+ CallbackReq, CallbackResp, ClientMessage, Device, DeviceCollectionReq, DeviceCollectionResp,
+ DeviceInfo, RegisterDeviceCollectionChanged, ServerMessage, StreamCreate, StreamCreateParams,
+ StreamInitParams, StreamParams,
+};
+use audioipc::rpc;
+use audioipc::shm::SharedMem;
+use audioipc::{codec::LengthDelimitedCodec, messages::SerializableHandle};
+use audioipc::{MessageStream, PlatformHandle};
+use cubeb_core as cubeb;
+use cubeb_core::ffi;
+use futures::future::{self, FutureResult};
+use futures::sync::oneshot;
+use futures::Future;
+use std::convert::From;
+use std::ffi::CStr;
+use std::mem::size_of;
+use std::os::raw::{c_long, c_void};
+use std::rc::Rc;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::{cell::RefCell, sync::Mutex};
+use std::{panic, slice};
+use tokio::reactor;
+use tokio::runtime::current_thread;
+
+use crate::errors::*;
+
+fn error(error: cubeb::Error) -> ClientMessage {
+ ClientMessage::Error(error.raw_code())
+}
+
+struct CubebDeviceCollectionManager {
+ servers: Mutex<Vec<Rc<RefCell<CubebServerCallbacks>>>>,
+}
+
+impl CubebDeviceCollectionManager {
+ fn new() -> CubebDeviceCollectionManager {
+ CubebDeviceCollectionManager {
+ servers: Mutex::new(Vec::new()),
+ }
+ }
+
+ fn register(
+ &mut self,
+ context: &cubeb::Context,
+ server: &Rc<RefCell<CubebServerCallbacks>>,
+ devtype: cubeb::DeviceType,
+ ) -> cubeb::Result<()> {
+ let mut servers = self.servers.lock().unwrap();
+ if servers.is_empty() {
+ self.internal_register(context, true)?;
+ }
+ server.borrow_mut().devtype.insert(devtype);
+ if !servers.iter().any(|s| Rc::ptr_eq(s, server)) {
+ servers.push(server.clone());
+ }
+ Ok(())
+ }
+
+ fn unregister(
+ &mut self,
+ context: &cubeb::Context,
+ server: &Rc<RefCell<CubebServerCallbacks>>,
+ devtype: cubeb::DeviceType,
+ ) -> cubeb::Result<()> {
+ let mut servers = self.servers.lock().unwrap();
+ server.borrow_mut().devtype.remove(devtype);
+ if server.borrow().devtype.is_empty() {
+ servers.retain(|s| !Rc::ptr_eq(s, server));
+ }
+ if servers.is_empty() {
+ self.internal_register(context, false)?;
+ }
+ Ok(())
+ }
+
+ fn internal_register(&self, context: &cubeb::Context, enable: bool) -> cubeb::Result<()> {
+ let user_ptr = if enable {
+ self as *const CubebDeviceCollectionManager as *mut c_void
+ } else {
+ std::ptr::null_mut()
+ };
+ for &(dir, cb) in &[
+ (
+ cubeb::DeviceType::INPUT,
+ device_collection_changed_input_cb_c as _,
+ ),
+ (
+ cubeb::DeviceType::OUTPUT,
+ device_collection_changed_output_cb_c as _,
+ ),
+ ] {
+ unsafe {
+ context.register_device_collection_changed(
+ dir,
+ if enable { Some(cb) } else { None },
+ user_ptr,
+ )?;
+ }
+ }
+ Ok(())
+ }
+
+ // Warning: this is called from an internal cubeb thread, so we must not mutate unprotected shared state.
+ unsafe fn device_collection_changed_callback(&self, device_type: ffi::cubeb_device_type) {
+ let servers = self.servers.lock().unwrap();
+ servers.iter().for_each(|server| {
+ if server
+ .borrow()
+ .devtype
+ .contains(cubeb::DeviceType::from_bits_truncate(device_type))
+ {
+ server
+ .borrow_mut()
+ .device_collection_changed_callback(device_type)
+ }
+ });
+ }
+}
+
+struct DevIdMap {
+ devices: Vec<usize>,
+}
+
+// A cubeb_devid is an opaque type which may be implemented with a stable
+// pointer in a cubeb backend. cubeb_devids received remotely must be
+// validated before use, so DevIdMap provides a simple 1:1 mapping between a
+// cubeb_devid and an IPC-transportable value suitable for use as a unique
+// handle.
+impl DevIdMap {
+ fn new() -> DevIdMap {
+ let mut d = DevIdMap {
+ devices: Vec::with_capacity(32),
+ };
+ // A null cubeb_devid is used for selecting the default device.
+ // Pre-populate the mapping with 0 -> 0 to handle nulls.
+ d.devices.push(0);
+ d
+ }
+
+ // Given a cubeb_devid, return a unique stable value suitable for use
+ // over IPC.
+ fn make_handle(&mut self, devid: usize) -> usize {
+ if let Some(i) = self.devices.iter().position(|&d| d == devid) {
+ return i;
+ }
+ self.devices.push(devid);
+ self.devices.len() - 1
+ }
+
+ // Given a handle produced by `make_handle`, return the associated
+ // cubeb_devid. Invalid handles result in a panic.
+ fn handle_to_id(&self, handle: usize) -> usize {
+ self.devices[handle]
+ }
+}
+
+struct CubebContextState {
+ context: cubeb::Result<cubeb::Context>,
+ manager: CubebDeviceCollectionManager,
+}
+
+thread_local!(static CONTEXT_KEY: RefCell<Option<CubebContextState>> = RefCell::new(None));
+
+fn cubeb_init_from_context_params() -> cubeb::Result<cubeb::Context> {
+ let params = super::G_CUBEB_CONTEXT_PARAMS.lock().unwrap();
+ let context_name = Some(params.context_name.as_c_str());
+ let backend_name = params.backend_name.as_deref();
+ let r = cubeb::Context::init(context_name, backend_name);
+ r.map_err(|e| {
+ info!("cubeb::Context::init failed r={:?}", e);
+ e
+ })
+}
+
+fn with_local_context<T, F>(f: F) -> T
+where
+ F: FnOnce(&cubeb::Result<cubeb::Context>, &mut CubebDeviceCollectionManager) -> T,
+{
+ CONTEXT_KEY.with(|k| {
+ let mut state = k.borrow_mut();
+ if state.is_none() {
+ *state = Some(CubebContextState {
+ context: cubeb_init_from_context_params(),
+ manager: CubebDeviceCollectionManager::new(),
+ });
+ }
+ let CubebContextState { context, manager } = state.as_mut().unwrap();
+ // Always reattempt to initialize cubeb, OS config may have changed.
+ if context.is_err() {
+ *context = cubeb_init_from_context_params();
+ }
+ f(context, manager)
+ })
+}
+
+struct DeviceCollectionClient;
+
+impl rpc::Client for DeviceCollectionClient {
+ type Request = DeviceCollectionReq;
+ type Response = DeviceCollectionResp;
+ type Transport =
+ Framed<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
+}
+
+struct CallbackClient;
+
+impl rpc::Client for CallbackClient {
+ type Request = CallbackReq;
+ type Response = CallbackResp;
+ type Transport =
+ Framed<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
+}
+
+struct ServerStreamCallbacks {
+ /// Size of input frame in bytes
+ input_frame_size: u16,
+ /// Size of output frame in bytes
+ output_frame_size: u16,
+ /// Shared memory buffer for transporting audio data to/from client
+ shm: SharedMem,
+ /// RPC interface to callback server running in client
+ rpc: rpc::ClientProxy<CallbackReq, CallbackResp>,
+}
+
+impl ServerStreamCallbacks {
+ fn data_callback(&mut self, input: &[u8], output: &mut [u8], nframes: isize) -> isize {
+ trace!(
+ "Stream data callback: {} {} {}",
+ nframes,
+ input.len(),
+ output.len()
+ );
+
+ unsafe {
+ if self.input_frame_size != 0 {
+ self.shm
+ .get_mut_slice(input.len())
+ .unwrap()
+ .copy_from_slice(input);
+ }
+ }
+
+ let r = self
+ .rpc
+ .call(CallbackReq::Data {
+ nframes,
+ input_frame_size: self.input_frame_size as usize,
+ output_frame_size: self.output_frame_size as usize,
+ })
+ .wait();
+
+ match r {
+ Ok(CallbackResp::Data(frames)) => {
+ if frames >= 0 {
+ let nbytes = frames as usize * self.output_frame_size as usize;
+ trace!("Reslice output to {}", nbytes);
+ unsafe {
+ if self.output_frame_size != 0 {
+ output[..nbytes].copy_from_slice(self.shm.get_slice(nbytes).unwrap());
+ }
+ }
+ }
+ frames
+ }
+ _ => {
+ debug!("Unexpected message {:?} during data_callback", r);
+ // TODO: Return a CUBEB_ERROR result here once
+ // https://github.com/kinetiknz/cubeb/issues/553 is
+ // fixed.
+ 0
+ }
+ }
+ }
+
+ fn state_callback(&mut self, state: cubeb::State) {
+ trace!("Stream state callback: {:?}", state);
+ let r = self.rpc.call(CallbackReq::State(state.into())).wait();
+ match r {
+ Ok(CallbackResp::State) => {}
+ _ => {
+ debug!("Unexpected message {:?} during state callback", r);
+ }
+ }
+ }
+
+ fn device_change_callback(&mut self) {
+ trace!("Stream device change callback");
+ let r = self.rpc.call(CallbackReq::DeviceChange).wait();
+ match r {
+ Ok(CallbackResp::DeviceChange) => {}
+ _ => {
+ debug!("Unexpected message {:?} during device change callback", r);
+ }
+ }
+ }
+}
+
+static SHM_ID: AtomicUsize = AtomicUsize::new(0);
+
+// Generate a temporary shm_id fragment that is unique to the process. This
+// path is used temporarily to create a shm segment, which is then
+// immediately deleted from the filesystem while retaining handles to the
+// shm to be shared between the server and client.
+fn get_shm_id() -> String {
+ format!(
+ "cubeb-shm-{}-{}",
+ std::process::id(),
+ SHM_ID.fetch_add(1, Ordering::SeqCst)
+ )
+}
+
+struct ServerStream {
+ stream: Option<cubeb::Stream>,
+ cbs: Box<ServerStreamCallbacks>,
+ shm_setup: Option<rpc::Response<CallbackResp>>,
+}
+
+impl Drop for ServerStream {
+ fn drop(&mut self) {
+ // `stream` *must* be dropped before `cbs`.
+ drop(self.stream.take());
+ }
+}
+
+struct CubebServerCallbacks {
+ rpc: rpc::ClientProxy<DeviceCollectionReq, DeviceCollectionResp>,
+ devtype: cubeb::DeviceType,
+}
+
+impl CubebServerCallbacks {
+ fn device_collection_changed_callback(&mut self, device_type: ffi::cubeb_device_type) {
+ // TODO: Assert device_type is in devtype.
+ debug!(
+ "Sending device collection ({:?}) changed event",
+ device_type
+ );
+ let _ = self
+ .rpc
+ .call(DeviceCollectionReq::DeviceChange(device_type))
+ .wait();
+ }
+}
+
+pub struct CubebServer {
+ callback_thread: current_thread::Handle,
+ streams: slab::Slab<ServerStream>,
+ remote_pid: Option<u32>,
+ cbs: Option<Rc<RefCell<CubebServerCallbacks>>>,
+ devidmap: DevIdMap,
+ shm_area_size: usize,
+}
+
+impl rpc::Server for CubebServer {
+ type Request = ServerMessage;
+ type Response = ClientMessage;
+ type Future = FutureResult<Self::Response, ()>;
+ type Transport =
+ Framed<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Response, Self::Request>>;
+
+ fn process(&mut self, req: Self::Request) -> Self::Future {
+ if let ServerMessage::ClientConnect(pid) = req {
+ self.remote_pid = Some(pid);
+ }
+ let resp = with_local_context(|context, manager| match *context {
+ Err(_) => error(cubeb::Error::error()),
+ Ok(ref context) => self.process_msg(context, manager, &req),
+ });
+ future::ok(resp)
+ }
+}
+
+// Debugging for BMO 1594216/1612044.
+macro_rules! try_stream {
+ ($self:expr, $stm_tok:expr) => {
+ if $self.streams.contains($stm_tok) {
+ $self.streams[$stm_tok]
+ .stream
+ .as_mut()
+ .expect("uninitialized stream")
+ } else {
+ error!(
+ "{}:{}:{} - Stream({}): invalid token",
+ file!(),
+ line!(),
+ column!(),
+ $stm_tok
+ );
+ return error(cubeb::Error::invalid_parameter());
+ }
+ };
+}
+
+impl CubebServer {
+ pub fn new(callback_thread_handle: current_thread::Handle, shm_area_size: usize) -> Self {
+ CubebServer {
+ callback_thread: callback_thread_handle,
+ streams: slab::Slab::<ServerStream>::new(),
+ remote_pid: None,
+ cbs: None,
+ devidmap: DevIdMap::new(),
+ shm_area_size,
+ }
+ }
+
+ // Process a request coming from the client.
+ fn process_msg(
+ &mut self,
+ context: &cubeb::Context,
+ manager: &mut CubebDeviceCollectionManager,
+ msg: &ServerMessage,
+ ) -> ClientMessage {
+ let resp: ClientMessage = match *msg {
+ ServerMessage::ClientConnect(_) => {
+ // remote_pid is set before cubeb initialization, just verify here.
+ assert!(self.remote_pid.is_some());
+ ClientMessage::ClientConnected
+ }
+
+ ServerMessage::ClientDisconnect => {
+ // TODO:
+ //self.connection.client_disconnect();
+ ClientMessage::ClientDisconnected
+ }
+
+ ServerMessage::ContextGetBackendId => {
+ ClientMessage::ContextBackendId(context.backend_id().to_string())
+ }
+
+ ServerMessage::ContextGetMaxChannelCount => context
+ .max_channel_count()
+ .map(ClientMessage::ContextMaxChannelCount)
+ .unwrap_or_else(error),
+
+ ServerMessage::ContextGetMinLatency(ref params) => {
+ let format = cubeb::SampleFormat::from(params.format);
+ let layout = cubeb::ChannelLayout::from(params.layout);
+
+ let params = cubeb::StreamParamsBuilder::new()
+ .format(format)
+ .rate(params.rate)
+ .channels(params.channels)
+ .layout(layout)
+ .take();
+
+ context
+ .min_latency(&params)
+ .map(ClientMessage::ContextMinLatency)
+ .unwrap_or_else(error)
+ }
+
+ ServerMessage::ContextGetPreferredSampleRate => context
+ .preferred_sample_rate()
+ .map(ClientMessage::ContextPreferredSampleRate)
+ .unwrap_or_else(error),
+
+ ServerMessage::ContextGetDeviceEnumeration(device_type) => context
+ .enumerate_devices(cubeb::DeviceType::from_bits_truncate(device_type))
+ .map(|devices| {
+ let v: Vec<DeviceInfo> = devices
+ .iter()
+ .map(|i| {
+ let mut tmp: DeviceInfo = i.as_ref().into();
+ // Replace each cubeb_devid with a unique handle suitable for IPC.
+ tmp.devid = self.devidmap.make_handle(tmp.devid);
+ tmp
+ })
+ .collect();
+ ClientMessage::ContextEnumeratedDevices(v)
+ })
+ .unwrap_or_else(error),
+
+ ServerMessage::StreamCreate(ref params) => self
+ .process_stream_create(params)
+ .unwrap_or_else(|_| error(cubeb::Error::error())),
+
+ ServerMessage::StreamInit(stm_tok, ref params) => self
+ .process_stream_init(context, stm_tok, params)
+ .unwrap_or_else(|_| error(cubeb::Error::error())),
+
+ ServerMessage::StreamDestroy(stm_tok) => {
+ if self.streams.contains(stm_tok) {
+ debug!("Unregistering stream {:?}", stm_tok);
+ self.streams.remove(stm_tok);
+ } else {
+ // Debugging for BMO 1594216/1612044.
+ error!("StreamDestroy({}): invalid token", stm_tok);
+ return error(cubeb::Error::invalid_parameter());
+ }
+ ClientMessage::StreamDestroyed
+ }
+
+ ServerMessage::StreamStart(stm_tok) => try_stream!(self, stm_tok)
+ .start()
+ .map(|_| ClientMessage::StreamStarted)
+ .unwrap_or_else(error),
+
+ ServerMessage::StreamStop(stm_tok) => try_stream!(self, stm_tok)
+ .stop()
+ .map(|_| ClientMessage::StreamStopped)
+ .unwrap_or_else(error),
+
+ ServerMessage::StreamGetPosition(stm_tok) => try_stream!(self, stm_tok)
+ .position()
+ .map(ClientMessage::StreamPosition)
+ .unwrap_or_else(error),
+
+ ServerMessage::StreamGetLatency(stm_tok) => try_stream!(self, stm_tok)
+ .latency()
+ .map(ClientMessage::StreamLatency)
+ .unwrap_or_else(error),
+
+ ServerMessage::StreamGetInputLatency(stm_tok) => try_stream!(self, stm_tok)
+ .input_latency()
+ .map(ClientMessage::StreamInputLatency)
+ .unwrap_or_else(error),
+
+ ServerMessage::StreamSetVolume(stm_tok, volume) => try_stream!(self, stm_tok)
+ .set_volume(volume)
+ .map(|_| ClientMessage::StreamVolumeSet)
+ .unwrap_or_else(error),
+
+ ServerMessage::StreamSetName(stm_tok, ref name) => try_stream!(self, stm_tok)
+ .set_name(name)
+ .map(|_| ClientMessage::StreamNameSet)
+ .unwrap_or_else(error),
+
+ ServerMessage::StreamGetCurrentDevice(stm_tok) => try_stream!(self, stm_tok)
+ .current_device()
+ .map(|device| ClientMessage::StreamCurrentDevice(Device::from(device)))
+ .unwrap_or_else(error),
+
+ ServerMessage::StreamRegisterDeviceChangeCallback(stm_tok, enable) => {
+ try_stream!(self, stm_tok)
+ .register_device_changed_callback(if enable {
+ Some(device_change_cb_c)
+ } else {
+ None
+ })
+ .map(|_| ClientMessage::StreamRegisterDeviceChangeCallback)
+ .unwrap_or_else(error)
+ }
+
+ ServerMessage::ContextSetupDeviceCollectionCallback => {
+ if let Ok((ipc_server, ipc_client)) = MessageStream::anonymous_ipc_pair() {
+ debug!(
+ "Created device collection RPC pair: {:?}-{:?}",
+ ipc_server, ipc_client
+ );
+
+ // This code is currently running on the Client/Server RPC
+ // handling thread. We need to move the registration of the
+ // bind_client to the callback RPC handling thread. This is
+ // done by spawning a future on `handle`.
+ let (tx, rx) = oneshot::channel();
+ self.callback_thread
+ .spawn(futures::future::lazy(move || {
+ let handle = reactor::Handle::default();
+ let stream = ipc_server.into_tokio_ipc(&handle).unwrap();
+ let transport = framed(stream, Default::default());
+ let rpc = rpc::bind_client::<DeviceCollectionClient>(transport);
+ drop(tx.send(rpc));
+ Ok(())
+ }))
+ .expect("Failed to spawn DeviceCollectionClient");
+
+ if let Ok(rpc) = rx.wait() {
+ self.cbs = Some(Rc::new(RefCell::new(CubebServerCallbacks {
+ rpc,
+ devtype: cubeb::DeviceType::empty(),
+ })));
+ let fd = RegisterDeviceCollectionChanged {
+ platform_handle: SerializableHandle::new(
+ PlatformHandle::from(ipc_client),
+ self.remote_pid.unwrap(),
+ ),
+ };
+
+ ClientMessage::ContextSetupDeviceCollectionCallback(fd)
+ } else {
+ warn!("Failed to setup RPC client");
+ error(cubeb::Error::error())
+ }
+ } else {
+ warn!("Failed to create RPC pair");
+ error(cubeb::Error::error())
+ }
+ }
+
+ ServerMessage::ContextRegisterDeviceCollectionChanged(device_type, enable) => self
+ .process_register_device_collection_changed(
+ context,
+ manager,
+ cubeb::DeviceType::from_bits_truncate(device_type),
+ enable,
+ )
+ .unwrap_or_else(error),
+
+ #[cfg(target_os = "linux")]
+ ServerMessage::PromoteThreadToRealTime(thread_info) => {
+ let info = RtPriorityThreadInfo::deserialize(thread_info);
+ match promote_thread_to_real_time(info, 0, 48000) {
+ Ok(_) => {
+ info!("Promotion of content process thread to real-time OK");
+ }
+ Err(_) => {
+ warn!("Promotion of content process thread to real-time error");
+ }
+ }
+ ClientMessage::ThreadPromoted
+ }
+ };
+
+ trace!("process_msg: req={:?}, resp={:?}", msg, resp);
+
+ resp
+ }
+
+ fn process_register_device_collection_changed(
+ &mut self,
+ context: &cubeb::Context,
+ manager: &mut CubebDeviceCollectionManager,
+ devtype: cubeb::DeviceType,
+ enable: bool,
+ ) -> cubeb::Result<ClientMessage> {
+ if devtype == cubeb::DeviceType::UNKNOWN {
+ return Err(cubeb::Error::invalid_parameter());
+ }
+
+ assert!(self.cbs.is_some());
+ let cbs = self.cbs.as_ref().unwrap();
+
+ if enable {
+ manager.register(context, cbs, devtype)
+ } else {
+ manager.unregister(context, cbs, devtype)
+ }
+ .map(|_| ClientMessage::ContextRegisteredDeviceCollectionChanged)
+ }
+
+ // Stream create is special, so it's been separated from process_msg.
+ fn process_stream_create(&mut self, params: &StreamCreateParams) -> Result<ClientMessage> {
+ fn frame_size_in_bytes(params: Option<&StreamParams>) -> u16 {
+ params
+ .map(|p| {
+ let format = p.format.into();
+ let sample_size = match format {
+ cubeb::SampleFormat::S16LE
+ | cubeb::SampleFormat::S16BE
+ | cubeb::SampleFormat::S16NE => 2,
+ cubeb::SampleFormat::Float32LE
+ | cubeb::SampleFormat::Float32BE
+ | cubeb::SampleFormat::Float32NE => 4,
+ };
+ let channel_count = p.channels as u16;
+ sample_size * channel_count
+ })
+ .unwrap_or(0u16)
+ }
+
+ // Create the callback handling struct which is attached the cubeb stream.
+ let input_frame_size = frame_size_in_bytes(params.input_stream_params.as_ref());
+ let output_frame_size = frame_size_in_bytes(params.output_stream_params.as_ref());
+
+ let (ipc_server, ipc_client) = MessageStream::anonymous_ipc_pair()?;
+ debug!("Created callback pair: {:?}-{:?}", ipc_server, ipc_client);
+
+ // Estimate a safe shmem size for this stream configuration. If the server was configured with a fixed
+ // shm_area_size override, use that instead.
+ // TODO: Add a new cubeb API to query the precise buffer size required for a given stream config.
+ // https://github.com/mozilla/audioipc-2/issues/124
+ let shm_area_size = if self.shm_area_size == 0 {
+ let frame_size = output_frame_size.max(input_frame_size) as u32;
+ let in_rate = params.input_stream_params.map(|p| p.rate).unwrap_or(0);
+ let out_rate = params.output_stream_params.map(|p| p.rate).unwrap_or(0);
+ let rate = out_rate.max(in_rate);
+ // 1s of audio, rounded up to the nearest 64kB.
+ (((rate * frame_size) + 0xffff) & !0xffff) as usize
+ } else {
+ self.shm_area_size
+ };
+ debug!("shm_area_size = {}", shm_area_size);
+
+ let shm = SharedMem::new(&get_shm_id(), shm_area_size)?;
+
+ // This code is currently running on the Client/Server RPC
+ // handling thread. We need to move the registration of the
+ // bind_client to the callback RPC handling thread. This is
+ // done by spawning a future on `handle`.
+ let (tx, rx) = oneshot::channel();
+ self.callback_thread
+ .spawn(futures::future::lazy(move || {
+ let handle = reactor::Handle::default();
+ let stream = ipc_server.into_tokio_ipc(&handle).unwrap();
+ let transport = framed(stream, Default::default());
+ let rpc = rpc::bind_client::<CallbackClient>(transport);
+ drop(tx.send(rpc));
+ Ok(())
+ }))
+ .expect("Failed to spawn CallbackClient");
+
+ let rpc = match rx.wait() {
+ Ok(rpc) => rpc,
+ Err(_) => bail!("Failed to create callback rpc."),
+ };
+
+ let shm_handle = unsafe { shm.make_handle().unwrap() };
+ let shm_setup = Some(rpc.call(CallbackReq::SharedMem(
+ SerializableHandle::new(shm_handle, self.remote_pid.unwrap()),
+ shm_area_size,
+ )));
+
+ let cbs = Box::new(ServerStreamCallbacks {
+ input_frame_size,
+ output_frame_size,
+ shm,
+ rpc,
+ });
+
+ let entry = self.streams.vacant_entry();
+ let key = entry.key();
+ debug!("Registering stream {:?}", key);
+
+ entry.insert(ServerStream {
+ stream: None,
+ shm_setup,
+ cbs,
+ });
+
+ Ok(ClientMessage::StreamCreated(StreamCreate {
+ token: key,
+ platform_handle: SerializableHandle::new(
+ PlatformHandle::from(ipc_client),
+ self.remote_pid.unwrap(),
+ ),
+ }))
+ }
+
+ // Stream init is special, so it's been separated from process_msg.
+ fn process_stream_init(
+ &mut self,
+ context: &cubeb::Context,
+ stm_tok: usize,
+ params: &StreamInitParams,
+ ) -> Result<ClientMessage> {
+ // Create cubeb stream from params
+ let stream_name = params
+ .stream_name
+ .as_ref()
+ .and_then(|name| CStr::from_bytes_with_nul(name).ok());
+
+ // Map IPC handle back to cubeb_devid.
+ let input_device = self.devidmap.handle_to_id(params.input_device) as *const _;
+ let input_stream_params = params.input_stream_params.as_ref().map(|isp| unsafe {
+ cubeb::StreamParamsRef::from_ptr(isp as *const StreamParams as *mut _)
+ });
+
+ // Map IPC handle back to cubeb_devid.
+ let output_device = self.devidmap.handle_to_id(params.output_device) as *const _;
+ let output_stream_params = params.output_stream_params.as_ref().map(|osp| unsafe {
+ cubeb::StreamParamsRef::from_ptr(osp as *const StreamParams as *mut _)
+ });
+
+ let latency = params.latency_frames;
+
+ let server_stream = &mut self.streams[stm_tok];
+ assert!(size_of::<Box<ServerStreamCallbacks>>() == size_of::<usize>());
+ let user_ptr = server_stream.cbs.as_ref() as *const ServerStreamCallbacks as *mut c_void;
+
+ // SharedMem setup message should've been processed by client by now.
+ match server_stream.shm_setup.take().wait() {
+ Ok(Some(CallbackResp::SharedMem)) => {}
+ Ok(Some(CallbackResp::Error(e))) => {
+ // If the client replied with an error (e.g. client OOM), log error and fail stream init.
+ debug!(
+ "Shmem setup for stream {:?} failed (raw error {:?})",
+ stm_tok, e
+ );
+ return Ok(ClientMessage::Error(e));
+ }
+ Ok(r) => {
+ debug!(
+ "Shmem setup for stream {:?} failed (unexpected response {:?})",
+ stm_tok, r
+ );
+ return Ok(error(cubeb::Error::error()));
+ }
+ Err(e) => {
+ // If the client errored before responding, log error and fail stream init.
+ debug!(
+ "Shmem setup for stream {:?} failed (error {:?})",
+ stm_tok, e
+ );
+ return Err(e.into());
+ }
+ }
+
+ let stream = unsafe {
+ let stream = context.stream_init(
+ stream_name,
+ input_device,
+ input_stream_params,
+ output_device,
+ output_stream_params,
+ latency,
+ Some(data_cb_c),
+ Some(state_cb_c),
+ user_ptr,
+ );
+ match stream {
+ Ok(stream) => stream,
+ Err(e) => {
+ debug!("Unregistering stream {:?} (stream error {:?})", stm_tok, e);
+ self.streams.remove(stm_tok);
+ return Err(e.into());
+ }
+ }
+ };
+
+ server_stream.stream = Some(stream);
+
+ Ok(ClientMessage::StreamInitialized)
+ }
+}
+
+// C callable callbacks
+unsafe extern "C" fn data_cb_c(
+ _: *mut ffi::cubeb_stream,
+ user_ptr: *mut c_void,
+ input_buffer: *const c_void,
+ output_buffer: *mut c_void,
+ nframes: c_long,
+) -> c_long {
+ let ok = panic::catch_unwind(|| {
+ let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks);
+ let input = if input_buffer.is_null() {
+ &[]
+ } else {
+ let nbytes = nframes * c_long::from(cbs.input_frame_size);
+ slice::from_raw_parts(input_buffer as *const u8, nbytes as usize)
+ };
+ let output: &mut [u8] = if output_buffer.is_null() {
+ &mut []
+ } else {
+ let nbytes = nframes * c_long::from(cbs.output_frame_size);
+ slice::from_raw_parts_mut(output_buffer as *mut u8, nbytes as usize)
+ };
+ cbs.data_callback(input, output, nframes as isize) as c_long
+ });
+ // TODO: Return a CUBEB_ERROR result here once
+ // https://github.com/kinetiknz/cubeb/issues/553 is fixed.
+ ok.unwrap_or(0)
+}
+
+unsafe extern "C" fn state_cb_c(
+ _: *mut ffi::cubeb_stream,
+ user_ptr: *mut c_void,
+ state: ffi::cubeb_state,
+) {
+ let ok = panic::catch_unwind(|| {
+ let state = cubeb::State::from(state);
+ let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks);
+ cbs.state_callback(state);
+ });
+ ok.expect("State callback panicked");
+}
+
+unsafe extern "C" fn device_change_cb_c(user_ptr: *mut c_void) {
+ let ok = panic::catch_unwind(|| {
+ let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks);
+ cbs.device_change_callback();
+ });
+ ok.expect("Device change callback panicked");
+}
+
+unsafe extern "C" fn device_collection_changed_input_cb_c(
+ _: *mut ffi::cubeb,
+ user_ptr: *mut c_void,
+) {
+ let ok = panic::catch_unwind(|| {
+ let manager = &mut *(user_ptr as *mut CubebDeviceCollectionManager);
+ manager.device_collection_changed_callback(ffi::CUBEB_DEVICE_TYPE_INPUT);
+ });
+ ok.expect("Collection changed (input) callback panicked");
+}
+
+unsafe extern "C" fn device_collection_changed_output_cb_c(
+ _: *mut ffi::cubeb,
+ user_ptr: *mut c_void,
+) {
+ let ok = panic::catch_unwind(|| {
+ let manager = &mut *(user_ptr as *mut CubebDeviceCollectionManager);
+ manager.device_collection_changed_callback(ffi::CUBEB_DEVICE_TYPE_OUTPUT);
+ });
+ ok.expect("Collection changed (output) callback panicked");
+}