diff options
Diffstat (limited to '')
-rw-r--r-- | media/audioipc/client/src/context.rs | 451 | ||||
-rw-r--r-- | media/audioipc/client/src/lib.rs | 84 | ||||
-rw-r--r-- | media/audioipc/client/src/send_recv.rs | 73 | ||||
-rw-r--r-- | media/audioipc/client/src/stream.rs | 357 |
4 files changed, 965 insertions, 0 deletions
diff --git a/media/audioipc/client/src/context.rs b/media/audioipc/client/src/context.rs new file mode 100644 index 0000000000..7441d0c0e0 --- /dev/null +++ b/media/audioipc/client/src/context.rs @@ -0,0 +1,451 @@ +// Copyright © 2017 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details + +use crate::stream; +use crate::{assert_not_in_callback, run_in_callback}; +use crate::{ClientStream, AUDIOIPC_INIT_PARAMS}; +#[cfg(target_os = "linux")] +use audio_thread_priority::get_current_thread_info; +#[cfg(not(target_os = "linux"))] +use audio_thread_priority::promote_current_thread_to_real_time; +use audioipc::codec::LengthDelimitedCodec; +use audioipc::frame::{framed, Framed}; +use audioipc::platformhandle_passing::{framed_with_platformhandles, FramedWithPlatformHandles}; +use audioipc::{core, rpc}; +use audioipc::{ + messages, messages::DeviceCollectionReq, messages::DeviceCollectionResp, ClientMessage, + ServerMessage, +}; +use cubeb_backend::{ + ffi, Context, ContextOps, DeviceCollectionRef, DeviceId, DeviceType, Error, Ops, Result, + Stream, StreamParams, StreamParamsRef, +}; +use futures::Future; +use futures_cpupool::{CpuFuture, CpuPool}; +use std::ffi::{CStr, CString}; +use std::os::raw::c_void; +use std::sync::mpsc; +use std::sync::{Arc, Mutex}; +use std::thread; +use std::{fmt, io, mem, ptr}; +use tokio::reactor; +use tokio::runtime::current_thread; + +struct CubebClient; + +impl rpc::Client for CubebClient { + type Request = ServerMessage; + type Response = ClientMessage; + type Transport = FramedWithPlatformHandles< + audioipc::AsyncMessageStream, + LengthDelimitedCodec<Self::Request, Self::Response>, + >; +} + +pub const CLIENT_OPS: Ops = capi_new!(ClientContext, ClientStream); + +// ClientContext's layout *must* match cubeb.c's `struct cubeb` for the +// common fields. +#[repr(C)] +pub struct ClientContext { + _ops: *const Ops, + rpc: rpc::ClientProxy<ServerMessage, ClientMessage>, + core: core::CoreThread, + cpu_pool: CpuPool, + backend_id: CString, + device_collection_rpc: bool, + input_device_callback: Arc<Mutex<DeviceCollectionCallback>>, + output_device_callback: Arc<Mutex<DeviceCollectionCallback>>, +} + +impl ClientContext { + #[doc(hidden)] + pub fn handle(&self) -> current_thread::Handle { + self.core.handle() + } + + #[doc(hidden)] + pub fn rpc(&self) -> rpc::ClientProxy<ServerMessage, ClientMessage> { + self.rpc.clone() + } + + #[doc(hidden)] + pub fn cpu_pool(&self) -> CpuPool { + self.cpu_pool.clone() + } +} + +#[cfg(target_os = "linux")] +fn promote_thread(rpc: &rpc::ClientProxy<ServerMessage, ClientMessage>) { + match get_current_thread_info() { + Ok(info) => { + let bytes = info.serialize(); + // Don't wait for the response, this is on the callback thread, which must not block. + rpc.call(ServerMessage::PromoteThreadToRealTime(bytes)); + } + Err(_) => { + warn!("Could not remotely promote thread to RT."); + } + } +} + +#[cfg(not(target_os = "linux"))] +fn promote_thread(_rpc: &rpc::ClientProxy<ServerMessage, ClientMessage>) { + match promote_current_thread_to_real_time(0, 48000) { + Ok(_) => { + info!("Audio thread promoted to real-time."); + } + Err(_) => { + warn!("Could not promote thread to real-time."); + } + } +} + +fn register_thread(callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>) { + if let Some(func) = callback { + let thr = thread::current(); + let name = CString::new(thr.name().unwrap()).unwrap(); + func(name.as_ptr()); + } +} + +fn unregister_thread(callback: Option<extern "C" fn()>) { + if let Some(func) = callback { + func(); + } +} + +fn promote_and_register_thread( + rpc: &rpc::ClientProxy<ServerMessage, ClientMessage>, + callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>, +) { + promote_thread(rpc); + register_thread(callback); +} + +#[derive(Default)] +struct DeviceCollectionCallback { + cb: ffi::cubeb_device_collection_changed_callback, + user_ptr: usize, +} + +struct DeviceCollectionServer { + input_device_callback: Arc<Mutex<DeviceCollectionCallback>>, + output_device_callback: Arc<Mutex<DeviceCollectionCallback>>, + cpu_pool: CpuPool, +} + +impl rpc::Server for DeviceCollectionServer { + type Request = DeviceCollectionReq; + type Response = DeviceCollectionResp; + type Future = CpuFuture<Self::Response, ()>; + type Transport = + Framed<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Response, Self::Request>>; + + fn process(&mut self, req: Self::Request) -> Self::Future { + match req { + DeviceCollectionReq::DeviceChange(device_type) => { + trace!( + "ctx_thread: DeviceChange Callback: device_type={}", + device_type + ); + + let devtype = cubeb_backend::DeviceType::from_bits_truncate(device_type); + + let (input_cb, input_user_ptr) = { + let dcb = self.input_device_callback.lock().unwrap(); + (dcb.cb, dcb.user_ptr) + }; + let (output_cb, output_user_ptr) = { + let dcb = self.output_device_callback.lock().unwrap(); + (dcb.cb, dcb.user_ptr) + }; + + self.cpu_pool.spawn_fn(move || { + run_in_callback(|| { + if devtype.contains(cubeb_backend::DeviceType::INPUT) { + unsafe { + input_cb.unwrap()(ptr::null_mut(), input_user_ptr as *mut c_void) + } + } + if devtype.contains(cubeb_backend::DeviceType::OUTPUT) { + unsafe { + output_cb.unwrap()(ptr::null_mut(), output_user_ptr as *mut c_void) + } + } + }); + + Ok(DeviceCollectionResp::DeviceChange) + }) + } + } + } +} + +impl ContextOps for ClientContext { + fn init(_context_name: Option<&CStr>) -> Result<Context> { + fn bind_and_send_client( + stream: audioipc::AsyncMessageStream, + tx_rpc: &mpsc::Sender<rpc::ClientProxy<ServerMessage, ClientMessage>>, + ) -> io::Result<()> { + let transport = framed_with_platformhandles(stream, Default::default()); + let rpc = rpc::bind_client::<CubebClient>(transport); + // If send fails then the rx end has closed + // which is unlikely here. + let _ = tx_rpc.send(rpc); + Ok(()) + } + + assert_not_in_callback(); + + let (tx_rpc, rx_rpc) = mpsc::channel(); + + let params = AUDIOIPC_INIT_PARAMS.with(|p| p.replace(None).unwrap()); + let thread_create_callback = params.thread_create_callback; + let thread_destroy_callback = params.thread_destroy_callback; + + let server_stream = + unsafe { audioipc::MessageStream::from_raw_fd(params.server_connection) }; + + let core = core::spawn_thread( + "AudioIPC Client RPC", + move || { + let handle = reactor::Handle::default(); + + register_thread(thread_create_callback); + + server_stream + .into_tokio_ipc(&handle) + .and_then(|stream| bind_and_send_client(stream, &tx_rpc)) + }, + move || unregister_thread(thread_destroy_callback), + ) + .map_err(|_| Error::default())?; + + let rpc = rx_rpc.recv().map_err(|_| Error::default())?; + let rpc2 = rpc.clone(); + + // Don't let errors bubble from here. Later calls against this context + // will return errors the caller expects to handle. + let _ = send_recv!(rpc, ClientConnect(std::process::id()) => ClientConnected); + + let backend_id = send_recv!(rpc, ContextGetBackendId => ContextBackendId()) + .unwrap_or_else(|_| "(remote error)".to_string()); + let backend_id = CString::new(backend_id).expect("backend_id query failed"); + + let cpu_pool = futures_cpupool::Builder::new() + .name_prefix("AudioIPC") + .after_start(move || promote_and_register_thread(&rpc2, thread_create_callback)) + .before_stop(move || unregister_thread(thread_destroy_callback)) + .pool_size(params.pool_size) + .stack_size(params.stack_size) + .create(); + + let ctx = Box::new(ClientContext { + _ops: &CLIENT_OPS as *const _, + rpc, + core, + cpu_pool, + backend_id, + device_collection_rpc: false, + input_device_callback: Arc::new(Mutex::new(Default::default())), + output_device_callback: Arc::new(Mutex::new(Default::default())), + }); + Ok(unsafe { Context::from_ptr(Box::into_raw(ctx) as *mut _) }) + } + + fn backend_id(&mut self) -> &CStr { + assert_not_in_callback(); + self.backend_id.as_c_str() + } + + fn max_channel_count(&mut self) -> Result<u32> { + assert_not_in_callback(); + send_recv!(self.rpc(), ContextGetMaxChannelCount => ContextMaxChannelCount()) + } + + fn min_latency(&mut self, params: StreamParams) -> Result<u32> { + assert_not_in_callback(); + let params = messages::StreamParams::from(params.as_ref()); + send_recv!(self.rpc(), ContextGetMinLatency(params) => ContextMinLatency()) + } + + fn preferred_sample_rate(&mut self) -> Result<u32> { + assert_not_in_callback(); + send_recv!(self.rpc(), ContextGetPreferredSampleRate => ContextPreferredSampleRate()) + } + + fn enumerate_devices( + &mut self, + devtype: DeviceType, + collection: &DeviceCollectionRef, + ) -> Result<()> { + assert_not_in_callback(); + let v: Vec<ffi::cubeb_device_info> = match send_recv!(self.rpc(), + ContextGetDeviceEnumeration(devtype.bits()) => + ContextEnumeratedDevices()) + { + Ok(mut v) => v.drain(..).map(|i| i.into()).collect(), + Err(e) => return Err(e), + }; + let mut vs = v.into_boxed_slice(); + let coll = unsafe { &mut *collection.as_ptr() }; + coll.device = vs.as_mut_ptr(); + coll.count = vs.len(); + // Giving away the memory owned by vs. Don't free it! + // Reclaimed in `device_collection_destroy`. + mem::forget(vs); + Ok(()) + } + + fn device_collection_destroy(&mut self, collection: &mut DeviceCollectionRef) -> Result<()> { + assert_not_in_callback(); + unsafe { + let coll = &mut *collection.as_ptr(); + let mut devices = Vec::from_raw_parts( + coll.device as *mut ffi::cubeb_device_info, + coll.count, + coll.count, + ); + for dev in &mut devices { + if !dev.device_id.is_null() { + let _ = CString::from_raw(dev.device_id as *mut _); + } + if !dev.group_id.is_null() { + let _ = CString::from_raw(dev.group_id as *mut _); + } + if !dev.vendor_name.is_null() { + let _ = CString::from_raw(dev.vendor_name as *mut _); + } + if !dev.friendly_name.is_null() { + let _ = CString::from_raw(dev.friendly_name as *mut _); + } + } + coll.device = ptr::null_mut(); + coll.count = 0; + Ok(()) + } + } + + fn stream_init( + &mut self, + stream_name: Option<&CStr>, + input_device: DeviceId, + input_stream_params: Option<&StreamParamsRef>, + output_device: DeviceId, + output_stream_params: Option<&StreamParamsRef>, + latency_frames: u32, + // These params aren't sent to the server + data_callback: ffi::cubeb_data_callback, + state_callback: ffi::cubeb_state_callback, + user_ptr: *mut c_void, + ) -> Result<Stream> { + assert_not_in_callback(); + + fn opt_stream_params(p: Option<&StreamParamsRef>) -> Option<messages::StreamParams> { + match p { + Some(p) => Some(messages::StreamParams::from(p)), + None => None, + } + } + + let stream_name = match stream_name { + Some(s) => Some(s.to_bytes_with_nul().to_vec()), + None => None, + }; + + let input_stream_params = opt_stream_params(input_stream_params); + let output_stream_params = opt_stream_params(output_stream_params); + + let init_params = messages::StreamInitParams { + stream_name, + input_device: input_device as usize, + input_stream_params, + output_device: output_device as usize, + output_stream_params, + latency_frames, + }; + stream::init(self, init_params, data_callback, state_callback, user_ptr) + } + + fn register_device_collection_changed( + &mut self, + devtype: DeviceType, + collection_changed_callback: ffi::cubeb_device_collection_changed_callback, + user_ptr: *mut c_void, + ) -> Result<()> { + assert_not_in_callback(); + + if !self.device_collection_rpc { + let fds = send_recv!(self.rpc(), + ContextSetupDeviceCollectionCallback => + ContextSetupDeviceCollectionCallback())?; + + let stream = + unsafe { audioipc::MessageStream::from_raw_fd(fds.platform_handles[0].into_raw()) }; + + // TODO: The lowest comms layer expects exactly 3 PlatformHandles, but we only + // need one here. Drop the dummy handles the other side sent us to discard. + unsafe { + fds.platform_handles[1].into_file(); + fds.platform_handles[2].into_file(); + } + + let server = DeviceCollectionServer { + input_device_callback: self.input_device_callback.clone(), + output_device_callback: self.output_device_callback.clone(), + cpu_pool: self.cpu_pool(), + }; + + let (wait_tx, wait_rx) = mpsc::channel(); + self.handle() + .spawn(futures::future::lazy(move || { + let handle = reactor::Handle::default(); + let stream = stream.into_tokio_ipc(&handle).unwrap(); + let transport = framed(stream, Default::default()); + rpc::bind_server(transport, server); + wait_tx.send(()).unwrap(); + Ok(()) + })) + .expect("Failed to spawn DeviceCollectionServer"); + wait_rx.recv().unwrap(); + self.device_collection_rpc = true; + } + + if devtype.contains(cubeb_backend::DeviceType::INPUT) { + let mut cb = self.input_device_callback.lock().unwrap(); + cb.cb = collection_changed_callback; + cb.user_ptr = user_ptr as usize; + } + if devtype.contains(cubeb_backend::DeviceType::OUTPUT) { + let mut cb = self.output_device_callback.lock().unwrap(); + cb.cb = collection_changed_callback; + cb.user_ptr = user_ptr as usize; + } + + let enable = collection_changed_callback.is_some(); + send_recv!(self.rpc(), + ContextRegisterDeviceCollectionChanged(devtype.bits(), enable) => + ContextRegisteredDeviceCollectionChanged) + } +} + +impl Drop for ClientContext { + fn drop(&mut self) { + debug!("ClientContext dropped..."); + let _ = send_recv!(self.rpc(), ClientDisconnect => ClientDisconnected); + } +} + +impl fmt::Debug for ClientContext { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ClientContext") + .field("_ops", &self._ops) + .field("rpc", &self.rpc) + .field("core", &self.core) + .field("cpu_pool", &"...") + .finish() + } +} diff --git a/media/audioipc/client/src/lib.rs b/media/audioipc/client/src/lib.rs new file mode 100644 index 0000000000..b9363ea2df --- /dev/null +++ b/media/audioipc/client/src/lib.rs @@ -0,0 +1,84 @@ +// Copyright © 2017 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details. +#![warn(unused_extern_crates)] + +#[macro_use] +extern crate cubeb_backend; +#[macro_use] +extern crate log; + +#[macro_use] +mod send_recv; +mod context; +mod stream; + +use crate::context::ClientContext; +use crate::stream::ClientStream; +use audioipc::PlatformHandleType; +use cubeb_backend::{capi, ffi}; +use std::os::raw::{c_char, c_int}; + +thread_local!(static IN_CALLBACK: std::cell::RefCell<bool> = std::cell::RefCell::new(false)); +thread_local!(static AUDIOIPC_INIT_PARAMS: std::cell::RefCell<Option<AudioIpcInitParams>> = std::cell::RefCell::new(None)); + +// This must match the definition of AudioIpcInitParams in +// dom/media/CubebUtils.cpp in Gecko. +#[repr(C)] +#[derive(Clone, Copy, Debug)] +pub struct AudioIpcInitParams { + // Fields only need to be public for ipctest. + pub server_connection: PlatformHandleType, + pub pool_size: usize, + pub stack_size: usize, + pub thread_create_callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>, + pub thread_destroy_callback: Option<extern "C" fn()>, +} + +unsafe impl Send for AudioIpcInitParams {} + +fn set_in_callback(in_callback: bool) { + IN_CALLBACK.with(|b| { + assert_eq!(*b.borrow(), !in_callback); + *b.borrow_mut() = in_callback; + }); +} + +fn run_in_callback<F, R>(f: F) -> R +where + F: FnOnce() -> R, +{ + set_in_callback(true); + + let r = f(); + + set_in_callback(false); + + r +} + +fn assert_not_in_callback() { + IN_CALLBACK.with(|b| { + assert_eq!(*b.borrow(), false); + }); +} + +#[no_mangle] +/// Entry point from C code. +pub unsafe extern "C" fn audioipc_client_init( + c: *mut *mut ffi::cubeb, + context_name: *const c_char, + init_params: *const AudioIpcInitParams, +) -> c_int { + if init_params.is_null() { + return cubeb_backend::ffi::CUBEB_ERROR; + } + + let init_params = &*init_params; + + AUDIOIPC_INIT_PARAMS.with(|p| { + *p.borrow_mut() = Some(*init_params); + }); + capi::capi_init::<ClientContext>(c, context_name) +} diff --git a/media/audioipc/client/src/send_recv.rs b/media/audioipc/client/src/send_recv.rs new file mode 100644 index 0000000000..482b4fa6e3 --- /dev/null +++ b/media/audioipc/client/src/send_recv.rs @@ -0,0 +1,73 @@ +// Copyright © 2017 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details. + +use cubeb_backend::Error; +use std::os::raw::c_int; + +#[doc(hidden)] +pub fn _err<E>(e: E) -> Error +where + E: Into<Option<c_int>>, +{ + match e.into() { + Some(e) => unsafe { Error::from_raw(e) }, + None => Error::error(), + } +} + +#[macro_export] +macro_rules! send_recv { + ($rpc:expr, $smsg:ident => $rmsg:ident) => {{ + let resp = send_recv!(__send $rpc, $smsg); + send_recv!(__recv resp, $rmsg) + }}; + ($rpc:expr, $smsg:ident => $rmsg:ident()) => {{ + let resp = send_recv!(__send $rpc, $smsg); + send_recv!(__recv resp, $rmsg __result) + }}; + ($rpc:expr, $smsg:ident($($a:expr),*) => $rmsg:ident) => {{ + let resp = send_recv!(__send $rpc, $smsg, $($a),*); + send_recv!(__recv resp, $rmsg) + }}; + ($rpc:expr, $smsg:ident($($a:expr),*) => $rmsg:ident()) => {{ + let resp = send_recv!(__send $rpc, $smsg, $($a),*); + send_recv!(__recv resp, $rmsg __result) + }}; + // + (__send $rpc:expr, $smsg:ident) => ({ + $rpc.call(ServerMessage::$smsg) + }); + (__send $rpc:expr, $smsg:ident, $($a:expr),*) => ({ + $rpc.call(ServerMessage::$smsg($($a),*)) + }); + (__recv $resp:expr, $rmsg:ident) => ({ + match $resp.wait() { + Ok(ClientMessage::$rmsg) => Ok(()), + Ok(ClientMessage::Error(e)) => Err($crate::send_recv::_err(e)), + Ok(m) => { + debug!("received wrong message - got={:?}", m); + Err($crate::send_recv::_err(None)) + }, + Err(e) => { + debug!("received error from rpc - got={:?}", e); + Err($crate::send_recv::_err(None)) + }, + } + }); + (__recv $resp:expr, $rmsg:ident __result) => ({ + match $resp.wait() { + Ok(ClientMessage::$rmsg(v)) => Ok(v), + Ok(ClientMessage::Error(e)) => Err($crate::send_recv::_err(e)), + Ok(m) => { + debug!("received wrong message - got={:?}", m); + Err($crate::send_recv::_err(None)) + }, + Err(e) => { + debug!("received error - got={:?}", e); + Err($crate::send_recv::_err(None)) + }, + } + }) +} diff --git a/media/audioipc/client/src/stream.rs b/media/audioipc/client/src/stream.rs new file mode 100644 index 0000000000..00a335c720 --- /dev/null +++ b/media/audioipc/client/src/stream.rs @@ -0,0 +1,357 @@ +// Copyright © 2017 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details + +use crate::ClientContext; +use crate::{assert_not_in_callback, run_in_callback}; +use audioipc::frame::{framed, Framed}; +use audioipc::messages::{self, CallbackReq, CallbackResp, ClientMessage, ServerMessage}; +use audioipc::rpc; +use audioipc::shm::{SharedMemMutSlice, SharedMemSlice}; +use audioipc::{codec::LengthDelimitedCodec, messages::StreamCreateParams}; +use cubeb_backend::{ffi, DeviceRef, Error, Result, Stream, StreamOps}; +use futures::Future; +use futures_cpupool::{CpuFuture, CpuPool}; +use std::ffi::{CStr, CString}; +use std::os::raw::c_void; +use std::ptr; +use std::sync::mpsc; +use std::sync::{Arc, Mutex}; +use tokio::reactor; + +pub struct Device(ffi::cubeb_device); + +impl Drop for Device { + fn drop(&mut self) { + unsafe { + if !self.0.input_name.is_null() { + let _ = CString::from_raw(self.0.input_name as *mut _); + } + if !self.0.output_name.is_null() { + let _ = CString::from_raw(self.0.output_name as *mut _); + } + } + } +} + +// ClientStream's layout *must* match cubeb.c's `struct cubeb_stream` for the +// common fields. +#[repr(C)] +#[derive(Debug)] +pub struct ClientStream<'ctx> { + // This must be a reference to Context for cubeb, cubeb accesses + // stream methods via stream->context->ops + context: &'ctx ClientContext, + user_ptr: *mut c_void, + token: usize, + device_change_cb: Arc<Mutex<ffi::cubeb_device_changed_callback>>, +} + +struct CallbackServer { + input_shm: Option<SharedMemSlice>, + output_shm: Option<SharedMemMutSlice>, + data_cb: ffi::cubeb_data_callback, + state_cb: ffi::cubeb_state_callback, + user_ptr: usize, + cpu_pool: CpuPool, + device_change_cb: Arc<Mutex<ffi::cubeb_device_changed_callback>>, +} + +impl rpc::Server for CallbackServer { + type Request = CallbackReq; + type Response = CallbackResp; + type Future = CpuFuture<Self::Response, ()>; + type Transport = + Framed<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Response, Self::Request>>; + + fn process(&mut self, req: Self::Request) -> Self::Future { + match req { + CallbackReq::Data { + nframes, + input_frame_size, + output_frame_size, + } => { + trace!( + "stream_thread: Data Callback: nframes={} input_fs={} output_fs={}", + nframes, + input_frame_size, + output_frame_size, + ); + + // Clone values that need to be moved into the cpu pool thread. + let input_shm = match self.input_shm { + Some(ref shm) => unsafe { Some(shm.unsafe_clone()) }, + None => None, + }; + let mut output_shm = match self.output_shm { + Some(ref shm) => unsafe { Some(shm.unsafe_clone()) }, + None => None, + }; + let user_ptr = self.user_ptr; + let cb = self.data_cb.unwrap(); + + self.cpu_pool.spawn_fn(move || { + // TODO: This is proof-of-concept. Make it better. + let input_ptr: *const u8 = match input_shm { + Some(shm) => shm + .get_slice(nframes as usize * input_frame_size) + .unwrap() + .as_ptr(), + None => ptr::null(), + }; + let output_ptr: *mut u8 = match output_shm { + Some(ref mut shm) => shm + .get_mut_slice(nframes as usize * output_frame_size) + .unwrap() + .as_mut_ptr(), + None => ptr::null_mut(), + }; + + run_in_callback(|| { + let nframes = unsafe { + cb( + ptr::null_mut(), + user_ptr as *mut c_void, + input_ptr as *const _, + output_ptr as *mut _, + nframes as _, + ) + }; + + Ok(CallbackResp::Data(nframes as isize)) + }) + }) + } + CallbackReq::State(state) => { + trace!("stream_thread: State Callback: {:?}", state); + let user_ptr = self.user_ptr; + let cb = self.state_cb.unwrap(); + self.cpu_pool.spawn_fn(move || { + run_in_callback(|| unsafe { + cb(ptr::null_mut(), user_ptr as *mut _, state); + }); + + Ok(CallbackResp::State) + }) + } + CallbackReq::DeviceChange => { + let cb = self.device_change_cb.clone(); + let user_ptr = self.user_ptr; + self.cpu_pool.spawn_fn(move || { + run_in_callback(|| { + let cb = cb.lock().unwrap(); + if let Some(cb) = *cb { + unsafe { + cb(user_ptr as *mut _); + } + } else { + warn!("DeviceChange received with null callback"); + } + }); + + Ok(CallbackResp::DeviceChange) + }) + } + } + } +} + +impl<'ctx> ClientStream<'ctx> { + fn init( + ctx: &'ctx ClientContext, + init_params: messages::StreamInitParams, + data_callback: ffi::cubeb_data_callback, + state_callback: ffi::cubeb_state_callback, + user_ptr: *mut c_void, + ) -> Result<Stream> { + assert_not_in_callback(); + + let rpc = ctx.rpc(); + let create_params = StreamCreateParams { + input_stream_params: init_params.input_stream_params, + output_stream_params: init_params.output_stream_params, + }; + let data = send_recv!(rpc, StreamCreate(create_params) => StreamCreated())?; + + debug!( + "token = {}, handles = {:?}", + data.token, data.platform_handles + ); + + let has_input = init_params.input_stream_params.is_some(); + let has_output = init_params.output_stream_params.is_some(); + + let stream = + unsafe { audioipc::MessageStream::from_raw_fd(data.platform_handles[0].into_raw()) }; + + let input_file = unsafe { data.platform_handles[1].into_file() }; + let input_shm = if has_input { + match SharedMemSlice::from(&input_file, audioipc::SHM_AREA_SIZE) { + Ok(shm) => Some(shm), + Err(e) => { + debug!("Client failed to set up input shmem: {}", e); + return Err(Error::error()); + } + } + } else { + None + }; + + let output_file = unsafe { data.platform_handles[2].into_file() }; + let output_shm = if has_output { + match SharedMemMutSlice::from(&output_file, audioipc::SHM_AREA_SIZE) { + Ok(shm) => Some(shm), + Err(e) => { + debug!("Client failed to set up output shmem: {}", e); + return Err(Error::error()); + } + } + } else { + None + }; + + let user_data = user_ptr as usize; + + let cpu_pool = ctx.cpu_pool(); + + let null_cb: ffi::cubeb_device_changed_callback = None; + let device_change_cb = Arc::new(Mutex::new(null_cb)); + + let server = CallbackServer { + input_shm, + output_shm, + data_cb: data_callback, + state_cb: state_callback, + user_ptr: user_data, + cpu_pool, + device_change_cb: device_change_cb.clone(), + }; + + let (wait_tx, wait_rx) = mpsc::channel(); + ctx.handle() + .spawn(futures::future::lazy(move || { + let handle = reactor::Handle::default(); + let stream = stream.into_tokio_ipc(&handle).unwrap(); + let transport = framed(stream, Default::default()); + rpc::bind_server(transport, server); + wait_tx.send(()).unwrap(); + Ok(()) + })) + .expect("Failed to spawn CallbackServer"); + wait_rx.recv().unwrap(); + + send_recv!(rpc, StreamInit(data.token, init_params) => StreamInitialized)?; + + let stream = Box::into_raw(Box::new(ClientStream { + context: ctx, + user_ptr, + token: data.token, + device_change_cb, + })); + Ok(unsafe { Stream::from_ptr(stream as *mut _) }) + } +} + +impl<'ctx> Drop for ClientStream<'ctx> { + fn drop(&mut self) { + debug!("ClientStream dropped..."); + let rpc = self.context.rpc(); + let _ = send_recv!(rpc, StreamDestroy(self.token) => StreamDestroyed); + } +} + +impl<'ctx> StreamOps for ClientStream<'ctx> { + fn start(&mut self) -> Result<()> { + assert_not_in_callback(); + let rpc = self.context.rpc(); + send_recv!(rpc, StreamStart(self.token) => StreamStarted) + } + + fn stop(&mut self) -> Result<()> { + assert_not_in_callback(); + let rpc = self.context.rpc(); + send_recv!(rpc, StreamStop(self.token) => StreamStopped) + } + + fn reset_default_device(&mut self) -> Result<()> { + assert_not_in_callback(); + let rpc = self.context.rpc(); + send_recv!(rpc, StreamResetDefaultDevice(self.token) => StreamDefaultDeviceReset) + } + + fn position(&mut self) -> Result<u64> { + assert_not_in_callback(); + let rpc = self.context.rpc(); + send_recv!(rpc, StreamGetPosition(self.token) => StreamPosition()) + } + + fn latency(&mut self) -> Result<u32> { + assert_not_in_callback(); + let rpc = self.context.rpc(); + send_recv!(rpc, StreamGetLatency(self.token) => StreamLatency()) + } + + fn input_latency(&mut self) -> Result<u32> { + assert_not_in_callback(); + let rpc = self.context.rpc(); + send_recv!(rpc, StreamGetInputLatency(self.token) => StreamInputLatency()) + } + + fn set_volume(&mut self, volume: f32) -> Result<()> { + assert_not_in_callback(); + let rpc = self.context.rpc(); + send_recv!(rpc, StreamSetVolume(self.token, volume) => StreamVolumeSet) + } + + fn set_name(&mut self, name: &CStr) -> Result<()> { + assert_not_in_callback(); + let rpc = self.context.rpc(); + send_recv!(rpc, StreamSetName(self.token, name.to_owned()) => StreamNameSet) + } + + fn current_device(&mut self) -> Result<&DeviceRef> { + assert_not_in_callback(); + let rpc = self.context.rpc(); + match send_recv!(rpc, StreamGetCurrentDevice(self.token) => StreamCurrentDevice()) { + Ok(d) => Ok(unsafe { DeviceRef::from_ptr(Box::into_raw(Box::new(d.into()))) }), + Err(e) => Err(e), + } + } + + fn device_destroy(&mut self, device: &DeviceRef) -> Result<()> { + assert_not_in_callback(); + // It's all unsafe... + if device.as_ptr().is_null() { + Err(Error::error()) + } else { + unsafe { + let _: Box<Device> = Box::from_raw(device.as_ptr() as *mut _); + } + Ok(()) + } + } + + fn register_device_changed_callback( + &mut self, + device_changed_callback: ffi::cubeb_device_changed_callback, + ) -> Result<()> { + assert_not_in_callback(); + let rpc = self.context.rpc(); + let enable = device_changed_callback.is_some(); + *self.device_change_cb.lock().unwrap() = device_changed_callback; + send_recv!(rpc, StreamRegisterDeviceChangeCallback(self.token, enable) => StreamRegisterDeviceChangeCallback) + } +} + +pub fn init( + ctx: &ClientContext, + init_params: messages::StreamInitParams, + data_callback: ffi::cubeb_data_callback, + state_callback: ffi::cubeb_state_callback, + user_ptr: *mut c_void, +) -> Result<Stream> { + let stm = ClientStream::init(ctx, init_params, data_callback, state_callback, user_ptr)?; + debug_assert_eq!(stm.user_ptr(), user_ptr); + Ok(stm) +} |