diff options
Diffstat (limited to 'third_party/rust/cubeb-pulse/src/backend/context.rs')
-rw-r--r-- | third_party/rust/cubeb-pulse/src/backend/context.rs | 751 |
1 files changed, 751 insertions, 0 deletions
diff --git a/third_party/rust/cubeb-pulse/src/backend/context.rs b/third_party/rust/cubeb-pulse/src/backend/context.rs new file mode 100644 index 0000000000..2b37798335 --- /dev/null +++ b/third_party/rust/cubeb-pulse/src/backend/context.rs @@ -0,0 +1,751 @@ +// Copyright © 2017-2018 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details. + +use backend::*; +use cubeb_backend::{ + ffi, log_enabled, Context, ContextOps, DeviceCollectionRef, DeviceId, DeviceType, Error, Ops, + Result, Stream, StreamParams, StreamParamsRef, +}; +use pulse::{self, ProplistExt}; +use pulse_ffi::*; +use semver; +use std::cell::RefCell; +use std::default::Default; +use std::ffi::{CStr, CString}; +use std::mem; +use std::os::raw::c_void; +use std::ptr; + +#[derive(Debug)] +pub struct DefaultInfo { + pub sample_spec: pulse::SampleSpec, + pub channel_map: pulse::ChannelMap, + pub flags: pulse::SinkFlags, +} + +pub const PULSE_OPS: Ops = capi_new!(PulseContext, PulseStream); + +#[repr(C)] +#[derive(Debug)] +pub struct PulseContext { + _ops: *const Ops, + pub mainloop: pulse::ThreadedMainloop, + pub context: Option<pulse::Context>, + pub default_sink_info: Option<DefaultInfo>, + pub context_name: Option<CString>, + pub input_collection_changed_callback: ffi::cubeb_device_collection_changed_callback, + pub input_collection_changed_user_ptr: *mut c_void, + pub output_collection_changed_callback: ffi::cubeb_device_collection_changed_callback, + pub output_collection_changed_user_ptr: *mut c_void, + pub error: bool, + pub version_2_0_0: bool, + pub version_0_9_8: bool, + #[cfg(feature = "pulse-dlopen")] + pub libpulse: LibLoader, + devids: RefCell<Intern>, +} + +impl PulseContext { + #[cfg(feature = "pulse-dlopen")] + fn _new(name: Option<CString>) -> Result<Box<Self>> { + let libpulse = unsafe { open() }; + if libpulse.is_none() { + cubeb_log!("libpulse not found"); + return Err(Error::error()); + } + + let ctx = Box::new(PulseContext { + _ops: &PULSE_OPS, + libpulse: libpulse.unwrap(), + mainloop: pulse::ThreadedMainloop::new(), + context: None, + default_sink_info: None, + context_name: name, + input_collection_changed_callback: None, + input_collection_changed_user_ptr: ptr::null_mut(), + output_collection_changed_callback: None, + output_collection_changed_user_ptr: ptr::null_mut(), + error: true, + version_0_9_8: false, + version_2_0_0: false, + devids: RefCell::new(Intern::new()), + }); + + Ok(ctx) + } + + #[cfg(not(feature = "pulse-dlopen"))] + fn _new(name: Option<CString>) -> Result<Box<Self>> { + Ok(Box::new(PulseContext { + _ops: &PULSE_OPS, + mainloop: pulse::ThreadedMainloop::new(), + context: None, + default_sink_info: None, + context_name: name, + input_collection_changed_callback: None, + input_collection_changed_user_ptr: ptr::null_mut(), + output_collection_changed_callback: None, + output_collection_changed_user_ptr: ptr::null_mut(), + error: true, + version_0_9_8: false, + version_2_0_0: false, + devids: RefCell::new(Intern::new()), + })) + } + + fn server_info_cb(context: &pulse::Context, info: Option<&pulse::ServerInfo>, u: *mut c_void) { + fn sink_info_cb(_: &pulse::Context, i: *const pulse::SinkInfo, eol: i32, u: *mut c_void) { + let ctx = unsafe { &mut *(u as *mut PulseContext) }; + if eol == 0 { + let info = unsafe { &*i }; + let flags = pulse::SinkFlags::from_bits_truncate(info.flags); + ctx.default_sink_info = Some(DefaultInfo { + sample_spec: info.sample_spec, + channel_map: info.channel_map, + flags, + }); + } + ctx.mainloop.signal(); + } + + if let Some(info) = info { + let _ = context.get_sink_info_by_name( + try_cstr_from(info.default_sink_name), + sink_info_cb, + u, + ); + } else { + // If info is None, then an error occured. + let ctx = unsafe { &mut *(u as *mut PulseContext) }; + ctx.mainloop.signal(); + } + } + + fn new(name: Option<&CStr>) -> Result<Box<Self>> { + let name = name.map(|s| s.to_owned()); + let mut ctx = PulseContext::_new(name)?; + + if ctx.mainloop.start().is_err() { + ctx.destroy(); + cubeb_log!("Error: couldn't start pulse's mainloop"); + return Err(Error::error()); + } + + if ctx.context_init().is_err() { + ctx.destroy(); + cubeb_log!("Error: couldn't init pulse's context"); + return Err(Error::error()); + } + + ctx.mainloop.lock(); + /* server_info_callback performs a second async query, + * which is responsible for initializing default_sink_info + * and signalling the mainloop to end the wait. */ + let user_data: *mut c_void = ctx.as_mut() as *mut _ as *mut _; + if let Some(ref context) = ctx.context { + if let Ok(o) = context.get_server_info(PulseContext::server_info_cb, user_data) { + ctx.operation_wait(None, &o); + } + } + ctx.mainloop.unlock(); + + /* Update `default_sink_info` when the default device changes. */ + if let Err(e) = ctx.subscribe_notifications(pulse::SubscriptionMask::SERVER) { + cubeb_log!("subscribe_notifications ignored failure: {}", e); + } + + // Return the result. + Ok(ctx) + } + + pub fn destroy(&mut self) { + self.context_destroy(); + + if !self.mainloop.is_null() { + self.mainloop.stop(); + } + } + + fn subscribe_notifications(&mut self, mask: pulse::SubscriptionMask) -> Result<()> { + fn update_collection( + _: &pulse::Context, + event: pulse::SubscriptionEvent, + index: u32, + user_data: *mut c_void, + ) { + let ctx = unsafe { &mut *(user_data as *mut PulseContext) }; + + let (f, t) = (event.event_facility(), event.event_type()); + if (f == pulse::SubscriptionEventFacility::Source) + | (f == pulse::SubscriptionEventFacility::Sink) + { + if (t == pulse::SubscriptionEventType::Remove) + | (t == pulse::SubscriptionEventType::New) + { + if log_enabled() { + let op = if t == pulse::SubscriptionEventType::New { + "Adding" + } else { + "Removing" + }; + let dev = if f == pulse::SubscriptionEventFacility::Sink { + "sink" + } else { + "source " + }; + cubeb_log!("{} {} index {}", op, dev, index); + } + + if f == pulse::SubscriptionEventFacility::Source { + unsafe { + ctx.input_collection_changed_callback.unwrap()( + ctx as *mut _ as *mut _, + ctx.input_collection_changed_user_ptr, + ); + } + } + if f == pulse::SubscriptionEventFacility::Sink { + unsafe { + ctx.output_collection_changed_callback.unwrap()( + ctx as *mut _ as *mut _, + ctx.output_collection_changed_user_ptr, + ); + } + } + } + } else if (f == pulse::SubscriptionEventFacility::Server) + && (t == pulse::SubscriptionEventType::Change) + { + cubeb_log!("Server changed {}", index as i32); + let user_data: *mut c_void = ctx as *mut _ as *mut _; + if let Some(ref context) = ctx.context { + if let Err(e) = context.get_server_info(PulseContext::server_info_cb, user_data) + { + cubeb_log!("Error: get_server_info ignored failure: {}", e); + } + } + } + } + + fn success(_: &pulse::Context, success: i32, user_data: *mut c_void) { + let ctx = unsafe { &*(user_data as *mut PulseContext) }; + if success != 1 { + cubeb_log!("subscribe_success ignored failure: {}", success); + } + ctx.mainloop.signal(); + } + + let user_data: *mut c_void = self as *const _ as *mut _; + if let Some(ref context) = self.context { + self.mainloop.lock(); + + context.set_subscribe_callback(update_collection, user_data); + + if let Ok(o) = context.subscribe(mask, success, self as *const _ as *mut _) { + self.operation_wait(None, &o); + } else { + self.mainloop.unlock(); + cubeb_log!("Error: context subscribe failed"); + return Err(Error::error()); + } + + self.mainloop.unlock(); + } + + Ok(()) + } +} + +impl ContextOps for PulseContext { + fn init(context_name: Option<&CStr>) -> Result<Context> { + let ctx = PulseContext::new(context_name)?; + Ok(unsafe { Context::from_ptr(Box::into_raw(ctx) as *mut _) }) + } + + fn backend_id(&mut self) -> &'static CStr { + unsafe { CStr::from_ptr(b"pulse-rust\0".as_ptr() as *const _) } + } + + fn max_channel_count(&mut self) -> Result<u32> { + match self.default_sink_info { + Some(ref info) => Ok(u32::from(info.channel_map.channels)), + None => { + cubeb_log!("Error: couldn't get the max channel count"); + Err(Error::error()) + } + } + } + + fn min_latency(&mut self, params: StreamParams) -> Result<u32> { + // According to PulseAudio developers, this is a safe minimum. + Ok(25 * params.rate() / 1000) + } + + fn preferred_sample_rate(&mut self) -> Result<u32> { + match self.default_sink_info { + Some(ref info) => Ok(info.sample_spec.rate), + None => { + cubeb_log!("Error: Couldn't get the preferred sample rate"); + Err(Error::error()) + } + } + } + + fn enumerate_devices( + &mut self, + devtype: DeviceType, + collection: &DeviceCollectionRef, + ) -> Result<()> { + fn add_output_device( + _: &pulse::Context, + i: *const pulse::SinkInfo, + eol: i32, + user_data: *mut c_void, + ) { + let list_data = unsafe { &mut *(user_data as *mut PulseDevListData) }; + let ctx = list_data.context; + + if eol != 0 { + ctx.mainloop.signal(); + return; + } + + debug_assert!(!i.is_null()); + debug_assert!(!user_data.is_null()); + + let info = unsafe { &*i }; + + let group_id = match info.proplist().gets("sysfs.path") { + Some(p) => p.to_owned().into_raw(), + _ => ptr::null_mut(), + }; + + let vendor_name = match info.proplist().gets("device.vendor.name") { + Some(p) => p.to_owned().into_raw(), + _ => ptr::null_mut(), + }; + + let info_name = unsafe { CStr::from_ptr(info.name) }; + let info_description = unsafe { CStr::from_ptr(info.description) }.to_owned(); + + let preferred = if *info_name == *list_data.default_sink_name { + ffi::CUBEB_DEVICE_PREF_ALL + } else { + ffi::CUBEB_DEVICE_PREF_NONE + }; + + let device_id = ctx.devids.borrow_mut().add(info_name); + let friendly_name = info_description.into_raw(); + let devinfo = ffi::cubeb_device_info { + device_id, + devid: device_id as ffi::cubeb_devid, + friendly_name, + group_id, + vendor_name, + device_type: ffi::CUBEB_DEVICE_TYPE_OUTPUT, + state: ctx.state_from_port(info.active_port), + preferred, + format: ffi::CUBEB_DEVICE_FMT_ALL, + default_format: pulse_format_to_cubeb_format(info.sample_spec.format), + max_channels: u32::from(info.channel_map.channels), + min_rate: 1, + max_rate: PA_RATE_MAX, + default_rate: info.sample_spec.rate, + latency_lo: 0, + latency_hi: 0, + }; + list_data.devinfo.push(devinfo); + } + + fn add_input_device( + _: &pulse::Context, + i: *const pulse::SourceInfo, + eol: i32, + user_data: *mut c_void, + ) { + let list_data = unsafe { &mut *(user_data as *mut PulseDevListData) }; + let ctx = list_data.context; + + if eol != 0 { + ctx.mainloop.signal(); + return; + } + + debug_assert!(!user_data.is_null()); + debug_assert!(!i.is_null()); + + let info = unsafe { &*i }; + + let group_id = match info.proplist().gets("sysfs.path") { + Some(p) => p.to_owned().into_raw(), + _ => ptr::null_mut(), + }; + + let vendor_name = match info.proplist().gets("device.vendor.name") { + Some(p) => p.to_owned().into_raw(), + _ => ptr::null_mut(), + }; + + let info_name = unsafe { CStr::from_ptr(info.name) }; + let info_description = unsafe { CStr::from_ptr(info.description) }.to_owned(); + + let preferred = if *info_name == *list_data.default_source_name { + ffi::CUBEB_DEVICE_PREF_ALL + } else { + ffi::CUBEB_DEVICE_PREF_NONE + }; + + let device_id = ctx.devids.borrow_mut().add(info_name); + let friendly_name = info_description.into_raw(); + let devinfo = ffi::cubeb_device_info { + device_id, + devid: device_id as ffi::cubeb_devid, + friendly_name, + group_id, + vendor_name, + device_type: ffi::CUBEB_DEVICE_TYPE_INPUT, + state: ctx.state_from_port(info.active_port), + preferred, + format: ffi::CUBEB_DEVICE_FMT_ALL, + default_format: pulse_format_to_cubeb_format(info.sample_spec.format), + max_channels: u32::from(info.channel_map.channels), + min_rate: 1, + max_rate: PA_RATE_MAX, + default_rate: info.sample_spec.rate, + latency_lo: 0, + latency_hi: 0, + }; + + list_data.devinfo.push(devinfo); + } + + fn default_device_names( + _: &pulse::Context, + info: Option<&pulse::ServerInfo>, + user_data: *mut c_void, + ) { + let list_data = unsafe { &mut *(user_data as *mut PulseDevListData) }; + + if let Some(info) = info { + list_data.default_sink_name = super::try_cstr_from(info.default_sink_name) + .map(|s| s.to_owned()) + .unwrap_or_default(); + list_data.default_source_name = super::try_cstr_from(info.default_source_name) + .map(|s| s.to_owned()) + .unwrap_or_default(); + } + + list_data.context.mainloop.signal(); + } + + let mut user_data = PulseDevListData::new(self); + + if let Some(ref context) = self.context { + self.mainloop.lock(); + + if let Ok(o) = + context.get_server_info(default_device_names, &mut user_data as *mut _ as *mut _) + { + self.operation_wait(None, &o); + } + + if devtype.contains(DeviceType::OUTPUT) { + if let Ok(o) = context + .get_sink_info_list(add_output_device, &mut user_data as *mut _ as *mut _) + { + self.operation_wait(None, &o); + } + } + + if devtype.contains(DeviceType::INPUT) { + if let Ok(o) = context + .get_source_info_list(add_input_device, &mut user_data as *mut _ as *mut _) + { + self.operation_wait(None, &o); + } + } + + self.mainloop.unlock(); + } + + // Extract the array of cubeb_device_info from + // PulseDevListData and convert it into C representation. + let mut tmp = Vec::new(); + mem::swap(&mut user_data.devinfo, &mut tmp); + let mut devices = tmp.into_boxed_slice(); + let coll = unsafe { &mut *collection.as_ptr() }; + coll.device = devices.as_mut_ptr(); + coll.count = devices.len(); + + // Giving away the memory owned by devices. Don't free it! + mem::forget(devices); + Ok(()) + } + + fn device_collection_destroy(&mut self, collection: &mut DeviceCollectionRef) -> Result<()> { + debug_assert!(!collection.as_ptr().is_null()); + unsafe { + let coll = &mut *collection.as_ptr(); + let mut devices = Vec::from_raw_parts( + coll.device as *mut ffi::cubeb_device_info, + coll.count, + coll.count, + ); + for dev in &mut devices { + if !dev.group_id.is_null() { + let _ = CString::from_raw(dev.group_id as *mut _); + } + if !dev.vendor_name.is_null() { + let _ = CString::from_raw(dev.vendor_name as *mut _); + } + if !dev.friendly_name.is_null() { + let _ = CString::from_raw(dev.friendly_name as *mut _); + } + } + coll.device = ptr::null_mut(); + coll.count = 0; + } + Ok(()) + } + + #[cfg_attr(feature = "cargo-clippy", allow(clippy::too_many_arguments))] + fn stream_init( + &mut self, + stream_name: Option<&CStr>, + input_device: DeviceId, + input_stream_params: Option<&StreamParamsRef>, + output_device: DeviceId, + output_stream_params: Option<&StreamParamsRef>, + latency_frames: u32, + data_callback: ffi::cubeb_data_callback, + state_callback: ffi::cubeb_state_callback, + user_ptr: *mut c_void, + ) -> Result<Stream> { + if self.error { + self.context_init()?; + } + + let stm = PulseStream::new( + self, + stream_name, + input_device, + input_stream_params, + output_device, + output_stream_params, + latency_frames, + data_callback, + state_callback, + user_ptr, + )?; + Ok(unsafe { Stream::from_ptr(Box::into_raw(stm) as *mut _) }) + } + + fn register_device_collection_changed( + &mut self, + devtype: DeviceType, + cb: ffi::cubeb_device_collection_changed_callback, + user_ptr: *mut c_void, + ) -> Result<()> { + if devtype.contains(DeviceType::INPUT) { + self.input_collection_changed_callback = cb; + self.input_collection_changed_user_ptr = user_ptr; + } + if devtype.contains(DeviceType::OUTPUT) { + self.output_collection_changed_callback = cb; + self.output_collection_changed_user_ptr = user_ptr; + } + + let mut mask = pulse::SubscriptionMask::empty(); + if self.input_collection_changed_callback.is_some() { + mask |= pulse::SubscriptionMask::SOURCE; + } + if self.output_collection_changed_callback.is_some() { + mask |= pulse::SubscriptionMask::SINK; + } + /* Default device changed, this is always registered in order to update the + * `default_sink_info` when the default device changes. */ + mask |= pulse::SubscriptionMask::SERVER; + + self.subscribe_notifications(mask) + } +} + +impl Drop for PulseContext { + fn drop(&mut self) { + self.destroy(); + } +} + +impl PulseContext { + /* Initialize PulseAudio Context */ + fn context_init(&mut self) -> Result<()> { + fn error_state(c: &pulse::Context, u: *mut c_void) { + let ctx = unsafe { &mut *(u as *mut PulseContext) }; + if !c.get_state().is_good() { + ctx.error = true; + } + ctx.mainloop.signal(); + } + + if self.context.is_some() { + debug_assert!(self.error); + self.context_destroy(); + } + + self.context = { + let name = self.context_name.as_ref().map(|s| s.as_ref()); + pulse::Context::new(&self.mainloop.get_api(), name) + }; + + let context_ptr: *mut c_void = self as *mut _ as *mut _; + if self.context.is_none() { + cubeb_log!("Error: couldn't create pulse's context"); + return Err(Error::error()); + } + + self.mainloop.lock(); + let connected = if let Some(ref context) = self.context { + context.set_state_callback(error_state, context_ptr); + context + .connect(None, pulse::ContextFlags::empty(), ptr::null()) + .is_ok() + } else { + false + }; + + if !connected || !self.wait_until_context_ready() { + self.mainloop.unlock(); + self.context_destroy(); + cubeb_log!("Error: error while waiting for pulse's context to be ready"); + return Err(Error::error()); + } + + self.mainloop.unlock(); + + let version_str = unsafe { CStr::from_ptr(pulse::library_version()) }; + if let Ok(version) = semver::Version::parse(&version_str.to_string_lossy()) { + self.version_0_9_8 = + version >= semver::Version::parse("0.9.8").expect("Failed to parse version"); + self.version_2_0_0 = + version >= semver::Version::parse("2.0.0").expect("Failed to parse version"); + } + + self.error = false; + + Ok(()) + } + + fn context_destroy(&mut self) { + fn drain_complete(_: &pulse::Context, u: *mut c_void) { + let ctx = unsafe { &*(u as *mut PulseContext) }; + ctx.mainloop.signal(); + } + + let context_ptr: *mut c_void = self as *mut _ as *mut _; + if let Some(ctx) = self.context.take() { + self.mainloop.lock(); + if let Ok(o) = ctx.drain(drain_complete, context_ptr) { + self.operation_wait(None, &o); + } + ctx.clear_state_callback(); + ctx.disconnect(); + ctx.unref(); + self.mainloop.unlock(); + } + } + + pub fn operation_wait<'a, S>(&self, s: S, o: &pulse::Operation) -> bool + where + S: Into<Option<&'a pulse::Stream>>, + { + let stream = s.into(); + while o.get_state() == PA_OPERATION_RUNNING { + self.mainloop.wait(); + if let Some(ref context) = self.context { + if !context.get_state().is_good() { + return false; + } + } + + if let Some(stm) = stream { + if !stm.get_state().is_good() { + return false; + } + } + } + + true + } + + pub fn wait_until_context_ready(&self) -> bool { + if let Some(ref context) = self.context { + loop { + let state = context.get_state(); + if !state.is_good() { + return false; + } + if state == pulse::ContextState::Ready { + break; + } + self.mainloop.wait(); + } + } + + true + } + + fn state_from_port(&self, i: *const pa_port_info) -> ffi::cubeb_device_state { + if !i.is_null() { + let info = unsafe { *i }; + if self.version_2_0_0 && info.available == PA_PORT_AVAILABLE_NO { + ffi::CUBEB_DEVICE_STATE_UNPLUGGED + } else { + ffi::CUBEB_DEVICE_STATE_ENABLED + } + } else { + ffi::CUBEB_DEVICE_STATE_ENABLED + } + } +} + +struct PulseDevListData<'a> { + default_sink_name: CString, + default_source_name: CString, + devinfo: Vec<ffi::cubeb_device_info>, + context: &'a PulseContext, +} + +impl<'a> PulseDevListData<'a> { + pub fn new<'b>(context: &'b PulseContext) -> Self + where + 'b: 'a, + { + PulseDevListData { + default_sink_name: CString::default(), + default_source_name: CString::default(), + devinfo: Vec::new(), + context, + } + } +} + +impl<'a> Drop for PulseDevListData<'a> { + fn drop(&mut self) { + for elem in &mut self.devinfo { + let _ = unsafe { Box::from_raw(elem) }; + } + } +} + +fn pulse_format_to_cubeb_format(format: pa_sample_format_t) -> ffi::cubeb_device_fmt { + match format { + PA_SAMPLE_S16LE => ffi::CUBEB_DEVICE_FMT_S16LE, + PA_SAMPLE_S16BE => ffi::CUBEB_DEVICE_FMT_S16BE, + PA_SAMPLE_FLOAT32LE => ffi::CUBEB_DEVICE_FMT_F32LE, + PA_SAMPLE_FLOAT32BE => ffi::CUBEB_DEVICE_FMT_F32BE, + // Unsupported format, return F32NE + _ => ffi::CUBEB_DEVICE_FMT_F32NE, + } +} |