summaryrefslogtreecommitdiffstats
path: root/media/audioipc/client
diff options
context:
space:
mode:
Diffstat (limited to 'media/audioipc/client')
-rw-r--r--media/audioipc/client/Cargo.toml18
-rw-r--r--media/audioipc/client/cbindgen.toml28
-rw-r--r--media/audioipc/client/src/context.rs451
-rw-r--r--media/audioipc/client/src/lib.rs84
-rw-r--r--media/audioipc/client/src/send_recv.rs73
-rw-r--r--media/audioipc/client/src/stream.rs357
6 files changed, 1011 insertions, 0 deletions
diff --git a/media/audioipc/client/Cargo.toml b/media/audioipc/client/Cargo.toml
new file mode 100644
index 0000000000..eff18b1a02
--- /dev/null
+++ b/media/audioipc/client/Cargo.toml
@@ -0,0 +1,18 @@
+[package]
+name = "audioipc-client"
+version = "0.4.0"
+authors = [
+ "Matthew Gregan <kinetik@flim.org>",
+ "Dan Glastonbury <dan.glastonbury@gmail.com>"
+ ]
+description = "Cubeb Backend for talking to remote cubeb server."
+edition = "2018"
+
+[dependencies]
+audio_thread_priority = "0.23.4"
+audioipc = { path="../audioipc" }
+cubeb-backend = "0.8"
+futures = { version="0.1.18", default-features=false, features=["use_std"] }
+futures-cpupool = { version="0.1.8", default-features=false }
+log = "0.4"
+tokio = "0.1"
diff --git a/media/audioipc/client/cbindgen.toml b/media/audioipc/client/cbindgen.toml
new file mode 100644
index 0000000000..9a7c204fe4
--- /dev/null
+++ b/media/audioipc/client/cbindgen.toml
@@ -0,0 +1,28 @@
+header = """/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */"""
+autogen_warning = """/* DO NOT MODIFY THIS MANUALLY! This file was generated using cbindgen. */"""
+include_version = true
+braces = "SameLine"
+line_length = 100
+tab_width = 2
+language = "C++"
+namespaces = ["mozilla", "audioipc"]
+
+[export]
+item_types = ["globals", "enums", "structs", "unions", "typedefs", "opaque", "functions", "constants"]
+
+[parse]
+parse_deps = true
+include = ['audioipc']
+
+[fn]
+args = "Vertical"
+rename_args = "GeckoCase"
+
+[struct]
+rename_fields = "GeckoCase"
+
+[defines]
+"windows" = "XP_WIN"
+"unix" = "XP_UNIX"
diff --git a/media/audioipc/client/src/context.rs b/media/audioipc/client/src/context.rs
new file mode 100644
index 0000000000..7441d0c0e0
--- /dev/null
+++ b/media/audioipc/client/src/context.rs
@@ -0,0 +1,451 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details
+
+use crate::stream;
+use crate::{assert_not_in_callback, run_in_callback};
+use crate::{ClientStream, AUDIOIPC_INIT_PARAMS};
+#[cfg(target_os = "linux")]
+use audio_thread_priority::get_current_thread_info;
+#[cfg(not(target_os = "linux"))]
+use audio_thread_priority::promote_current_thread_to_real_time;
+use audioipc::codec::LengthDelimitedCodec;
+use audioipc::frame::{framed, Framed};
+use audioipc::platformhandle_passing::{framed_with_platformhandles, FramedWithPlatformHandles};
+use audioipc::{core, rpc};
+use audioipc::{
+ messages, messages::DeviceCollectionReq, messages::DeviceCollectionResp, ClientMessage,
+ ServerMessage,
+};
+use cubeb_backend::{
+ ffi, Context, ContextOps, DeviceCollectionRef, DeviceId, DeviceType, Error, Ops, Result,
+ Stream, StreamParams, StreamParamsRef,
+};
+use futures::Future;
+use futures_cpupool::{CpuFuture, CpuPool};
+use std::ffi::{CStr, CString};
+use std::os::raw::c_void;
+use std::sync::mpsc;
+use std::sync::{Arc, Mutex};
+use std::thread;
+use std::{fmt, io, mem, ptr};
+use tokio::reactor;
+use tokio::runtime::current_thread;
+
+struct CubebClient;
+
+impl rpc::Client for CubebClient {
+ type Request = ServerMessage;
+ type Response = ClientMessage;
+ type Transport = FramedWithPlatformHandles<
+ audioipc::AsyncMessageStream,
+ LengthDelimitedCodec<Self::Request, Self::Response>,
+ >;
+}
+
+pub const CLIENT_OPS: Ops = capi_new!(ClientContext, ClientStream);
+
+// ClientContext's layout *must* match cubeb.c's `struct cubeb` for the
+// common fields.
+#[repr(C)]
+pub struct ClientContext {
+ _ops: *const Ops,
+ rpc: rpc::ClientProxy<ServerMessage, ClientMessage>,
+ core: core::CoreThread,
+ cpu_pool: CpuPool,
+ backend_id: CString,
+ device_collection_rpc: bool,
+ input_device_callback: Arc<Mutex<DeviceCollectionCallback>>,
+ output_device_callback: Arc<Mutex<DeviceCollectionCallback>>,
+}
+
+impl ClientContext {
+ #[doc(hidden)]
+ pub fn handle(&self) -> current_thread::Handle {
+ self.core.handle()
+ }
+
+ #[doc(hidden)]
+ pub fn rpc(&self) -> rpc::ClientProxy<ServerMessage, ClientMessage> {
+ self.rpc.clone()
+ }
+
+ #[doc(hidden)]
+ pub fn cpu_pool(&self) -> CpuPool {
+ self.cpu_pool.clone()
+ }
+}
+
+#[cfg(target_os = "linux")]
+fn promote_thread(rpc: &rpc::ClientProxy<ServerMessage, ClientMessage>) {
+ match get_current_thread_info() {
+ Ok(info) => {
+ let bytes = info.serialize();
+ // Don't wait for the response, this is on the callback thread, which must not block.
+ rpc.call(ServerMessage::PromoteThreadToRealTime(bytes));
+ }
+ Err(_) => {
+ warn!("Could not remotely promote thread to RT.");
+ }
+ }
+}
+
+#[cfg(not(target_os = "linux"))]
+fn promote_thread(_rpc: &rpc::ClientProxy<ServerMessage, ClientMessage>) {
+ match promote_current_thread_to_real_time(0, 48000) {
+ Ok(_) => {
+ info!("Audio thread promoted to real-time.");
+ }
+ Err(_) => {
+ warn!("Could not promote thread to real-time.");
+ }
+ }
+}
+
+fn register_thread(callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>) {
+ if let Some(func) = callback {
+ let thr = thread::current();
+ let name = CString::new(thr.name().unwrap()).unwrap();
+ func(name.as_ptr());
+ }
+}
+
+fn unregister_thread(callback: Option<extern "C" fn()>) {
+ if let Some(func) = callback {
+ func();
+ }
+}
+
+fn promote_and_register_thread(
+ rpc: &rpc::ClientProxy<ServerMessage, ClientMessage>,
+ callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>,
+) {
+ promote_thread(rpc);
+ register_thread(callback);
+}
+
+#[derive(Default)]
+struct DeviceCollectionCallback {
+ cb: ffi::cubeb_device_collection_changed_callback,
+ user_ptr: usize,
+}
+
+struct DeviceCollectionServer {
+ input_device_callback: Arc<Mutex<DeviceCollectionCallback>>,
+ output_device_callback: Arc<Mutex<DeviceCollectionCallback>>,
+ cpu_pool: CpuPool,
+}
+
+impl rpc::Server for DeviceCollectionServer {
+ type Request = DeviceCollectionReq;
+ type Response = DeviceCollectionResp;
+ type Future = CpuFuture<Self::Response, ()>;
+ type Transport =
+ Framed<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Response, Self::Request>>;
+
+ fn process(&mut self, req: Self::Request) -> Self::Future {
+ match req {
+ DeviceCollectionReq::DeviceChange(device_type) => {
+ trace!(
+ "ctx_thread: DeviceChange Callback: device_type={}",
+ device_type
+ );
+
+ let devtype = cubeb_backend::DeviceType::from_bits_truncate(device_type);
+
+ let (input_cb, input_user_ptr) = {
+ let dcb = self.input_device_callback.lock().unwrap();
+ (dcb.cb, dcb.user_ptr)
+ };
+ let (output_cb, output_user_ptr) = {
+ let dcb = self.output_device_callback.lock().unwrap();
+ (dcb.cb, dcb.user_ptr)
+ };
+
+ self.cpu_pool.spawn_fn(move || {
+ run_in_callback(|| {
+ if devtype.contains(cubeb_backend::DeviceType::INPUT) {
+ unsafe {
+ input_cb.unwrap()(ptr::null_mut(), input_user_ptr as *mut c_void)
+ }
+ }
+ if devtype.contains(cubeb_backend::DeviceType::OUTPUT) {
+ unsafe {
+ output_cb.unwrap()(ptr::null_mut(), output_user_ptr as *mut c_void)
+ }
+ }
+ });
+
+ Ok(DeviceCollectionResp::DeviceChange)
+ })
+ }
+ }
+ }
+}
+
+impl ContextOps for ClientContext {
+ fn init(_context_name: Option<&CStr>) -> Result<Context> {
+ fn bind_and_send_client(
+ stream: audioipc::AsyncMessageStream,
+ tx_rpc: &mpsc::Sender<rpc::ClientProxy<ServerMessage, ClientMessage>>,
+ ) -> io::Result<()> {
+ let transport = framed_with_platformhandles(stream, Default::default());
+ let rpc = rpc::bind_client::<CubebClient>(transport);
+ // If send fails then the rx end has closed
+ // which is unlikely here.
+ let _ = tx_rpc.send(rpc);
+ Ok(())
+ }
+
+ assert_not_in_callback();
+
+ let (tx_rpc, rx_rpc) = mpsc::channel();
+
+ let params = AUDIOIPC_INIT_PARAMS.with(|p| p.replace(None).unwrap());
+ let thread_create_callback = params.thread_create_callback;
+ let thread_destroy_callback = params.thread_destroy_callback;
+
+ let server_stream =
+ unsafe { audioipc::MessageStream::from_raw_fd(params.server_connection) };
+
+ let core = core::spawn_thread(
+ "AudioIPC Client RPC",
+ move || {
+ let handle = reactor::Handle::default();
+
+ register_thread(thread_create_callback);
+
+ server_stream
+ .into_tokio_ipc(&handle)
+ .and_then(|stream| bind_and_send_client(stream, &tx_rpc))
+ },
+ move || unregister_thread(thread_destroy_callback),
+ )
+ .map_err(|_| Error::default())?;
+
+ let rpc = rx_rpc.recv().map_err(|_| Error::default())?;
+ let rpc2 = rpc.clone();
+
+ // Don't let errors bubble from here. Later calls against this context
+ // will return errors the caller expects to handle.
+ let _ = send_recv!(rpc, ClientConnect(std::process::id()) => ClientConnected);
+
+ let backend_id = send_recv!(rpc, ContextGetBackendId => ContextBackendId())
+ .unwrap_or_else(|_| "(remote error)".to_string());
+ let backend_id = CString::new(backend_id).expect("backend_id query failed");
+
+ let cpu_pool = futures_cpupool::Builder::new()
+ .name_prefix("AudioIPC")
+ .after_start(move || promote_and_register_thread(&rpc2, thread_create_callback))
+ .before_stop(move || unregister_thread(thread_destroy_callback))
+ .pool_size(params.pool_size)
+ .stack_size(params.stack_size)
+ .create();
+
+ let ctx = Box::new(ClientContext {
+ _ops: &CLIENT_OPS as *const _,
+ rpc,
+ core,
+ cpu_pool,
+ backend_id,
+ device_collection_rpc: false,
+ input_device_callback: Arc::new(Mutex::new(Default::default())),
+ output_device_callback: Arc::new(Mutex::new(Default::default())),
+ });
+ Ok(unsafe { Context::from_ptr(Box::into_raw(ctx) as *mut _) })
+ }
+
+ fn backend_id(&mut self) -> &CStr {
+ assert_not_in_callback();
+ self.backend_id.as_c_str()
+ }
+
+ fn max_channel_count(&mut self) -> Result<u32> {
+ assert_not_in_callback();
+ send_recv!(self.rpc(), ContextGetMaxChannelCount => ContextMaxChannelCount())
+ }
+
+ fn min_latency(&mut self, params: StreamParams) -> Result<u32> {
+ assert_not_in_callback();
+ let params = messages::StreamParams::from(params.as_ref());
+ send_recv!(self.rpc(), ContextGetMinLatency(params) => ContextMinLatency())
+ }
+
+ fn preferred_sample_rate(&mut self) -> Result<u32> {
+ assert_not_in_callback();
+ send_recv!(self.rpc(), ContextGetPreferredSampleRate => ContextPreferredSampleRate())
+ }
+
+ fn enumerate_devices(
+ &mut self,
+ devtype: DeviceType,
+ collection: &DeviceCollectionRef,
+ ) -> Result<()> {
+ assert_not_in_callback();
+ let v: Vec<ffi::cubeb_device_info> = match send_recv!(self.rpc(),
+ ContextGetDeviceEnumeration(devtype.bits()) =>
+ ContextEnumeratedDevices())
+ {
+ Ok(mut v) => v.drain(..).map(|i| i.into()).collect(),
+ Err(e) => return Err(e),
+ };
+ let mut vs = v.into_boxed_slice();
+ let coll = unsafe { &mut *collection.as_ptr() };
+ coll.device = vs.as_mut_ptr();
+ coll.count = vs.len();
+ // Giving away the memory owned by vs. Don't free it!
+ // Reclaimed in `device_collection_destroy`.
+ mem::forget(vs);
+ Ok(())
+ }
+
+ fn device_collection_destroy(&mut self, collection: &mut DeviceCollectionRef) -> Result<()> {
+ assert_not_in_callback();
+ unsafe {
+ let coll = &mut *collection.as_ptr();
+ let mut devices = Vec::from_raw_parts(
+ coll.device as *mut ffi::cubeb_device_info,
+ coll.count,
+ coll.count,
+ );
+ for dev in &mut devices {
+ if !dev.device_id.is_null() {
+ let _ = CString::from_raw(dev.device_id as *mut _);
+ }
+ if !dev.group_id.is_null() {
+ let _ = CString::from_raw(dev.group_id as *mut _);
+ }
+ if !dev.vendor_name.is_null() {
+ let _ = CString::from_raw(dev.vendor_name as *mut _);
+ }
+ if !dev.friendly_name.is_null() {
+ let _ = CString::from_raw(dev.friendly_name as *mut _);
+ }
+ }
+ coll.device = ptr::null_mut();
+ coll.count = 0;
+ Ok(())
+ }
+ }
+
+ fn stream_init(
+ &mut self,
+ stream_name: Option<&CStr>,
+ input_device: DeviceId,
+ input_stream_params: Option<&StreamParamsRef>,
+ output_device: DeviceId,
+ output_stream_params: Option<&StreamParamsRef>,
+ latency_frames: u32,
+ // These params aren't sent to the server
+ data_callback: ffi::cubeb_data_callback,
+ state_callback: ffi::cubeb_state_callback,
+ user_ptr: *mut c_void,
+ ) -> Result<Stream> {
+ assert_not_in_callback();
+
+ fn opt_stream_params(p: Option<&StreamParamsRef>) -> Option<messages::StreamParams> {
+ match p {
+ Some(p) => Some(messages::StreamParams::from(p)),
+ None => None,
+ }
+ }
+
+ let stream_name = match stream_name {
+ Some(s) => Some(s.to_bytes_with_nul().to_vec()),
+ None => None,
+ };
+
+ let input_stream_params = opt_stream_params(input_stream_params);
+ let output_stream_params = opt_stream_params(output_stream_params);
+
+ let init_params = messages::StreamInitParams {
+ stream_name,
+ input_device: input_device as usize,
+ input_stream_params,
+ output_device: output_device as usize,
+ output_stream_params,
+ latency_frames,
+ };
+ stream::init(self, init_params, data_callback, state_callback, user_ptr)
+ }
+
+ fn register_device_collection_changed(
+ &mut self,
+ devtype: DeviceType,
+ collection_changed_callback: ffi::cubeb_device_collection_changed_callback,
+ user_ptr: *mut c_void,
+ ) -> Result<()> {
+ assert_not_in_callback();
+
+ if !self.device_collection_rpc {
+ let fds = send_recv!(self.rpc(),
+ ContextSetupDeviceCollectionCallback =>
+ ContextSetupDeviceCollectionCallback())?;
+
+ let stream =
+ unsafe { audioipc::MessageStream::from_raw_fd(fds.platform_handles[0].into_raw()) };
+
+ // TODO: The lowest comms layer expects exactly 3 PlatformHandles, but we only
+ // need one here. Drop the dummy handles the other side sent us to discard.
+ unsafe {
+ fds.platform_handles[1].into_file();
+ fds.platform_handles[2].into_file();
+ }
+
+ let server = DeviceCollectionServer {
+ input_device_callback: self.input_device_callback.clone(),
+ output_device_callback: self.output_device_callback.clone(),
+ cpu_pool: self.cpu_pool(),
+ };
+
+ let (wait_tx, wait_rx) = mpsc::channel();
+ self.handle()
+ .spawn(futures::future::lazy(move || {
+ let handle = reactor::Handle::default();
+ let stream = stream.into_tokio_ipc(&handle).unwrap();
+ let transport = framed(stream, Default::default());
+ rpc::bind_server(transport, server);
+ wait_tx.send(()).unwrap();
+ Ok(())
+ }))
+ .expect("Failed to spawn DeviceCollectionServer");
+ wait_rx.recv().unwrap();
+ self.device_collection_rpc = true;
+ }
+
+ if devtype.contains(cubeb_backend::DeviceType::INPUT) {
+ let mut cb = self.input_device_callback.lock().unwrap();
+ cb.cb = collection_changed_callback;
+ cb.user_ptr = user_ptr as usize;
+ }
+ if devtype.contains(cubeb_backend::DeviceType::OUTPUT) {
+ let mut cb = self.output_device_callback.lock().unwrap();
+ cb.cb = collection_changed_callback;
+ cb.user_ptr = user_ptr as usize;
+ }
+
+ let enable = collection_changed_callback.is_some();
+ send_recv!(self.rpc(),
+ ContextRegisterDeviceCollectionChanged(devtype.bits(), enable) =>
+ ContextRegisteredDeviceCollectionChanged)
+ }
+}
+
+impl Drop for ClientContext {
+ fn drop(&mut self) {
+ debug!("ClientContext dropped...");
+ let _ = send_recv!(self.rpc(), ClientDisconnect => ClientDisconnected);
+ }
+}
+
+impl fmt::Debug for ClientContext {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("ClientContext")
+ .field("_ops", &self._ops)
+ .field("rpc", &self.rpc)
+ .field("core", &self.core)
+ .field("cpu_pool", &"...")
+ .finish()
+ }
+}
diff --git a/media/audioipc/client/src/lib.rs b/media/audioipc/client/src/lib.rs
new file mode 100644
index 0000000000..b9363ea2df
--- /dev/null
+++ b/media/audioipc/client/src/lib.rs
@@ -0,0 +1,84 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details.
+#![warn(unused_extern_crates)]
+
+#[macro_use]
+extern crate cubeb_backend;
+#[macro_use]
+extern crate log;
+
+#[macro_use]
+mod send_recv;
+mod context;
+mod stream;
+
+use crate::context::ClientContext;
+use crate::stream::ClientStream;
+use audioipc::PlatformHandleType;
+use cubeb_backend::{capi, ffi};
+use std::os::raw::{c_char, c_int};
+
+thread_local!(static IN_CALLBACK: std::cell::RefCell<bool> = std::cell::RefCell::new(false));
+thread_local!(static AUDIOIPC_INIT_PARAMS: std::cell::RefCell<Option<AudioIpcInitParams>> = std::cell::RefCell::new(None));
+
+// This must match the definition of AudioIpcInitParams in
+// dom/media/CubebUtils.cpp in Gecko.
+#[repr(C)]
+#[derive(Clone, Copy, Debug)]
+pub struct AudioIpcInitParams {
+ // Fields only need to be public for ipctest.
+ pub server_connection: PlatformHandleType,
+ pub pool_size: usize,
+ pub stack_size: usize,
+ pub thread_create_callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>,
+ pub thread_destroy_callback: Option<extern "C" fn()>,
+}
+
+unsafe impl Send for AudioIpcInitParams {}
+
+fn set_in_callback(in_callback: bool) {
+ IN_CALLBACK.with(|b| {
+ assert_eq!(*b.borrow(), !in_callback);
+ *b.borrow_mut() = in_callback;
+ });
+}
+
+fn run_in_callback<F, R>(f: F) -> R
+where
+ F: FnOnce() -> R,
+{
+ set_in_callback(true);
+
+ let r = f();
+
+ set_in_callback(false);
+
+ r
+}
+
+fn assert_not_in_callback() {
+ IN_CALLBACK.with(|b| {
+ assert_eq!(*b.borrow(), false);
+ });
+}
+
+#[no_mangle]
+/// Entry point from C code.
+pub unsafe extern "C" fn audioipc_client_init(
+ c: *mut *mut ffi::cubeb,
+ context_name: *const c_char,
+ init_params: *const AudioIpcInitParams,
+) -> c_int {
+ if init_params.is_null() {
+ return cubeb_backend::ffi::CUBEB_ERROR;
+ }
+
+ let init_params = &*init_params;
+
+ AUDIOIPC_INIT_PARAMS.with(|p| {
+ *p.borrow_mut() = Some(*init_params);
+ });
+ capi::capi_init::<ClientContext>(c, context_name)
+}
diff --git a/media/audioipc/client/src/send_recv.rs b/media/audioipc/client/src/send_recv.rs
new file mode 100644
index 0000000000..482b4fa6e3
--- /dev/null
+++ b/media/audioipc/client/src/send_recv.rs
@@ -0,0 +1,73 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details.
+
+use cubeb_backend::Error;
+use std::os::raw::c_int;
+
+#[doc(hidden)]
+pub fn _err<E>(e: E) -> Error
+where
+ E: Into<Option<c_int>>,
+{
+ match e.into() {
+ Some(e) => unsafe { Error::from_raw(e) },
+ None => Error::error(),
+ }
+}
+
+#[macro_export]
+macro_rules! send_recv {
+ ($rpc:expr, $smsg:ident => $rmsg:ident) => {{
+ let resp = send_recv!(__send $rpc, $smsg);
+ send_recv!(__recv resp, $rmsg)
+ }};
+ ($rpc:expr, $smsg:ident => $rmsg:ident()) => {{
+ let resp = send_recv!(__send $rpc, $smsg);
+ send_recv!(__recv resp, $rmsg __result)
+ }};
+ ($rpc:expr, $smsg:ident($($a:expr),*) => $rmsg:ident) => {{
+ let resp = send_recv!(__send $rpc, $smsg, $($a),*);
+ send_recv!(__recv resp, $rmsg)
+ }};
+ ($rpc:expr, $smsg:ident($($a:expr),*) => $rmsg:ident()) => {{
+ let resp = send_recv!(__send $rpc, $smsg, $($a),*);
+ send_recv!(__recv resp, $rmsg __result)
+ }};
+ //
+ (__send $rpc:expr, $smsg:ident) => ({
+ $rpc.call(ServerMessage::$smsg)
+ });
+ (__send $rpc:expr, $smsg:ident, $($a:expr),*) => ({
+ $rpc.call(ServerMessage::$smsg($($a),*))
+ });
+ (__recv $resp:expr, $rmsg:ident) => ({
+ match $resp.wait() {
+ Ok(ClientMessage::$rmsg) => Ok(()),
+ Ok(ClientMessage::Error(e)) => Err($crate::send_recv::_err(e)),
+ Ok(m) => {
+ debug!("received wrong message - got={:?}", m);
+ Err($crate::send_recv::_err(None))
+ },
+ Err(e) => {
+ debug!("received error from rpc - got={:?}", e);
+ Err($crate::send_recv::_err(None))
+ },
+ }
+ });
+ (__recv $resp:expr, $rmsg:ident __result) => ({
+ match $resp.wait() {
+ Ok(ClientMessage::$rmsg(v)) => Ok(v),
+ Ok(ClientMessage::Error(e)) => Err($crate::send_recv::_err(e)),
+ Ok(m) => {
+ debug!("received wrong message - got={:?}", m);
+ Err($crate::send_recv::_err(None))
+ },
+ Err(e) => {
+ debug!("received error - got={:?}", e);
+ Err($crate::send_recv::_err(None))
+ },
+ }
+ })
+}
diff --git a/media/audioipc/client/src/stream.rs b/media/audioipc/client/src/stream.rs
new file mode 100644
index 0000000000..00a335c720
--- /dev/null
+++ b/media/audioipc/client/src/stream.rs
@@ -0,0 +1,357 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details
+
+use crate::ClientContext;
+use crate::{assert_not_in_callback, run_in_callback};
+use audioipc::frame::{framed, Framed};
+use audioipc::messages::{self, CallbackReq, CallbackResp, ClientMessage, ServerMessage};
+use audioipc::rpc;
+use audioipc::shm::{SharedMemMutSlice, SharedMemSlice};
+use audioipc::{codec::LengthDelimitedCodec, messages::StreamCreateParams};
+use cubeb_backend::{ffi, DeviceRef, Error, Result, Stream, StreamOps};
+use futures::Future;
+use futures_cpupool::{CpuFuture, CpuPool};
+use std::ffi::{CStr, CString};
+use std::os::raw::c_void;
+use std::ptr;
+use std::sync::mpsc;
+use std::sync::{Arc, Mutex};
+use tokio::reactor;
+
+pub struct Device(ffi::cubeb_device);
+
+impl Drop for Device {
+ fn drop(&mut self) {
+ unsafe {
+ if !self.0.input_name.is_null() {
+ let _ = CString::from_raw(self.0.input_name as *mut _);
+ }
+ if !self.0.output_name.is_null() {
+ let _ = CString::from_raw(self.0.output_name as *mut _);
+ }
+ }
+ }
+}
+
+// ClientStream's layout *must* match cubeb.c's `struct cubeb_stream` for the
+// common fields.
+#[repr(C)]
+#[derive(Debug)]
+pub struct ClientStream<'ctx> {
+ // This must be a reference to Context for cubeb, cubeb accesses
+ // stream methods via stream->context->ops
+ context: &'ctx ClientContext,
+ user_ptr: *mut c_void,
+ token: usize,
+ device_change_cb: Arc<Mutex<ffi::cubeb_device_changed_callback>>,
+}
+
+struct CallbackServer {
+ input_shm: Option<SharedMemSlice>,
+ output_shm: Option<SharedMemMutSlice>,
+ data_cb: ffi::cubeb_data_callback,
+ state_cb: ffi::cubeb_state_callback,
+ user_ptr: usize,
+ cpu_pool: CpuPool,
+ device_change_cb: Arc<Mutex<ffi::cubeb_device_changed_callback>>,
+}
+
+impl rpc::Server for CallbackServer {
+ type Request = CallbackReq;
+ type Response = CallbackResp;
+ type Future = CpuFuture<Self::Response, ()>;
+ type Transport =
+ Framed<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Response, Self::Request>>;
+
+ fn process(&mut self, req: Self::Request) -> Self::Future {
+ match req {
+ CallbackReq::Data {
+ nframes,
+ input_frame_size,
+ output_frame_size,
+ } => {
+ trace!(
+ "stream_thread: Data Callback: nframes={} input_fs={} output_fs={}",
+ nframes,
+ input_frame_size,
+ output_frame_size,
+ );
+
+ // Clone values that need to be moved into the cpu pool thread.
+ let input_shm = match self.input_shm {
+ Some(ref shm) => unsafe { Some(shm.unsafe_clone()) },
+ None => None,
+ };
+ let mut output_shm = match self.output_shm {
+ Some(ref shm) => unsafe { Some(shm.unsafe_clone()) },
+ None => None,
+ };
+ let user_ptr = self.user_ptr;
+ let cb = self.data_cb.unwrap();
+
+ self.cpu_pool.spawn_fn(move || {
+ // TODO: This is proof-of-concept. Make it better.
+ let input_ptr: *const u8 = match input_shm {
+ Some(shm) => shm
+ .get_slice(nframes as usize * input_frame_size)
+ .unwrap()
+ .as_ptr(),
+ None => ptr::null(),
+ };
+ let output_ptr: *mut u8 = match output_shm {
+ Some(ref mut shm) => shm
+ .get_mut_slice(nframes as usize * output_frame_size)
+ .unwrap()
+ .as_mut_ptr(),
+ None => ptr::null_mut(),
+ };
+
+ run_in_callback(|| {
+ let nframes = unsafe {
+ cb(
+ ptr::null_mut(),
+ user_ptr as *mut c_void,
+ input_ptr as *const _,
+ output_ptr as *mut _,
+ nframes as _,
+ )
+ };
+
+ Ok(CallbackResp::Data(nframes as isize))
+ })
+ })
+ }
+ CallbackReq::State(state) => {
+ trace!("stream_thread: State Callback: {:?}", state);
+ let user_ptr = self.user_ptr;
+ let cb = self.state_cb.unwrap();
+ self.cpu_pool.spawn_fn(move || {
+ run_in_callback(|| unsafe {
+ cb(ptr::null_mut(), user_ptr as *mut _, state);
+ });
+
+ Ok(CallbackResp::State)
+ })
+ }
+ CallbackReq::DeviceChange => {
+ let cb = self.device_change_cb.clone();
+ let user_ptr = self.user_ptr;
+ self.cpu_pool.spawn_fn(move || {
+ run_in_callback(|| {
+ let cb = cb.lock().unwrap();
+ if let Some(cb) = *cb {
+ unsafe {
+ cb(user_ptr as *mut _);
+ }
+ } else {
+ warn!("DeviceChange received with null callback");
+ }
+ });
+
+ Ok(CallbackResp::DeviceChange)
+ })
+ }
+ }
+ }
+}
+
+impl<'ctx> ClientStream<'ctx> {
+ fn init(
+ ctx: &'ctx ClientContext,
+ init_params: messages::StreamInitParams,
+ data_callback: ffi::cubeb_data_callback,
+ state_callback: ffi::cubeb_state_callback,
+ user_ptr: *mut c_void,
+ ) -> Result<Stream> {
+ assert_not_in_callback();
+
+ let rpc = ctx.rpc();
+ let create_params = StreamCreateParams {
+ input_stream_params: init_params.input_stream_params,
+ output_stream_params: init_params.output_stream_params,
+ };
+ let data = send_recv!(rpc, StreamCreate(create_params) => StreamCreated())?;
+
+ debug!(
+ "token = {}, handles = {:?}",
+ data.token, data.platform_handles
+ );
+
+ let has_input = init_params.input_stream_params.is_some();
+ let has_output = init_params.output_stream_params.is_some();
+
+ let stream =
+ unsafe { audioipc::MessageStream::from_raw_fd(data.platform_handles[0].into_raw()) };
+
+ let input_file = unsafe { data.platform_handles[1].into_file() };
+ let input_shm = if has_input {
+ match SharedMemSlice::from(&input_file, audioipc::SHM_AREA_SIZE) {
+ Ok(shm) => Some(shm),
+ Err(e) => {
+ debug!("Client failed to set up input shmem: {}", e);
+ return Err(Error::error());
+ }
+ }
+ } else {
+ None
+ };
+
+ let output_file = unsafe { data.platform_handles[2].into_file() };
+ let output_shm = if has_output {
+ match SharedMemMutSlice::from(&output_file, audioipc::SHM_AREA_SIZE) {
+ Ok(shm) => Some(shm),
+ Err(e) => {
+ debug!("Client failed to set up output shmem: {}", e);
+ return Err(Error::error());
+ }
+ }
+ } else {
+ None
+ };
+
+ let user_data = user_ptr as usize;
+
+ let cpu_pool = ctx.cpu_pool();
+
+ let null_cb: ffi::cubeb_device_changed_callback = None;
+ let device_change_cb = Arc::new(Mutex::new(null_cb));
+
+ let server = CallbackServer {
+ input_shm,
+ output_shm,
+ data_cb: data_callback,
+ state_cb: state_callback,
+ user_ptr: user_data,
+ cpu_pool,
+ device_change_cb: device_change_cb.clone(),
+ };
+
+ let (wait_tx, wait_rx) = mpsc::channel();
+ ctx.handle()
+ .spawn(futures::future::lazy(move || {
+ let handle = reactor::Handle::default();
+ let stream = stream.into_tokio_ipc(&handle).unwrap();
+ let transport = framed(stream, Default::default());
+ rpc::bind_server(transport, server);
+ wait_tx.send(()).unwrap();
+ Ok(())
+ }))
+ .expect("Failed to spawn CallbackServer");
+ wait_rx.recv().unwrap();
+
+ send_recv!(rpc, StreamInit(data.token, init_params) => StreamInitialized)?;
+
+ let stream = Box::into_raw(Box::new(ClientStream {
+ context: ctx,
+ user_ptr,
+ token: data.token,
+ device_change_cb,
+ }));
+ Ok(unsafe { Stream::from_ptr(stream as *mut _) })
+ }
+}
+
+impl<'ctx> Drop for ClientStream<'ctx> {
+ fn drop(&mut self) {
+ debug!("ClientStream dropped...");
+ let rpc = self.context.rpc();
+ let _ = send_recv!(rpc, StreamDestroy(self.token) => StreamDestroyed);
+ }
+}
+
+impl<'ctx> StreamOps for ClientStream<'ctx> {
+ fn start(&mut self) -> Result<()> {
+ assert_not_in_callback();
+ let rpc = self.context.rpc();
+ send_recv!(rpc, StreamStart(self.token) => StreamStarted)
+ }
+
+ fn stop(&mut self) -> Result<()> {
+ assert_not_in_callback();
+ let rpc = self.context.rpc();
+ send_recv!(rpc, StreamStop(self.token) => StreamStopped)
+ }
+
+ fn reset_default_device(&mut self) -> Result<()> {
+ assert_not_in_callback();
+ let rpc = self.context.rpc();
+ send_recv!(rpc, StreamResetDefaultDevice(self.token) => StreamDefaultDeviceReset)
+ }
+
+ fn position(&mut self) -> Result<u64> {
+ assert_not_in_callback();
+ let rpc = self.context.rpc();
+ send_recv!(rpc, StreamGetPosition(self.token) => StreamPosition())
+ }
+
+ fn latency(&mut self) -> Result<u32> {
+ assert_not_in_callback();
+ let rpc = self.context.rpc();
+ send_recv!(rpc, StreamGetLatency(self.token) => StreamLatency())
+ }
+
+ fn input_latency(&mut self) -> Result<u32> {
+ assert_not_in_callback();
+ let rpc = self.context.rpc();
+ send_recv!(rpc, StreamGetInputLatency(self.token) => StreamInputLatency())
+ }
+
+ fn set_volume(&mut self, volume: f32) -> Result<()> {
+ assert_not_in_callback();
+ let rpc = self.context.rpc();
+ send_recv!(rpc, StreamSetVolume(self.token, volume) => StreamVolumeSet)
+ }
+
+ fn set_name(&mut self, name: &CStr) -> Result<()> {
+ assert_not_in_callback();
+ let rpc = self.context.rpc();
+ send_recv!(rpc, StreamSetName(self.token, name.to_owned()) => StreamNameSet)
+ }
+
+ fn current_device(&mut self) -> Result<&DeviceRef> {
+ assert_not_in_callback();
+ let rpc = self.context.rpc();
+ match send_recv!(rpc, StreamGetCurrentDevice(self.token) => StreamCurrentDevice()) {
+ Ok(d) => Ok(unsafe { DeviceRef::from_ptr(Box::into_raw(Box::new(d.into()))) }),
+ Err(e) => Err(e),
+ }
+ }
+
+ fn device_destroy(&mut self, device: &DeviceRef) -> Result<()> {
+ assert_not_in_callback();
+ // It's all unsafe...
+ if device.as_ptr().is_null() {
+ Err(Error::error())
+ } else {
+ unsafe {
+ let _: Box<Device> = Box::from_raw(device.as_ptr() as *mut _);
+ }
+ Ok(())
+ }
+ }
+
+ fn register_device_changed_callback(
+ &mut self,
+ device_changed_callback: ffi::cubeb_device_changed_callback,
+ ) -> Result<()> {
+ assert_not_in_callback();
+ let rpc = self.context.rpc();
+ let enable = device_changed_callback.is_some();
+ *self.device_change_cb.lock().unwrap() = device_changed_callback;
+ send_recv!(rpc, StreamRegisterDeviceChangeCallback(self.token, enable) => StreamRegisterDeviceChangeCallback)
+ }
+}
+
+pub fn init(
+ ctx: &ClientContext,
+ init_params: messages::StreamInitParams,
+ data_callback: ffi::cubeb_data_callback,
+ state_callback: ffi::cubeb_state_callback,
+ user_ptr: *mut c_void,
+) -> Result<Stream> {
+ let stm = ClientStream::init(ctx, init_params, data_callback, state_callback, user_ptr)?;
+ debug_assert_eq!(stm.user_ptr(), user_ptr);
+ Ok(stm)
+}