diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 00:47:55 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 00:47:55 +0000 |
commit | 26a029d407be480d791972afb5975cf62c9360a6 (patch) | |
tree | f435a8308119effd964b339f76abb83a57c29483 /third_party/rust/cubeb-pulse | |
parent | Initial commit. (diff) | |
download | firefox-26a029d407be480d791972afb5975cf62c9360a6.tar.xz firefox-26a029d407be480d791972afb5975cf62c9360a6.zip |
Adding upstream version 124.0.1.upstream/124.0.1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/cubeb-pulse')
-rw-r--r-- | third_party/rust/cubeb-pulse/.cargo-checksum.json | 1 | ||||
-rw-r--r-- | third_party/rust/cubeb-pulse/.editorconfig | 12 | ||||
-rw-r--r-- | third_party/rust/cubeb-pulse/.github/workflows/build.yml | 44 | ||||
-rw-r--r-- | third_party/rust/cubeb-pulse/AUTHORS | 1 | ||||
-rw-r--r-- | third_party/rust/cubeb-pulse/Cargo.toml | 38 | ||||
-rw-r--r-- | third_party/rust/cubeb-pulse/LICENSE | 13 | ||||
-rw-r--r-- | third_party/rust/cubeb-pulse/README.md | 5 | ||||
-rw-r--r-- | third_party/rust/cubeb-pulse/src/backend/context.rs | 756 | ||||
-rw-r--r-- | third_party/rust/cubeb-pulse/src/backend/cork_state.rs | 89 | ||||
-rw-r--r-- | third_party/rust/cubeb-pulse/src/backend/intern.rs | 58 | ||||
-rw-r--r-- | third_party/rust/cubeb-pulse/src/backend/mod.rs | 24 | ||||
-rw-r--r-- | third_party/rust/cubeb-pulse/src/backend/stream.rs | 1560 | ||||
-rw-r--r-- | third_party/rust/cubeb-pulse/src/capi.rs | 21 | ||||
-rw-r--r-- | third_party/rust/cubeb-pulse/src/lib.rs | 18 |
14 files changed, 2640 insertions, 0 deletions
diff --git a/third_party/rust/cubeb-pulse/.cargo-checksum.json b/third_party/rust/cubeb-pulse/.cargo-checksum.json new file mode 100644 index 0000000000..d33abb4393 --- /dev/null +++ b/third_party/rust/cubeb-pulse/.cargo-checksum.json @@ -0,0 +1 @@ +{"files":{".editorconfig":"bf047bd1da10cabb99eea666d1e57c321eba4716dccb3e4ed0e2c5fe3ca53858",".github/workflows/build.yml":"477366d58c9dc059dbe4a158a6e910f23a3e9ecac7411f73616e06375583b764","AUTHORS":"0e0ac930a68ce2f6b876126b195add177f0d3886facb9260f4d9b69f1988f0cc","Cargo.toml":"4bdd3962429c676eda59de96cad567478bb20b01c785c9ce3ba03b2e7837dd67","LICENSE":"44c6b5ae5ec3fe2fbc608b00e6f4896f4d2d5c7e525fcbaa3eaa3cf2f3d5a983","README.md":"0079450bb4b013bac065ed1750851e461a3710ebad1f323817da1cb82db0bc4f","src/backend/context.rs":"51dbf887e78aed5cd5e2255ac98f50c287960d8949ea3f66f2b05214b76800d8","src/backend/cork_state.rs":"4a0f1afc7d9f333dac89218cc56d7d32fbffb487cd48c1c9a4e03d79cb3b5e28","src/backend/intern.rs":"11ca424e4eb77f8eb9fd5a6717d1e791facf9743156a8534f0016fcf64d57b0f","src/backend/mod.rs":"dfb30ec497d6215e4535e936fea8fe3a407ef24dc1cec43b52c0ffa923d9229c","src/backend/stream.rs":"b3dcaa382981cbaa3af3c445b0ed6b5012b61d420d2e52a5ebd3cb0dd09a917c","src/capi.rs":"fa0fa020f0d0efe55aa0fc3596405e8407bbe2cbe6c7a558345304e6da87994e","src/lib.rs":"b41bbdc562cbfb130ed7c1e53fe69944774f515705341d8ce48a2f82c8c0c2c5"},"package":null}
\ No newline at end of file diff --git a/third_party/rust/cubeb-pulse/.editorconfig b/third_party/rust/cubeb-pulse/.editorconfig new file mode 100644 index 0000000000..d1f040a232 --- /dev/null +++ b/third_party/rust/cubeb-pulse/.editorconfig @@ -0,0 +1,12 @@ +root = true + +[*] +end_of_line = lf +charset = utf-8 +trim_trailing_whitespace = true +insert_final_newline = true +indent_style = space +indent_size = 4 + +[*.md] +trim_trailing_whitespace = false diff --git a/third_party/rust/cubeb-pulse/.github/workflows/build.yml b/third_party/rust/cubeb-pulse/.github/workflows/build.yml new file mode 100644 index 0000000000..fc95140b5a --- /dev/null +++ b/third_party/rust/cubeb-pulse/.github/workflows/build.yml @@ -0,0 +1,44 @@ +name: Build + +on: [push, pull_request] + +jobs: + build: + runs-on: ubuntu-20.04 + continue-on-error: ${{ matrix.experimental }} + strategy: + fail-fast: false + matrix: + rust: [stable] + experimental: [false] + include: + - rust: nightly + experimental: true + + steps: + - uses: actions/checkout@v2 + with: + submodules: recursive + + - name: Install Rust + run: rustup toolchain install ${{ matrix.rust }} --profile minimal --component rustfmt clippy + + - name: Install Dependencies (Linux) + run: sudo apt-get update && sudo apt-get install libpulse-dev + + - name: Check format + shell: bash + run: rustup run ${{ matrix.rust }} cargo fmt --all -- --check + + - name: Clippy + shell: bash + run: rustup run ${{ matrix.rust }} cargo clippy --all -- -D warnings + + - name: Build + shell: bash + run: rustup run ${{ matrix.rust }} cargo build --all + + - name: Test + shell: bash + run: rustup run ${{ matrix.rust }} cargo test --all + diff --git a/third_party/rust/cubeb-pulse/AUTHORS b/third_party/rust/cubeb-pulse/AUTHORS new file mode 100644 index 0000000000..624fa14818 --- /dev/null +++ b/third_party/rust/cubeb-pulse/AUTHORS @@ -0,0 +1 @@ +Dan Glastonbury <dglastonbury@mozilla.com> diff --git a/third_party/rust/cubeb-pulse/Cargo.toml b/third_party/rust/cubeb-pulse/Cargo.toml new file mode 100644 index 0000000000..c8be5b482c --- /dev/null +++ b/third_party/rust/cubeb-pulse/Cargo.toml @@ -0,0 +1,38 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g., crates.io) dependencies. +# +# If you are reading this file be aware that the original Cargo.toml +# will likely look very different (and much more reasonable). +# See Cargo.toml.orig for the original contents. + +[package] +name = "cubeb-pulse" +version = "0.5.0" +authors = ["Dan Glastonbury <dglastonbury@mozilla.com>"] +description = "Cubeb backed for PulseAudio written in Rust" +readme = "README.md" +license = "ISC" + +[lib] +crate-type = [ + "staticlib", + "rlib", +] + +[dependencies] +cubeb-backend = "0.12.0" +ringbuf = "0.2" +semver = "1.0" + +[dependencies.pulse] +path = "pulse-rs" + +[dependencies.pulse-ffi] +path = "pulse-ffi" + +[features] +pulse-dlopen = ["pulse-ffi/dlopen"] diff --git a/third_party/rust/cubeb-pulse/LICENSE b/third_party/rust/cubeb-pulse/LICENSE new file mode 100644 index 0000000000..fffc9dc405 --- /dev/null +++ b/third_party/rust/cubeb-pulse/LICENSE @@ -0,0 +1,13 @@ +Copyright © 2011 Mozilla Foundation + +Permission to use, copy, modify, and distribute this software for any +purpose with or without fee is hereby granted, provided that the above +copyright notice and this permission notice appear in all copies. + +THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. diff --git a/third_party/rust/cubeb-pulse/README.md b/third_party/rust/cubeb-pulse/README.md new file mode 100644 index 0000000000..3636c5967c --- /dev/null +++ b/third_party/rust/cubeb-pulse/README.md @@ -0,0 +1,5 @@ +# cubeb-pulse-rs + +Implementation of PulseAudio backend for Cubeb written in Rust. + +[![Build Status](https://github.com/mozilla/cubeb-pulse-rs/actions/workflows/build.yml/badge.svg)](https://github.com/mozilla/cubeb-pulse-rs/actions/workflows/build.yml) diff --git a/third_party/rust/cubeb-pulse/src/backend/context.rs b/third_party/rust/cubeb-pulse/src/backend/context.rs new file mode 100644 index 0000000000..556b7a6a90 --- /dev/null +++ b/third_party/rust/cubeb-pulse/src/backend/context.rs @@ -0,0 +1,756 @@ +// Copyright © 2017-2018 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details. + +use backend::*; +use cubeb_backend::{ + ffi, log_enabled, Context, ContextOps, DeviceCollectionRef, DeviceId, DeviceType, Error, + InputProcessingParams, Ops, Result, Stream, StreamParams, StreamParamsRef, +}; +use pulse::{self, ProplistExt}; +use pulse_ffi::*; +use semver; +use std::cell::RefCell; +use std::default::Default; +use std::ffi::{CStr, CString}; +use std::mem; +use std::os::raw::c_void; +use std::ptr; + +#[derive(Debug)] +pub struct DefaultInfo { + pub sample_spec: pulse::SampleSpec, + pub channel_map: pulse::ChannelMap, + pub flags: pulse::SinkFlags, +} + +pub const PULSE_OPS: Ops = capi_new!(PulseContext, PulseStream); + +#[repr(C)] +#[derive(Debug)] +pub struct PulseContext { + _ops: *const Ops, + pub mainloop: pulse::ThreadedMainloop, + pub context: Option<pulse::Context>, + pub default_sink_info: Option<DefaultInfo>, + pub context_name: Option<CString>, + pub input_collection_changed_callback: ffi::cubeb_device_collection_changed_callback, + pub input_collection_changed_user_ptr: *mut c_void, + pub output_collection_changed_callback: ffi::cubeb_device_collection_changed_callback, + pub output_collection_changed_user_ptr: *mut c_void, + pub error: bool, + pub version_2_0_0: bool, + pub version_0_9_8: bool, + #[cfg(feature = "pulse-dlopen")] + pub libpulse: LibLoader, + devids: RefCell<Intern>, +} + +impl PulseContext { + #[cfg(feature = "pulse-dlopen")] + fn _new(name: Option<CString>) -> Result<Box<Self>> { + let libpulse = unsafe { open() }; + if libpulse.is_none() { + cubeb_log!("libpulse not found"); + return Err(Error::error()); + } + + let ctx = Box::new(PulseContext { + _ops: &PULSE_OPS, + libpulse: libpulse.unwrap(), + mainloop: pulse::ThreadedMainloop::new(), + context: None, + default_sink_info: None, + context_name: name, + input_collection_changed_callback: None, + input_collection_changed_user_ptr: ptr::null_mut(), + output_collection_changed_callback: None, + output_collection_changed_user_ptr: ptr::null_mut(), + error: true, + version_0_9_8: false, + version_2_0_0: false, + devids: RefCell::new(Intern::new()), + }); + + Ok(ctx) + } + + #[cfg(not(feature = "pulse-dlopen"))] + fn _new(name: Option<CString>) -> Result<Box<Self>> { + Ok(Box::new(PulseContext { + _ops: &PULSE_OPS, + mainloop: pulse::ThreadedMainloop::new(), + context: None, + default_sink_info: None, + context_name: name, + input_collection_changed_callback: None, + input_collection_changed_user_ptr: ptr::null_mut(), + output_collection_changed_callback: None, + output_collection_changed_user_ptr: ptr::null_mut(), + error: true, + version_0_9_8: false, + version_2_0_0: false, + devids: RefCell::new(Intern::new()), + })) + } + + fn server_info_cb(context: &pulse::Context, info: Option<&pulse::ServerInfo>, u: *mut c_void) { + fn sink_info_cb(_: &pulse::Context, i: *const pulse::SinkInfo, eol: i32, u: *mut c_void) { + let ctx = unsafe { &mut *(u as *mut PulseContext) }; + if eol == 0 { + let info = unsafe { &*i }; + let flags = pulse::SinkFlags::from_bits_truncate(info.flags); + ctx.default_sink_info = Some(DefaultInfo { + sample_spec: info.sample_spec, + channel_map: info.channel_map, + flags, + }); + } + ctx.mainloop.signal(); + } + + if let Some(info) = info { + let _ = context.get_sink_info_by_name( + try_cstr_from(info.default_sink_name), + sink_info_cb, + u, + ); + } else { + // If info is None, then an error occured. + let ctx = unsafe { &mut *(u as *mut PulseContext) }; + ctx.mainloop.signal(); + } + } + + fn new(name: Option<&CStr>) -> Result<Box<Self>> { + let name = name.map(|s| s.to_owned()); + let mut ctx = PulseContext::_new(name)?; + + if ctx.mainloop.start().is_err() { + ctx.destroy(); + cubeb_log!("Error: couldn't start pulse's mainloop"); + return Err(Error::error()); + } + + if ctx.context_init().is_err() { + ctx.destroy(); + cubeb_log!("Error: couldn't init pulse's context"); + return Err(Error::error()); + } + + ctx.mainloop.lock(); + /* server_info_callback performs a second async query, + * which is responsible for initializing default_sink_info + * and signalling the mainloop to end the wait. */ + let user_data: *mut c_void = ctx.as_mut() as *mut _ as *mut _; + if let Some(ref context) = ctx.context { + if let Ok(o) = context.get_server_info(PulseContext::server_info_cb, user_data) { + ctx.operation_wait(None, &o); + } + } + ctx.mainloop.unlock(); + + /* Update `default_sink_info` when the default device changes. */ + if let Err(e) = ctx.subscribe_notifications(pulse::SubscriptionMask::SERVER) { + cubeb_log!("subscribe_notifications ignored failure: {}", e); + } + + // Return the result. + Ok(ctx) + } + + pub fn destroy(&mut self) { + self.context_destroy(); + + assert!( + self.input_collection_changed_callback.is_none() + && self.output_collection_changed_callback.is_none() + ); + + if !self.mainloop.is_null() { + self.mainloop.stop(); + } + } + + fn subscribe_notifications(&mut self, mask: pulse::SubscriptionMask) -> Result<()> { + fn update_collection( + _: &pulse::Context, + event: pulse::SubscriptionEvent, + index: u32, + user_data: *mut c_void, + ) { + let ctx = unsafe { &mut *(user_data as *mut PulseContext) }; + + let (f, t) = (event.event_facility(), event.event_type()); + if (f == pulse::SubscriptionEventFacility::Source) + | (f == pulse::SubscriptionEventFacility::Sink) + { + if (t == pulse::SubscriptionEventType::Remove) + | (t == pulse::SubscriptionEventType::New) + { + if log_enabled() { + let op = if t == pulse::SubscriptionEventType::New { + "Adding" + } else { + "Removing" + }; + let dev = if f == pulse::SubscriptionEventFacility::Sink { + "sink" + } else { + "source " + }; + cubeb_log!("{} {} index {}", op, dev, index); + } + + if f == pulse::SubscriptionEventFacility::Source { + unsafe { + ctx.input_collection_changed_callback.unwrap()( + ctx as *mut _ as *mut _, + ctx.input_collection_changed_user_ptr, + ); + } + } + if f == pulse::SubscriptionEventFacility::Sink { + unsafe { + ctx.output_collection_changed_callback.unwrap()( + ctx as *mut _ as *mut _, + ctx.output_collection_changed_user_ptr, + ); + } + } + } + } else if (f == pulse::SubscriptionEventFacility::Server) + && (t == pulse::SubscriptionEventType::Change) + { + cubeb_log!("Server changed {}", index as i32); + let user_data: *mut c_void = ctx as *mut _ as *mut _; + if let Some(ref context) = ctx.context { + if let Err(e) = context.get_server_info(PulseContext::server_info_cb, user_data) + { + cubeb_log!("Error: get_server_info ignored failure: {}", e); + } + } + } + } + + fn success(_: &pulse::Context, success: i32, user_data: *mut c_void) { + let ctx = unsafe { &*(user_data as *mut PulseContext) }; + if success != 1 { + cubeb_log!("subscribe_success ignored failure: {}", success); + } + ctx.mainloop.signal(); + } + + let user_data: *mut c_void = self as *const _ as *mut _; + if let Some(ref context) = self.context { + self.mainloop.lock(); + + context.set_subscribe_callback(update_collection, user_data); + + if let Ok(o) = context.subscribe(mask, success, self as *const _ as *mut _) { + self.operation_wait(None, &o); + } else { + self.mainloop.unlock(); + cubeb_log!("Error: context subscribe failed"); + return Err(Error::error()); + } + + self.mainloop.unlock(); + } + + Ok(()) + } +} + +impl ContextOps for PulseContext { + fn init(context_name: Option<&CStr>) -> Result<Context> { + let ctx = PulseContext::new(context_name)?; + Ok(unsafe { Context::from_ptr(Box::into_raw(ctx) as *mut _) }) + } + + fn backend_id(&mut self) -> &'static CStr { + unsafe { CStr::from_ptr(b"pulse-rust\0".as_ptr() as *const _) } + } + + fn max_channel_count(&mut self) -> Result<u32> { + match self.default_sink_info { + Some(ref info) => Ok(u32::from(info.channel_map.channels)), + None => { + cubeb_log!("Error: couldn't get the max channel count"); + Err(Error::error()) + } + } + } + + fn min_latency(&mut self, params: StreamParams) -> Result<u32> { + // According to PulseAudio developers, this is a safe minimum. + Ok(25 * params.rate() / 1000) + } + + fn preferred_sample_rate(&mut self) -> Result<u32> { + match self.default_sink_info { + Some(ref info) => Ok(info.sample_spec.rate), + None => { + cubeb_log!("Error: Couldn't get the preferred sample rate"); + Err(Error::error()) + } + } + } + + fn supported_input_processing_params(&mut self) -> Result<InputProcessingParams> { + Ok(InputProcessingParams::NONE) + } + + fn enumerate_devices( + &mut self, + devtype: DeviceType, + collection: &DeviceCollectionRef, + ) -> Result<()> { + fn add_output_device( + _: &pulse::Context, + i: *const pulse::SinkInfo, + eol: i32, + user_data: *mut c_void, + ) { + let list_data = unsafe { &mut *(user_data as *mut PulseDevListData) }; + let ctx = list_data.context; + + if eol != 0 { + ctx.mainloop.signal(); + return; + } + + debug_assert!(!i.is_null()); + debug_assert!(!user_data.is_null()); + + let info = unsafe { &*i }; + + let group_id = match info.proplist().gets("sysfs.path") { + Some(p) => p.to_owned().into_raw(), + _ => ptr::null_mut(), + }; + + let vendor_name = match info.proplist().gets("device.vendor.name") { + Some(p) => p.to_owned().into_raw(), + _ => ptr::null_mut(), + }; + + let info_name = unsafe { CStr::from_ptr(info.name) }; + let info_description = unsafe { CStr::from_ptr(info.description) }.to_owned(); + + let preferred = if *info_name == *list_data.default_sink_name { + ffi::CUBEB_DEVICE_PREF_ALL + } else { + ffi::CUBEB_DEVICE_PREF_NONE + }; + + let device_id = ctx.devids.borrow_mut().add(info_name); + let friendly_name = info_description.into_raw(); + let devinfo = ffi::cubeb_device_info { + device_id, + devid: device_id as ffi::cubeb_devid, + friendly_name, + group_id, + vendor_name, + device_type: ffi::CUBEB_DEVICE_TYPE_OUTPUT, + state: ctx.state_from_port(info.active_port), + preferred, + format: ffi::CUBEB_DEVICE_FMT_ALL, + default_format: pulse_format_to_cubeb_format(info.sample_spec.format), + max_channels: u32::from(info.channel_map.channels), + min_rate: 1, + max_rate: PA_RATE_MAX, + default_rate: info.sample_spec.rate, + latency_lo: 0, + latency_hi: 0, + }; + list_data.devinfo.push(devinfo); + } + + fn add_input_device( + _: &pulse::Context, + i: *const pulse::SourceInfo, + eol: i32, + user_data: *mut c_void, + ) { + let list_data = unsafe { &mut *(user_data as *mut PulseDevListData) }; + let ctx = list_data.context; + + if eol != 0 { + ctx.mainloop.signal(); + return; + } + + debug_assert!(!user_data.is_null()); + debug_assert!(!i.is_null()); + + let info = unsafe { &*i }; + + let group_id = match info.proplist().gets("sysfs.path") { + Some(p) => p.to_owned().into_raw(), + _ => ptr::null_mut(), + }; + + let vendor_name = match info.proplist().gets("device.vendor.name") { + Some(p) => p.to_owned().into_raw(), + _ => ptr::null_mut(), + }; + + let info_name = unsafe { CStr::from_ptr(info.name) }; + let info_description = unsafe { CStr::from_ptr(info.description) }.to_owned(); + + let preferred = if *info_name == *list_data.default_source_name { + ffi::CUBEB_DEVICE_PREF_ALL + } else { + ffi::CUBEB_DEVICE_PREF_NONE + }; + + let device_id = ctx.devids.borrow_mut().add(info_name); + let friendly_name = info_description.into_raw(); + let devinfo = ffi::cubeb_device_info { + device_id, + devid: device_id as ffi::cubeb_devid, + friendly_name, + group_id, + vendor_name, + device_type: ffi::CUBEB_DEVICE_TYPE_INPUT, + state: ctx.state_from_port(info.active_port), + preferred, + format: ffi::CUBEB_DEVICE_FMT_ALL, + default_format: pulse_format_to_cubeb_format(info.sample_spec.format), + max_channels: u32::from(info.channel_map.channels), + min_rate: 1, + max_rate: PA_RATE_MAX, + default_rate: info.sample_spec.rate, + latency_lo: 0, + latency_hi: 0, + }; + + list_data.devinfo.push(devinfo); + } + + fn default_device_names( + _: &pulse::Context, + info: Option<&pulse::ServerInfo>, + user_data: *mut c_void, + ) { + let list_data = unsafe { &mut *(user_data as *mut PulseDevListData) }; + + if let Some(info) = info { + list_data.default_sink_name = super::try_cstr_from(info.default_sink_name) + .map(|s| s.to_owned()) + .unwrap_or_default(); + list_data.default_source_name = super::try_cstr_from(info.default_source_name) + .map(|s| s.to_owned()) + .unwrap_or_default(); + } + + list_data.context.mainloop.signal(); + } + + let mut user_data = PulseDevListData::new(self); + + if let Some(ref context) = self.context { + self.mainloop.lock(); + + if let Ok(o) = + context.get_server_info(default_device_names, &mut user_data as *mut _ as *mut _) + { + self.operation_wait(None, &o); + } + + if devtype.contains(DeviceType::OUTPUT) { + if let Ok(o) = context + .get_sink_info_list(add_output_device, &mut user_data as *mut _ as *mut _) + { + self.operation_wait(None, &o); + } + } + + if devtype.contains(DeviceType::INPUT) { + if let Ok(o) = context + .get_source_info_list(add_input_device, &mut user_data as *mut _ as *mut _) + { + self.operation_wait(None, &o); + } + } + + self.mainloop.unlock(); + } + + // Extract the array of cubeb_device_info from + // PulseDevListData and convert it into C representation. + let mut tmp = Vec::new(); + mem::swap(&mut user_data.devinfo, &mut tmp); + let mut devices = tmp.into_boxed_slice(); + let coll = unsafe { &mut *collection.as_ptr() }; + coll.device = devices.as_mut_ptr(); + coll.count = devices.len(); + + // Giving away the memory owned by devices. Don't free it! + mem::forget(devices); + Ok(()) + } + + fn device_collection_destroy(&mut self, collection: &mut DeviceCollectionRef) -> Result<()> { + debug_assert!(!collection.as_ptr().is_null()); + unsafe { + let coll = &mut *collection.as_ptr(); + let mut devices = Vec::from_raw_parts(coll.device, coll.count, coll.count); + for dev in &mut devices { + if !dev.group_id.is_null() { + let _ = CString::from_raw(dev.group_id as *mut _); + } + if !dev.vendor_name.is_null() { + let _ = CString::from_raw(dev.vendor_name as *mut _); + } + if !dev.friendly_name.is_null() { + let _ = CString::from_raw(dev.friendly_name as *mut _); + } + } + coll.device = ptr::null_mut(); + coll.count = 0; + } + Ok(()) + } + + #[cfg_attr(feature = "cargo-clippy", allow(clippy::too_many_arguments))] + fn stream_init( + &mut self, + stream_name: Option<&CStr>, + input_device: DeviceId, + input_stream_params: Option<&StreamParamsRef>, + output_device: DeviceId, + output_stream_params: Option<&StreamParamsRef>, + latency_frames: u32, + data_callback: ffi::cubeb_data_callback, + state_callback: ffi::cubeb_state_callback, + user_ptr: *mut c_void, + ) -> Result<Stream> { + if self.error { + self.context_init()?; + } + + let stm = PulseStream::new( + self, + stream_name, + input_device, + input_stream_params, + output_device, + output_stream_params, + latency_frames, + data_callback, + state_callback, + user_ptr, + )?; + Ok(unsafe { Stream::from_ptr(Box::into_raw(stm) as *mut _) }) + } + + fn register_device_collection_changed( + &mut self, + devtype: DeviceType, + cb: ffi::cubeb_device_collection_changed_callback, + user_ptr: *mut c_void, + ) -> Result<()> { + if devtype.contains(DeviceType::INPUT) { + self.input_collection_changed_callback = cb; + self.input_collection_changed_user_ptr = user_ptr; + } + if devtype.contains(DeviceType::OUTPUT) { + self.output_collection_changed_callback = cb; + self.output_collection_changed_user_ptr = user_ptr; + } + + let mut mask = pulse::SubscriptionMask::empty(); + if self.input_collection_changed_callback.is_some() { + mask |= pulse::SubscriptionMask::SOURCE; + } + if self.output_collection_changed_callback.is_some() { + mask |= pulse::SubscriptionMask::SINK; + } + /* Default device changed, this is always registered in order to update the + * `default_sink_info` when the default device changes. */ + mask |= pulse::SubscriptionMask::SERVER; + + self.subscribe_notifications(mask) + } +} + +impl Drop for PulseContext { + fn drop(&mut self) { + self.destroy(); + } +} + +impl PulseContext { + /* Initialize PulseAudio Context */ + fn context_init(&mut self) -> Result<()> { + fn error_state(c: &pulse::Context, u: *mut c_void) { + let ctx = unsafe { &mut *(u as *mut PulseContext) }; + if !c.get_state().is_good() { + ctx.error = true; + } + ctx.mainloop.signal(); + } + + if self.context.is_some() { + debug_assert!(self.error); + self.context_destroy(); + } + + self.context = { + let name = self.context_name.as_ref().map(|s| s.as_ref()); + pulse::Context::new(&self.mainloop.get_api(), name) + }; + + let context_ptr: *mut c_void = self as *mut _ as *mut _; + if self.context.is_none() { + cubeb_log!("Error: couldn't create pulse's context"); + return Err(Error::error()); + } + + self.mainloop.lock(); + let connected = if let Some(ref context) = self.context { + context.set_state_callback(error_state, context_ptr); + context + .connect(None, pulse::ContextFlags::empty(), ptr::null()) + .is_ok() + } else { + false + }; + + if !connected || !self.wait_until_context_ready() { + self.mainloop.unlock(); + self.context_destroy(); + cubeb_log!("Error: error while waiting for pulse's context to be ready"); + return Err(Error::error()); + } + + self.mainloop.unlock(); + + let version_str = unsafe { CStr::from_ptr(pulse::library_version()) }; + if let Ok(version) = semver::Version::parse(&version_str.to_string_lossy()) { + self.version_0_9_8 = + version >= semver::Version::parse("0.9.8").expect("Failed to parse version"); + self.version_2_0_0 = + version >= semver::Version::parse("2.0.0").expect("Failed to parse version"); + } + + self.error = false; + + Ok(()) + } + + fn context_destroy(&mut self) { + fn drain_complete(_: &pulse::Context, u: *mut c_void) { + let ctx = unsafe { &*(u as *mut PulseContext) }; + ctx.mainloop.signal(); + } + + let context_ptr: *mut c_void = self as *mut _ as *mut _; + if let Some(ctx) = self.context.take() { + self.mainloop.lock(); + if let Ok(o) = ctx.drain(drain_complete, context_ptr) { + self.operation_wait(None, &o); + } + ctx.clear_state_callback(); + ctx.disconnect(); + ctx.unref(); + self.mainloop.unlock(); + } + } + + pub fn operation_wait<'a, S>(&self, s: S, o: &pulse::Operation) -> bool + where + S: Into<Option<&'a pulse::Stream>>, + { + let stream = s.into(); + while o.get_state() == PA_OPERATION_RUNNING { + self.mainloop.wait(); + if let Some(ref context) = self.context { + if !context.get_state().is_good() { + return false; + } + } + + if let Some(stm) = stream { + if !stm.get_state().is_good() { + return false; + } + } + } + + true + } + + pub fn wait_until_context_ready(&self) -> bool { + if let Some(ref context) = self.context { + loop { + let state = context.get_state(); + if !state.is_good() { + return false; + } + if state == pulse::ContextState::Ready { + break; + } + self.mainloop.wait(); + } + } + + true + } + + fn state_from_port(&self, i: *const pa_port_info) -> ffi::cubeb_device_state { + if !i.is_null() { + let info = unsafe { *i }; + if self.version_2_0_0 && info.available == PA_PORT_AVAILABLE_NO { + ffi::CUBEB_DEVICE_STATE_UNPLUGGED + } else { + ffi::CUBEB_DEVICE_STATE_ENABLED + } + } else { + ffi::CUBEB_DEVICE_STATE_ENABLED + } + } +} + +struct PulseDevListData<'a> { + default_sink_name: CString, + default_source_name: CString, + devinfo: Vec<ffi::cubeb_device_info>, + context: &'a PulseContext, +} + +impl<'a> PulseDevListData<'a> { + pub fn new<'b>(context: &'b PulseContext) -> Self + where + 'b: 'a, + { + PulseDevListData { + default_sink_name: CString::default(), + default_source_name: CString::default(), + devinfo: Vec::new(), + context, + } + } +} + +impl<'a> Drop for PulseDevListData<'a> { + fn drop(&mut self) { + for elem in &mut self.devinfo { + let _ = unsafe { Box::from_raw(elem) }; + } + } +} + +fn pulse_format_to_cubeb_format(format: pa_sample_format_t) -> ffi::cubeb_device_fmt { + match format { + PA_SAMPLE_S16LE => ffi::CUBEB_DEVICE_FMT_S16LE, + PA_SAMPLE_S16BE => ffi::CUBEB_DEVICE_FMT_S16BE, + PA_SAMPLE_FLOAT32LE => ffi::CUBEB_DEVICE_FMT_F32LE, + PA_SAMPLE_FLOAT32BE => ffi::CUBEB_DEVICE_FMT_F32BE, + // Unsupported format, return F32NE + _ => ffi::CUBEB_DEVICE_FMT_F32NE, + } +} diff --git a/third_party/rust/cubeb-pulse/src/backend/cork_state.rs b/third_party/rust/cubeb-pulse/src/backend/cork_state.rs new file mode 100644 index 0000000000..13bc2c4bad --- /dev/null +++ b/third_party/rust/cubeb-pulse/src/backend/cork_state.rs @@ -0,0 +1,89 @@ +// Copyright © 2017-2018 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details. + +use std::ops; + +#[repr(C)] +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct CorkState(u32); + +const UNCORK: u32 = 0b00; +const CORK: u32 = 0b01; +const NOTIFY: u32 = 0b10; +const ALL: u32 = 0b11; + +impl CorkState { + #[inline] + pub fn uncork() -> Self { + CorkState(UNCORK) + } + #[inline] + pub fn cork() -> Self { + CorkState(CORK) + } + #[inline] + pub fn notify() -> Self { + CorkState(NOTIFY) + } + + #[inline] + pub fn is_cork(&self) -> bool { + self.contains(CorkState::cork()) + } + #[inline] + pub fn is_notify(&self) -> bool { + self.contains(CorkState::notify()) + } + + #[inline] + pub fn contains(&self, other: Self) -> bool { + (*self & other) == other + } +} + +impl ops::BitOr for CorkState { + type Output = CorkState; + + #[inline] + fn bitor(self, other: Self) -> Self { + CorkState(self.0 | other.0) + } +} + +impl ops::BitXor for CorkState { + type Output = CorkState; + + #[inline] + fn bitxor(self, other: Self) -> Self { + CorkState(self.0 ^ other.0) + } +} + +impl ops::BitAnd for CorkState { + type Output = CorkState; + + #[inline] + fn bitand(self, other: Self) -> Self { + CorkState(self.0 & other.0) + } +} + +impl ops::Sub for CorkState { + type Output = CorkState; + + #[inline] + fn sub(self, other: Self) -> Self { + CorkState(self.0 & !other.0) + } +} + +impl ops::Not for CorkState { + type Output = CorkState; + + #[inline] + fn not(self) -> Self { + CorkState(!self.0 & ALL) + } +} diff --git a/third_party/rust/cubeb-pulse/src/backend/intern.rs b/third_party/rust/cubeb-pulse/src/backend/intern.rs new file mode 100644 index 0000000000..bd20174916 --- /dev/null +++ b/third_party/rust/cubeb-pulse/src/backend/intern.rs @@ -0,0 +1,58 @@ +// Copyright © 2017-2018 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details. + +use std::ffi::{CStr, CString}; +use std::os::raw::c_char; + +#[derive(Debug)] +pub struct Intern { + vec: Vec<CString>, +} + +impl Intern { + pub fn new() -> Intern { + Intern { vec: Vec::new() } + } + + pub fn add(&mut self, string: &CStr) -> *const c_char { + fn eq(s1: &CStr, s2: &CStr) -> bool { + s1 == s2 + } + for s in &self.vec { + if eq(s, string) { + return s.as_ptr(); + } + } + + self.vec.push(string.to_owned()); + self.vec.last().unwrap().as_ptr() + } +} + +#[cfg(test)] +mod tests { + use super::Intern; + use std::ffi::CStr; + + #[test] + fn intern() { + fn cstr(str: &[u8]) -> &CStr { + CStr::from_bytes_with_nul(str).unwrap() + } + + let mut intern = Intern::new(); + + let foo_ptr = intern.add(cstr(b"foo\0")); + let bar_ptr = intern.add(cstr(b"bar\0")); + assert!(!foo_ptr.is_null()); + assert!(!bar_ptr.is_null()); + assert!(foo_ptr != bar_ptr); + + assert!(foo_ptr == intern.add(cstr(b"foo\0"))); + assert!(foo_ptr != intern.add(cstr(b"fo\0"))); + assert!(foo_ptr != intern.add(cstr(b"fool\0"))); + assert!(foo_ptr != intern.add(cstr(b"not foo\0"))); + } +} diff --git a/third_party/rust/cubeb-pulse/src/backend/mod.rs b/third_party/rust/cubeb-pulse/src/backend/mod.rs new file mode 100644 index 0000000000..5b53698f9c --- /dev/null +++ b/third_party/rust/cubeb-pulse/src/backend/mod.rs @@ -0,0 +1,24 @@ +// Copyright © 2017-2018 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details. + +mod context; +mod cork_state; +mod intern; +mod stream; + +pub use self::context::PulseContext; +use self::intern::Intern; +pub use self::stream::PulseStream; +use std::ffi::CStr; +use std::os::raw::c_char; + +// helper to convert *const c_char to Option<CStr> +fn try_cstr_from<'str>(s: *const c_char) -> Option<&'str CStr> { + if s.is_null() { + None + } else { + Some(unsafe { CStr::from_ptr(s) }) + } +} diff --git a/third_party/rust/cubeb-pulse/src/backend/stream.rs b/third_party/rust/cubeb-pulse/src/backend/stream.rs new file mode 100644 index 0000000000..8e87110907 --- /dev/null +++ b/third_party/rust/cubeb-pulse/src/backend/stream.rs @@ -0,0 +1,1560 @@ +// Copyright © 2017-2018 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details. + +use backend::cork_state::CorkState; +use backend::*; +use cubeb_backend::{ + ffi, log_enabled, ChannelLayout, DeviceId, DeviceRef, Error, InputProcessingParams, Result, + SampleFormat, StreamOps, StreamParamsRef, StreamPrefs, +}; +use pulse::{self, CVolumeExt, ChannelMapExt, SampleSpecExt, StreamLatency, USecExt}; +use pulse_ffi::*; +use ringbuf::RingBuffer; +use std::ffi::{CStr, CString}; +use std::os::raw::{c_long, c_void}; +use std::slice; +use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; +use std::{mem, ptr}; + +use self::LinearInputBuffer::*; +use self::RingBufferConsumer::*; +use self::RingBufferProducer::*; + +const PULSE_NO_GAIN: f32 = -1.0; + +/// Iterator interface to `ChannelLayout`. +/// +/// Iterates each channel in the set represented by `ChannelLayout`. +struct ChannelLayoutIter { + /// The layout set being iterated + layout: ChannelLayout, + /// The next flag to test + index: u8, +} + +fn channel_layout_iter(layout: ChannelLayout) -> ChannelLayoutIter { + let index = 0; + ChannelLayoutIter { layout, index } +} + +impl Iterator for ChannelLayoutIter { + type Item = ChannelLayout; + + fn next(&mut self) -> Option<Self::Item> { + while !self.layout.is_empty() { + let test = Self::Item::from_bits_truncate(1 << self.index); + self.index += 1; + if self.layout.contains(test) { + self.layout.remove(test); + return Some(test); + } + } + None + } +} + +fn cubeb_channel_to_pa_channel(channel: ffi::cubeb_channel) -> pa_channel_position_t { + match channel { + ffi::CHANNEL_FRONT_LEFT => PA_CHANNEL_POSITION_FRONT_LEFT, + ffi::CHANNEL_FRONT_RIGHT => PA_CHANNEL_POSITION_FRONT_RIGHT, + ffi::CHANNEL_FRONT_CENTER => PA_CHANNEL_POSITION_FRONT_CENTER, + ffi::CHANNEL_LOW_FREQUENCY => PA_CHANNEL_POSITION_LFE, + ffi::CHANNEL_BACK_LEFT => PA_CHANNEL_POSITION_REAR_LEFT, + ffi::CHANNEL_BACK_RIGHT => PA_CHANNEL_POSITION_REAR_RIGHT, + ffi::CHANNEL_FRONT_LEFT_OF_CENTER => PA_CHANNEL_POSITION_FRONT_LEFT_OF_CENTER, + ffi::CHANNEL_FRONT_RIGHT_OF_CENTER => PA_CHANNEL_POSITION_FRONT_RIGHT_OF_CENTER, + ffi::CHANNEL_BACK_CENTER => PA_CHANNEL_POSITION_REAR_CENTER, + ffi::CHANNEL_SIDE_LEFT => PA_CHANNEL_POSITION_SIDE_LEFT, + ffi::CHANNEL_SIDE_RIGHT => PA_CHANNEL_POSITION_SIDE_RIGHT, + ffi::CHANNEL_TOP_CENTER => PA_CHANNEL_POSITION_TOP_CENTER, + ffi::CHANNEL_TOP_FRONT_LEFT => PA_CHANNEL_POSITION_TOP_FRONT_LEFT, + ffi::CHANNEL_TOP_FRONT_CENTER => PA_CHANNEL_POSITION_TOP_FRONT_CENTER, + ffi::CHANNEL_TOP_FRONT_RIGHT => PA_CHANNEL_POSITION_TOP_FRONT_RIGHT, + ffi::CHANNEL_TOP_BACK_LEFT => PA_CHANNEL_POSITION_TOP_REAR_LEFT, + ffi::CHANNEL_TOP_BACK_CENTER => PA_CHANNEL_POSITION_TOP_REAR_CENTER, + ffi::CHANNEL_TOP_BACK_RIGHT => PA_CHANNEL_POSITION_TOP_REAR_RIGHT, + _ => PA_CHANNEL_POSITION_INVALID, + } +} + +fn layout_to_channel_map(layout: ChannelLayout) -> pulse::ChannelMap { + assert_ne!(layout, ChannelLayout::UNDEFINED); + + let mut cm = pulse::ChannelMap::init(); + for (i, channel) in channel_layout_iter(layout).enumerate() { + cm.map[i] = cubeb_channel_to_pa_channel(channel.into()); + } + cm.channels = layout.num_channels() as _; + + // Special case single channel center mapping as mono. + if cm.channels == 1 && cm.map[0] == PA_CHANNEL_POSITION_FRONT_CENTER { + cm.map[0] = PA_CHANNEL_POSITION_MONO; + } + + cm +} + +fn default_layout_for_channels(ch: u32) -> ChannelLayout { + match ch { + 1 => ChannelLayout::MONO, + 2 => ChannelLayout::STEREO, + 3 => ChannelLayout::_3F, + 4 => ChannelLayout::QUAD, + 5 => ChannelLayout::_3F2, + 6 => ChannelLayout::_3F_LFE | ChannelLayout::SIDE_LEFT | ChannelLayout::SIDE_RIGHT, + 7 => ChannelLayout::_3F3R_LFE, + 8 => ChannelLayout::_3F4_LFE, + _ => panic!("channel must be between 1 to 8."), + } +} + +pub struct Device(ffi::cubeb_device); + +impl Drop for Device { + fn drop(&mut self) { + unsafe { + if !self.0.input_name.is_null() { + let _ = CString::from_raw(self.0.input_name as *mut _); + } + if !self.0.output_name.is_null() { + let _ = CString::from_raw(self.0.output_name as *mut _); + } + } + } +} + +enum RingBufferConsumer { + IntegerRingBufferConsumer(ringbuf::Consumer<i16>), + FloatRingBufferConsumer(ringbuf::Consumer<f32>), +} + +enum RingBufferProducer { + IntegerRingBufferProducer(ringbuf::Producer<i16>), + FloatRingBufferProducer(ringbuf::Producer<f32>), +} + +enum LinearInputBuffer { + IntegerLinearInputBuffer(Vec<i16>), + FloatLinearInputBuffer(Vec<f32>), +} + +struct BufferManager { + consumer: RingBufferConsumer, + producer: RingBufferProducer, + linear_input_buffer: LinearInputBuffer, +} + +impl BufferManager { + // When opening a duplex stream, the sample-spec are guaranteed to match. It's ok to have + // either the input or output sample-spec here. + fn new(input_buffer_size: usize, sample_spec: &pulse::SampleSpec) -> BufferManager { + if sample_spec.format == PA_SAMPLE_S16BE || sample_spec.format == PA_SAMPLE_S16LE { + let ring = RingBuffer::<i16>::new(input_buffer_size); + let (prod, cons) = ring.split(); + BufferManager { + producer: IntegerRingBufferProducer(prod), + consumer: IntegerRingBufferConsumer(cons), + linear_input_buffer: IntegerLinearInputBuffer(Vec::<i16>::with_capacity( + input_buffer_size, + )), + } + } else { + let ring = RingBuffer::<f32>::new(input_buffer_size); + let (prod, cons) = ring.split(); + BufferManager { + producer: FloatRingBufferProducer(prod), + consumer: FloatRingBufferConsumer(cons), + linear_input_buffer: FloatLinearInputBuffer(Vec::<f32>::with_capacity( + input_buffer_size, + )), + } + } + } + + fn push_input_data(&mut self, input_data: *const c_void, read_samples: usize) { + match &mut self.producer { + RingBufferProducer::FloatRingBufferProducer(p) => { + let input_data = + unsafe { slice::from_raw_parts::<f32>(input_data as *const f32, read_samples) }; + // we don't do anything in particular if we can't push everything + p.push_slice(input_data); + } + RingBufferProducer::IntegerRingBufferProducer(p) => { + let input_data = + unsafe { slice::from_raw_parts::<i16>(input_data as *const i16, read_samples) }; + p.push_slice(input_data); + } + } + } + + fn pull_input_data(&mut self, input_data: *mut c_void, needed_samples: usize) { + match &mut self.consumer { + IntegerRingBufferConsumer(p) => { + let input: &mut [i16] = unsafe { + slice::from_raw_parts_mut::<i16>(input_data as *mut i16, needed_samples) + }; + let read = p.pop_slice(input); + if read < needed_samples { + for i in 0..(needed_samples - read) { + input[read + i] = 0; + } + } + } + FloatRingBufferConsumer(p) => { + let input: &mut [f32] = unsafe { + slice::from_raw_parts_mut::<f32>(input_data as *mut f32, needed_samples) + }; + let read = p.pop_slice(input); + if read < needed_samples { + for i in 0..(needed_samples - read) { + input[read + i] = 0.; + } + } + } + } + } + + fn get_linear_input_data(&mut self, nsamples: usize) -> *const c_void { + let p = match &mut self.linear_input_buffer { + LinearInputBuffer::IntegerLinearInputBuffer(b) => { + b.resize(nsamples, 0); + b.as_mut_ptr() as *mut c_void + } + LinearInputBuffer::FloatLinearInputBuffer(b) => { + b.resize(nsamples, 0.); + b.as_mut_ptr() as *mut c_void + } + }; + self.pull_input_data(p, nsamples); + + p + } + + pub fn trim(&mut self, final_size: usize) { + match &self.linear_input_buffer { + LinearInputBuffer::IntegerLinearInputBuffer(b) => { + let length = b.len(); + assert!(final_size <= length); + let nframes_to_pop = length - final_size; + self.get_linear_input_data(nframes_to_pop); + } + LinearInputBuffer::FloatLinearInputBuffer(b) => { + let length = b.len(); + assert!(final_size <= length); + let nframes_to_pop = length - final_size; + self.get_linear_input_data(nframes_to_pop); + } + } + } + pub fn available_samples(&mut self) -> usize { + match &self.linear_input_buffer { + LinearInputBuffer::IntegerLinearInputBuffer(b) => b.len(), + LinearInputBuffer::FloatLinearInputBuffer(b) => b.len(), + } + } +} + +impl std::fmt::Debug for BufferManager { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "") + } +} + +#[repr(C)] +#[derive(Debug)] +pub struct PulseStream<'ctx> { + context: &'ctx PulseContext, + user_ptr: *mut c_void, + output_stream: Option<pulse::Stream>, + input_stream: Option<pulse::Stream>, + data_callback: ffi::cubeb_data_callback, + state_callback: ffi::cubeb_state_callback, + drain_timer: AtomicPtr<pa_time_event>, + output_sample_spec: pulse::SampleSpec, + input_sample_spec: pulse::SampleSpec, + // output frames count excluding pre-buffering + output_frame_count: AtomicUsize, + shutdown: bool, + volume: f32, + state: ffi::cubeb_state, + input_buffer_manager: Option<BufferManager>, +} + +impl<'ctx> PulseStream<'ctx> { + #[cfg_attr(feature = "cargo-clippy", allow(clippy::too_many_arguments))] + pub fn new( + context: &'ctx PulseContext, + stream_name: Option<&CStr>, + input_device: DeviceId, + input_stream_params: Option<&StreamParamsRef>, + output_device: DeviceId, + output_stream_params: Option<&StreamParamsRef>, + latency_frames: u32, + data_callback: ffi::cubeb_data_callback, + state_callback: ffi::cubeb_state_callback, + user_ptr: *mut c_void, + ) -> Result<Box<Self>> { + fn check_error(s: &pulse::Stream, u: *mut c_void) { + let stm = unsafe { &mut *(u as *mut PulseStream) }; + if !s.get_state().is_good() { + cubeb_alog!("Calling error callback"); + stm.state_change_callback(ffi::CUBEB_STATE_ERROR); + } + stm.context.mainloop.signal(); + } + + fn read_data(s: &pulse::Stream, nbytes: usize, u: *mut c_void) { + fn read_from_input( + s: &pulse::Stream, + buffer: *mut *const c_void, + size: *mut usize, + ) -> i32 { + let readable_size = s.readable_size().map(|s| s as i32).unwrap_or(-1); + if readable_size > 0 && unsafe { s.peek(buffer, size).is_err() } { + cubeb_logv!("Error while peeking the input stream"); + return -1; + } + readable_size + } + + cubeb_alogv!("Input callback buffer size {}", nbytes); + let stm = unsafe { &mut *(u as *mut PulseStream) }; + if stm.shutdown { + return; + } + + let mut read_data: *const c_void = ptr::null(); + let mut read_size: usize = 0; + while read_from_input(s, &mut read_data, &mut read_size) > 0 { + /* read_data can be NULL in case of a hole. */ + if !read_data.is_null() { + let in_frame_size = stm.input_sample_spec.frame_size(); + let read_frames = read_size / in_frame_size; + let read_samples = read_size / stm.input_sample_spec.sample_size(); + + if stm.output_stream.is_some() { + // duplex stream: push the input data to the ring buffer. + stm.input_buffer_manager + .as_mut() + .unwrap() + .push_input_data(read_data, read_samples); + } else { + // input/capture only operation. Call callback directly + let got = unsafe { + stm.data_callback.unwrap()( + stm as *mut _ as *mut _, + stm.user_ptr, + read_data, + ptr::null_mut(), + read_frames as c_long, + ) + }; + + if got < 0 || got as usize != read_frames { + let _ = s.cancel_write(); + stm.shutdown = true; + if got < 0 { + unsafe { + stm.state_callback.unwrap()( + stm as *mut _ as *mut _, + stm.user_ptr, + ffi::CUBEB_STATE_ERROR, + ); + } + } + break; + } + } + } + + if read_size > 0 { + let _ = s.drop(); + } + + if stm.shutdown { + return; + } + } + } + + fn write_data(_: &pulse::Stream, nbytes: usize, u: *mut c_void) { + cubeb_alogv!("Output callback to be written buffer size {}", nbytes); + let stm = unsafe { &mut *(u as *mut PulseStream) }; + if stm.shutdown || stm.state != ffi::CUBEB_STATE_STARTED { + return; + } + + let nframes = nbytes / stm.output_sample_spec.frame_size(); + let first_callback = stm.output_frame_count.fetch_add(nframes, Ordering::SeqCst) == 0; + if stm.input_stream.is_some() { + let nsamples_input = nframes * stm.input_sample_spec.channels as usize; + let input_buffer_manager = stm.input_buffer_manager.as_mut().unwrap(); + + if first_callback { + let buffered_input_frames = input_buffer_manager.available_samples() + / stm.input_sample_spec.channels as usize; + if buffered_input_frames > nframes { + // Trim the buffer to ensure minimal roundtrip latency + let popped_frames = buffered_input_frames - nframes; + input_buffer_manager + .trim(nframes * stm.input_sample_spec.channels as usize); + cubeb_alog!("Dropping {} frames in input buffer.", popped_frames); + } + } + + let p = input_buffer_manager.get_linear_input_data(nsamples_input); + stm.trigger_user_callback(p, nbytes); + } else { + // Output/playback only operation. + // Write directly to output + debug_assert!(stm.output_stream.is_some()); + stm.trigger_user_callback(ptr::null(), nbytes); + } + } + + let mut stm = Box::new(PulseStream { + context, + output_stream: None, + input_stream: None, + data_callback, + state_callback, + user_ptr, + drain_timer: AtomicPtr::new(ptr::null_mut()), + output_sample_spec: pulse::SampleSpec::default(), + input_sample_spec: pulse::SampleSpec::default(), + output_frame_count: AtomicUsize::new(0), + shutdown: false, + volume: PULSE_NO_GAIN, + state: ffi::CUBEB_STATE_ERROR, + input_buffer_manager: None, + }); + + if let Some(ref context) = stm.context.context { + stm.context.mainloop.lock(); + + // Setup output stream + if let Some(stream_params) = output_stream_params { + match PulseStream::stream_init(context, stream_params, stream_name) { + Ok(s) => { + stm.output_sample_spec = *s.get_sample_spec(); + + s.set_state_callback(check_error, stm.as_mut() as *mut _ as *mut _); + s.set_write_callback(write_data, stm.as_mut() as *mut _ as *mut _); + + let buffer_size_bytes = + latency_frames * stm.output_sample_spec.frame_size() as u32; + + let battr = pa_buffer_attr { + maxlength: u32::max_value(), + prebuf: u32::max_value(), + fragsize: u32::max_value(), + tlength: buffer_size_bytes * 2, + minreq: buffer_size_bytes / 4, + }; + let device_name = super::try_cstr_from(output_device as *const _); + let mut stream_flags = pulse::StreamFlags::AUTO_TIMING_UPDATE + | pulse::StreamFlags::INTERPOLATE_TIMING + | pulse::StreamFlags::START_CORKED + | pulse::StreamFlags::ADJUST_LATENCY; + if device_name.is_some() + || stream_params + .prefs() + .contains(StreamPrefs::DISABLE_DEVICE_SWITCHING) + { + stream_flags |= pulse::StreamFlags::DONT_MOVE; + } + let _ = s.connect_playback(device_name, &battr, stream_flags, None, None); + + stm.output_stream = Some(s); + } + Err(e) => { + cubeb_log!("Output stream initialization error"); + stm.context.mainloop.unlock(); + stm.destroy(); + return Err(e); + } + } + } + + // Set up input stream + if let Some(stream_params) = input_stream_params { + match PulseStream::stream_init(context, stream_params, stream_name) { + Ok(s) => { + stm.input_sample_spec = *s.get_sample_spec(); + + s.set_state_callback(check_error, stm.as_mut() as *mut _ as *mut _); + s.set_read_callback(read_data, stm.as_mut() as *mut _ as *mut _); + + let buffer_size_bytes = + latency_frames * stm.input_sample_spec.frame_size() as u32; + let battr = pa_buffer_attr { + maxlength: u32::max_value(), + prebuf: u32::max_value(), + fragsize: buffer_size_bytes, + tlength: buffer_size_bytes, + minreq: buffer_size_bytes, + }; + let device_name = super::try_cstr_from(input_device as *const _); + let mut stream_flags = pulse::StreamFlags::AUTO_TIMING_UPDATE + | pulse::StreamFlags::INTERPOLATE_TIMING + | pulse::StreamFlags::START_CORKED + | pulse::StreamFlags::ADJUST_LATENCY; + if device_name.is_some() + || stream_params + .prefs() + .contains(StreamPrefs::DISABLE_DEVICE_SWITCHING) + { + stream_flags |= pulse::StreamFlags::DONT_MOVE; + } + let _ = s.connect_record(device_name, &battr, stream_flags); + + stm.input_stream = Some(s); + } + Err(e) => { + cubeb_log!("Input stream initialization error"); + stm.context.mainloop.unlock(); + stm.destroy(); + return Err(e); + } + } + } + + // Duplex, set up the ringbuffer + if input_stream_params.is_some() && output_stream_params.is_some() { + // A bit more room in case of output underrun. + let buffer_size_bytes = + 2 * latency_frames * stm.input_sample_spec.frame_size() as u32; + stm.input_buffer_manager = Some(BufferManager::new( + buffer_size_bytes as usize, + &stm.input_sample_spec, + )) + } + + let r = if stm.wait_until_ready() { + /* force a timing update now, otherwise timing info does not become valid + until some point after initialization has completed. */ + stm.update_timing_info() + } else { + false + }; + + stm.context.mainloop.unlock(); + + if !r { + stm.destroy(); + cubeb_log!("Error while waiting for the stream to be ready"); + return Err(Error::error()); + } + + // TODO: + if log_enabled() { + if let Some(ref output_stream) = stm.output_stream { + let output_att = output_stream.get_buffer_attr(); + cubeb_log!( + "Output buffer attributes maxlength {}, tlength {}, \ + prebuf {}, minreq {}, fragsize {}", + output_att.maxlength, + output_att.tlength, + output_att.prebuf, + output_att.minreq, + output_att.fragsize + ); + } + + if let Some(ref input_stream) = stm.input_stream { + let input_att = input_stream.get_buffer_attr(); + cubeb_log!( + "Input buffer attributes maxlength {}, tlength {}, \ + prebuf {}, minreq {}, fragsize {}", + input_att.maxlength, + input_att.tlength, + input_att.prebuf, + input_att.minreq, + input_att.fragsize + ); + } + } + } + + Ok(stm) + } + + fn destroy(&mut self) { + self.cork(CorkState::cork()); + + self.context.mainloop.lock(); + { + if let Some(stm) = self.output_stream.take() { + let drain_timer = self.drain_timer.load(Ordering::Acquire); + if !drain_timer.is_null() { + /* there's no pa_rttime_free, so use this instead. */ + self.context.mainloop.get_api().time_free(drain_timer); + } + stm.clear_state_callback(); + stm.clear_write_callback(); + let _ = stm.disconnect(); + stm.unref(); + } + + if let Some(stm) = self.input_stream.take() { + stm.clear_state_callback(); + stm.clear_read_callback(); + let _ = stm.disconnect(); + stm.unref(); + } + } + self.context.mainloop.unlock(); + } +} + +impl<'ctx> Drop for PulseStream<'ctx> { + fn drop(&mut self) { + self.destroy(); + } +} + +impl<'ctx> StreamOps for PulseStream<'ctx> { + fn start(&mut self) -> Result<()> { + fn output_preroll(_: &pulse::MainloopApi, u: *mut c_void) { + let stm = unsafe { &mut *(u as *mut PulseStream) }; + if !stm.shutdown { + let size = stm + .output_stream + .as_ref() + .map_or(0, |s| s.writable_size().unwrap_or(0)); + stm.trigger_user_callback(std::ptr::null(), size); + } + } + self.shutdown = false; + self.cork(CorkState::uncork() | CorkState::notify()); + + if self.output_stream.is_some() { + /* When doing output-only or duplex, we need to manually call user cb once in order to + * make things roll. This is done via a defer event in order to execute it from PA + * server thread. */ + self.context.mainloop.lock(); + self.context + .mainloop + .get_api() + .once(output_preroll, self as *const _ as *mut _); + self.context.mainloop.unlock(); + } + + Ok(()) + } + + fn stop(&mut self) -> Result<()> { + { + self.context.mainloop.lock(); + self.shutdown = true; + // If draining is taking place wait to finish + cubeb_log!("Stream stop: waiting for drain"); + while !self.drain_timer.load(Ordering::Acquire).is_null() { + self.context.mainloop.wait(); + } + cubeb_log!("Stream stop: waited for drain"); + self.context.mainloop.unlock(); + } + self.cork(CorkState::cork() | CorkState::notify()); + + Ok(()) + } + + fn position(&mut self) -> Result<u64> { + let in_thread = self.context.mainloop.in_thread(); + + if !in_thread { + self.context.mainloop.lock(); + } + + if self.output_stream.is_none() { + cubeb_log!("Calling position() on an input-only stream"); + return Err(Error::error()); + } + + let stm = self.output_stream.as_ref().unwrap(); + let r = match stm.get_time() { + Ok(r_usec) => { + let bytes = USecExt::to_bytes(r_usec, &self.output_sample_spec); + Ok((bytes / self.output_sample_spec.frame_size()) as u64) + } + Err(_) => { + cubeb_log!("Error: stm.get_time failed"); + Err(Error::error()) + } + }; + + if !in_thread { + self.context.mainloop.unlock(); + } + + r + } + + fn latency(&mut self) -> Result<u32> { + match self.output_stream { + None => { + cubeb_log!("Error: calling latency() on an input-only stream"); + Err(Error::error()) + } + Some(ref stm) => match stm.get_latency() { + Ok(StreamLatency::Positive(r_usec)) => { + let latency = (r_usec * pa_usec_t::from(self.output_sample_spec.rate) + / PA_USEC_PER_SEC) as u32; + Ok(latency) + } + Ok(_) => { + panic!("Can not handle negative latency values."); + } + Err(_) => { + cubeb_log!("Error: get_latency() failed for an output stream"); + Err(Error::error()) + } + }, + } + } + + fn input_latency(&mut self) -> Result<u32> { + match self.input_stream { + None => { + cubeb_log!("Error: calling input_latency() on an output-only stream"); + Err(Error::error()) + } + Some(ref stm) => match stm.get_latency() { + Ok(StreamLatency::Positive(w_usec)) => { + let latency = (w_usec * pa_usec_t::from(self.input_sample_spec.rate) + / PA_USEC_PER_SEC) as u32; + Ok(latency) + } + // Input stream can be negative only if it is attached to a + // monitor source device + Ok(StreamLatency::Negative(_)) => Ok(0), + Err(_) => { + cubeb_log!("Error: stm.get_latency() failed for an input stream"); + Err(Error::error()) + } + }, + } + } + + fn set_volume(&mut self, volume: f32) -> Result<()> { + match self.output_stream { + None => { + cubeb_log!("Error: can't set volume on an input-only stream"); + Err(Error::error()) + } + Some(ref stm) => { + if let Some(ref context) = self.context.context { + self.context.mainloop.lock(); + + let mut cvol: pa_cvolume = Default::default(); + + /* if the pulse daemon is configured to use flat + * volumes, apply our own gain instead of changing + * the input volume on the sink. */ + let flags = { + match self.context.default_sink_info { + Some(ref info) => info.flags, + _ => pulse::SinkFlags::empty(), + } + }; + + if flags.contains(pulse::SinkFlags::FLAT_VOLUME) { + self.volume = volume; + } else { + let channels = stm.get_sample_spec().channels; + let vol = pulse::sw_volume_from_linear(f64::from(volume)); + cvol.set(u32::from(channels), vol); + + let index = stm.get_index(); + + let context_ptr = self.context as *const _ as *mut _; + if let Ok(o) = context.set_sink_input_volume( + index, + &cvol, + context_success, + context_ptr, + ) { + self.context.operation_wait(stm, &o); + } + } + + self.context.mainloop.unlock(); + Ok(()) + } else { + cubeb_log!("Error: set_volume: no context?"); + Err(Error::error()) + } + } + } + } + + fn set_name(&mut self, name: &CStr) -> Result<()> { + match self.output_stream { + None => { + cubeb_log!("Error: can't set the name on a input-only stream."); + Err(Error::error()) + } + Some(ref stm) => { + self.context.mainloop.lock(); + if let Ok(o) = stm.set_name(name, stream_success, self as *const _ as *mut _) { + self.context.operation_wait(stm, &o); + } + self.context.mainloop.unlock(); + Ok(()) + } + } + } + + fn current_device(&mut self) -> Result<&DeviceRef> { + if self.context.version_0_9_8 { + let mut dev: Box<ffi::cubeb_device> = Box::new(unsafe { mem::zeroed() }); + + if let Some(ref stm) = self.input_stream { + dev.input_name = match stm.get_device_name() { + Ok(name) => name.to_owned().into_raw(), + Err(_) => { + cubeb_log!("Error: couldn't get the input stream's device name"); + return Err(Error::error()); + } + } + } + + if let Some(ref stm) = self.output_stream { + dev.output_name = match stm.get_device_name() { + Ok(name) => name.to_owned().into_raw(), + Err(_) => { + cubeb_log!("Error: couldn't get the output stream's device name"); + return Err(Error::error()); + } + } + } + + Ok(unsafe { DeviceRef::from_ptr(Box::into_raw(dev) as *mut _) }) + } else { + cubeb_log!("Error: PulseAudio context too old"); + Err(not_supported()) + } + } + + fn set_input_mute(&mut self, _mute: bool) -> Result<()> { + Err(not_supported()) + } + + fn set_input_processing_params(&mut self, _params: InputProcessingParams) -> Result<()> { + Err(not_supported()) + } + + fn device_destroy(&mut self, device: &DeviceRef) -> Result<()> { + if device.as_ptr().is_null() { + cubeb_log!("Error: can't destroy null device"); + Err(Error::error()) + } else { + unsafe { + let _: Box<Device> = Box::from_raw(device.as_ptr() as *mut _); + } + Ok(()) + } + } + + fn register_device_changed_callback( + &mut self, + _: ffi::cubeb_device_changed_callback, + ) -> Result<()> { + cubeb_log!("Error: register_device_change_callback unimplemented"); + Err(Error::error()) + } +} + +impl<'ctx> PulseStream<'ctx> { + fn stream_init( + context: &pulse::Context, + stream_params: &StreamParamsRef, + stream_name: Option<&CStr>, + ) -> Result<pulse::Stream> { + if stream_params.prefs() == StreamPrefs::LOOPBACK { + cubeb_log!("Error: StreamPref::LOOPBACK unimplemented"); + return Err(not_supported()); + } + + fn to_pulse_format(format: SampleFormat) -> pulse::SampleFormat { + match format { + SampleFormat::S16LE => pulse::SampleFormat::Signed16LE, + SampleFormat::S16BE => pulse::SampleFormat::Signed16BE, + SampleFormat::Float32LE => pulse::SampleFormat::Float32LE, + SampleFormat::Float32BE => pulse::SampleFormat::Float32BE, + _ => pulse::SampleFormat::Invalid, + } + } + + let fmt = to_pulse_format(stream_params.format()); + if fmt == pulse::SampleFormat::Invalid { + cubeb_log!("Error: invalid sample format"); + return Err(invalid_format()); + } + + let ss = pulse::SampleSpec { + channels: stream_params.channels() as u8, + format: fmt.into(), + rate: stream_params.rate(), + }; + + let cm: Option<pa_channel_map> = match stream_params.layout() { + ChannelLayout::UNDEFINED => { + if stream_params.channels() <= 8 + && pulse::ChannelMap::init_auto( + stream_params.channels(), + PA_CHANNEL_MAP_DEFAULT, + ) + .is_none() + { + cubeb_log!("Layout undefined and PulseAudio's default layout has not been configured, guess one."); + Some(layout_to_channel_map(default_layout_for_channels( + stream_params.channels(), + ))) + } else { + cubeb_log!("Layout undefined, PulseAudio will use its default."); + None + } + } + _ => Some(layout_to_channel_map(stream_params.layout())), + }; + + let stream = pulse::Stream::new(context, stream_name.unwrap(), &ss, cm.as_ref()); + + match stream { + None => { + cubeb_log!("Error: pulse::Stream::new failure"); + Err(Error::error()) + } + Some(stm) => Ok(stm), + } + } + + pub fn cork_stream(&self, stream: Option<&pulse::Stream>, state: CorkState) { + if let Some(stm) = stream { + if let Ok(o) = stm.cork( + state.is_cork() as i32, + stream_success, + self as *const _ as *mut _, + ) { + self.context.operation_wait(stream, &o); + } + } + } + + fn cork(&mut self, state: CorkState) { + { + self.context.mainloop.lock(); + self.cork_stream(self.output_stream.as_ref(), state); + self.cork_stream(self.input_stream.as_ref(), state); + self.context.mainloop.unlock() + } + + if state.is_notify() { + self.state_change_callback(if state.is_cork() { + ffi::CUBEB_STATE_STOPPED + } else { + ffi::CUBEB_STATE_STARTED + }); + } + } + + fn update_timing_info(&self) -> bool { + let mut r = false; + + if let Some(ref stm) = self.output_stream { + if let Ok(o) = stm.update_timing_info(stream_success, self as *const _ as *mut _) { + r = self.context.operation_wait(stm, &o); + } + + if !r { + return r; + } + } + + if let Some(ref stm) = self.input_stream { + if let Ok(o) = stm.update_timing_info(stream_success, self as *const _ as *mut _) { + r = self.context.operation_wait(stm, &o); + } + } + + r + } + + pub fn state_change_callback(&mut self, s: ffi::cubeb_state) { + self.state = s; + unsafe { + (self.state_callback.unwrap())( + self as *mut PulseStream as *mut ffi::cubeb_stream, + self.user_ptr, + s, + ) + }; + } + + fn wait_until_ready(&self) -> bool { + fn wait_until_io_stream_ready( + stm: &pulse::Stream, + mainloop: &pulse::ThreadedMainloop, + ) -> bool { + if mainloop.is_null() { + return false; + } + + loop { + let state = stm.get_state(); + if !state.is_good() { + return false; + } + if state == pulse::StreamState::Ready { + break; + } + mainloop.wait(); + } + + true + } + + if let Some(ref stm) = self.output_stream { + if !wait_until_io_stream_ready(stm, &self.context.mainloop) { + return false; + } + } + + if let Some(ref stm) = self.input_stream { + if !wait_until_io_stream_ready(stm, &self.context.mainloop) { + return false; + } + } + + true + } + + #[cfg_attr(feature = "cargo-clippy", allow(clippy::cognitive_complexity))] + fn trigger_user_callback(&mut self, input_data: *const c_void, nbytes: usize) { + fn drained_cb( + a: &pulse::MainloopApi, + e: *mut pa_time_event, + _tv: &pulse::TimeVal, + u: *mut c_void, + ) { + cubeb_logv!("Drain finished callback."); + let stm = unsafe { &mut *(u as *mut PulseStream) }; + let drain_timer = stm.drain_timer.load(Ordering::Acquire); + debug_assert_eq!(drain_timer, e); + stm.state_change_callback(ffi::CUBEB_STATE_DRAINED); + /* there's no pa_rttime_free, so use this instead. */ + a.time_free(drain_timer); + stm.drain_timer.store(ptr::null_mut(), Ordering::Release); + stm.context.mainloop.signal(); + } + + if let Some(ref stm) = self.output_stream { + let frame_size = self.output_sample_spec.frame_size(); + debug_assert_eq!(nbytes % frame_size, 0); + + let mut towrite = nbytes; + let mut read_offset = 0usize; + while towrite > 0 { + match stm.begin_write(towrite) { + Err(e) => { + cubeb_logv!("Error: failure to write data"); + panic!("Failed to write data: {}", e); + } + Ok((buffer, size)) => { + debug_assert!(size > 0); + debug_assert_eq!(size % frame_size, 0); + + cubeb_logv!( + "Trigger user callback with output buffer size={}, read_offset={}", + size, + read_offset + ); + let read_ptr = unsafe { (input_data as *const u8).add(read_offset) }; + #[cfg_attr(feature = "cargo-clippy", allow(clippy::unnecessary_cast))] + let mut got = unsafe { + self.data_callback.unwrap()( + self as *const _ as *mut _, + self.user_ptr, + read_ptr as *const _ as *mut _, + buffer, + (size / frame_size) as c_long, + ) as i64 + }; + if got < 0 { + let _ = stm.cancel_write(); + self.shutdown = true; + unsafe { + self.state_callback.unwrap()( + self as *const _ as *mut _, + self.user_ptr, + ffi::CUBEB_STATE_ERROR, + ); + } + return; + } + + // If more iterations move offset of read buffer + if !input_data.is_null() { + let in_frame_size = self.input_sample_spec.frame_size(); + read_offset += (size / frame_size) * in_frame_size; + } + + if self.volume != PULSE_NO_GAIN { + let samples = (self.output_sample_spec.channels as usize * size + / frame_size) as isize; + + if self.output_sample_spec.format == PA_SAMPLE_S16BE + || self.output_sample_spec.format == PA_SAMPLE_S16LE + { + let b = buffer as *mut i16; + for i in 0..samples { + unsafe { *b.offset(i) *= self.volume as i16 }; + } + } else { + let b = buffer as *mut f32; + for i in 0..samples { + unsafe { *b.offset(i) *= self.volume }; + } + } + } + + let should_drain = (got as usize) < size / frame_size; + + if should_drain && self.output_frame_count.load(Ordering::SeqCst) == 0 { + // Draining during preroll, ensure `prebuf` frames are written so + // the stream starts. If not, pad with a bit of silence. + let prebuf_size_bytes = stm.get_buffer_attr().prebuf as usize; + let got_bytes = got as usize * frame_size; + if prebuf_size_bytes > got_bytes { + let padding_bytes = prebuf_size_bytes - got_bytes; + if padding_bytes + got_bytes <= size { + // A slice that starts after the data provided by the callback, + // with just enough room to provide a final buffer big enough. + let padding_buf: &mut [u8] = unsafe { + slice::from_raw_parts_mut::<u8>( + buffer.add(got_bytes) as *mut u8, + padding_bytes, + ) + }; + padding_buf.fill(0); + got += (padding_bytes / frame_size) as i64; + } + } else { + cubeb_logv!( + "Not enough room to pad up to prebuf when prebuffering." + ) + } + } + + let r = stm.write( + buffer, + got as usize * frame_size, + 0, + pulse::SeekMode::Relative, + ); + + if should_drain { + cubeb_logv!("Draining {} < {}", got, size / frame_size); + let latency = match stm.get_latency() { + Ok(StreamLatency::Positive(l)) => l, + Ok(_) => { + panic!("Can not handle negative latency values."); + } + Err(e) => { + debug_assert_eq!( + e, + pulse::ErrorCode::from_error_code(PA_ERR_NODATA) + ); + /* this needs a better guess. */ + 100 * PA_USEC_PER_MSEC + } + }; + + /* pa_stream_drain is useless, see PA bug# 866. this is a workaround. */ + /* arbitrary safety margin: double the current latency. */ + debug_assert!(self.drain_timer.load(Ordering::Acquire).is_null()); + let stream_ptr = self as *const _ as *mut _; + if let Some(ref context) = self.context.context { + self.drain_timer.store( + context.rttime_new( + pulse::rtclock_now() + 2 * latency, + drained_cb, + stream_ptr, + ), + Ordering::Release, + ); + } + self.shutdown = true; + return; + } + + debug_assert!(r.is_ok()); + + towrite -= size; + } + } + } + debug_assert_eq!(towrite, 0); + } + } +} + +fn stream_success(_: &pulse::Stream, success: i32, u: *mut c_void) { + let stm = unsafe { &*(u as *mut PulseStream) }; + if success != 1 { + cubeb_log!("stream_success ignored failure: {}", success); + } + stm.context.mainloop.signal(); +} + +fn context_success(_: &pulse::Context, success: i32, u: *mut c_void) { + let ctx = unsafe { &*(u as *mut PulseContext) }; + if success != 1 { + cubeb_log!("context_success ignored failure: {}", success); + } + ctx.mainloop.signal(); +} + +fn invalid_format() -> Error { + Error::from_raw(ffi::CUBEB_ERROR_INVALID_FORMAT) +} + +fn not_supported() -> Error { + Error::from_raw(ffi::CUBEB_ERROR_NOT_SUPPORTED) +} + +#[cfg(all(test, not(feature = "pulse-dlopen")))] +mod test { + use super::layout_to_channel_map; + use cubeb_backend::ChannelLayout; + use pulse_ffi::*; + + macro_rules! channel_tests { + {$($name: ident, $layout: ident => [ $($channels: ident),* ]),+} => { + $( + #[test] + fn $name() { + let layout = ChannelLayout::$layout; + let mut iter = super::channel_layout_iter(layout); + $( + assert_eq!(Some(ChannelLayout::$channels), iter.next()); + )* + assert_eq!(None, iter.next()); + } + + )* + } + } + + channel_tests! { + channels_unknown, UNDEFINED => [ ], + channels_mono, MONO => [ + FRONT_CENTER + ], + channels_mono_lfe, MONO_LFE => [ + FRONT_CENTER, + LOW_FREQUENCY + ], + channels_stereo, STEREO => [ + FRONT_LEFT, + FRONT_RIGHT + ], + channels_stereo_lfe, STEREO_LFE => [ + FRONT_LEFT, + FRONT_RIGHT, + LOW_FREQUENCY + ], + channels_3f, _3F => [ + FRONT_LEFT, + FRONT_RIGHT, + FRONT_CENTER + ], + channels_3f_lfe, _3F_LFE => [ + FRONT_LEFT, + FRONT_RIGHT, + FRONT_CENTER, + LOW_FREQUENCY + ], + channels_2f1, _2F1 => [ + FRONT_LEFT, + FRONT_RIGHT, + BACK_CENTER + ], + channels_2f1_lfe, _2F1_LFE => [ + FRONT_LEFT, + FRONT_RIGHT, + LOW_FREQUENCY, + BACK_CENTER + ], + channels_3f1, _3F1 => [ + FRONT_LEFT, + FRONT_RIGHT, + FRONT_CENTER, + BACK_CENTER + ], + channels_3f1_lfe, _3F1_LFE => [ + FRONT_LEFT, + FRONT_RIGHT, + FRONT_CENTER, + LOW_FREQUENCY, + BACK_CENTER + ], + channels_2f2, _2F2 => [ + FRONT_LEFT, + FRONT_RIGHT, + SIDE_LEFT, + SIDE_RIGHT + ], + channels_2f2_lfe, _2F2_LFE => [ + FRONT_LEFT, + FRONT_RIGHT, + LOW_FREQUENCY, + SIDE_LEFT, + SIDE_RIGHT + ], + channels_quad, QUAD => [ + FRONT_LEFT, + FRONT_RIGHT, + BACK_LEFT, + BACK_RIGHT + ], + channels_quad_lfe, QUAD_LFE => [ + FRONT_LEFT, + FRONT_RIGHT, + LOW_FREQUENCY, + BACK_LEFT, + BACK_RIGHT + ], + channels_3f2, _3F2 => [ + FRONT_LEFT, + FRONT_RIGHT, + FRONT_CENTER, + SIDE_LEFT, + SIDE_RIGHT + ], + channels_3f2_lfe, _3F2_LFE => [ + FRONT_LEFT, + FRONT_RIGHT, + FRONT_CENTER, + LOW_FREQUENCY, + SIDE_LEFT, + SIDE_RIGHT + ], + channels_3f2_back, _3F2_BACK => [ + FRONT_LEFT, + FRONT_RIGHT, + FRONT_CENTER, + BACK_LEFT, + BACK_RIGHT + ], + channels_3f2_lfe_back, _3F2_LFE_BACK => [ + FRONT_LEFT, + FRONT_RIGHT, + FRONT_CENTER, + LOW_FREQUENCY, + BACK_LEFT, + BACK_RIGHT + ], + channels_3f3r_lfe, _3F3R_LFE => [ + FRONT_LEFT, + FRONT_RIGHT, + FRONT_CENTER, + LOW_FREQUENCY, + BACK_CENTER, + SIDE_LEFT, + SIDE_RIGHT + ], + channels_3f4_lfe, _3F4_LFE => [ + FRONT_LEFT, + FRONT_RIGHT, + FRONT_CENTER, + LOW_FREQUENCY, + BACK_LEFT, + BACK_RIGHT, + SIDE_LEFT, + SIDE_RIGHT + ] + } + + #[test] + fn mono_channels_enumerate() { + let layout = ChannelLayout::MONO; + let mut iter = super::channel_layout_iter(layout).enumerate(); + assert_eq!(Some((0, ChannelLayout::FRONT_CENTER)), iter.next()); + assert_eq!(None, iter.next()); + } + + #[test] + fn stereo_channels_enumerate() { + let layout = ChannelLayout::STEREO; + let mut iter = super::channel_layout_iter(layout).enumerate(); + assert_eq!(Some((0, ChannelLayout::FRONT_LEFT)), iter.next()); + assert_eq!(Some((1, ChannelLayout::FRONT_RIGHT)), iter.next()); + assert_eq!(None, iter.next()); + } + + #[test] + fn quad_channels_enumerate() { + let layout = ChannelLayout::QUAD; + let mut iter = super::channel_layout_iter(layout).enumerate(); + assert_eq!(Some((0, ChannelLayout::FRONT_LEFT)), iter.next()); + assert_eq!(Some((1, ChannelLayout::FRONT_RIGHT)), iter.next()); + assert_eq!(Some((2, ChannelLayout::BACK_LEFT)), iter.next()); + assert_eq!(Some((3, ChannelLayout::BACK_RIGHT)), iter.next()); + assert_eq!(None, iter.next()); + } + + macro_rules! map_channel_tests { + {$($name: ident, $layout: ident => [ $($channels: ident),* ]),+} => { + $( + #[test] + fn $name() { + let map = layout_to_channel_map(ChannelLayout::$layout); + assert_eq!(map.channels, map_channel_tests!(__COUNT__ $($channels)*)); + map_channel_tests!(__EACH__ map, 0, $($channels)*); + } + + )* + }; + (__COUNT__) => (0u8); + (__COUNT__ $x:ident $($xs: ident)*) => (1u8 + map_channel_tests!(__COUNT__ $($xs)*)); + (__EACH__ $map:expr, $i:expr, ) => {}; + (__EACH__ $map:expr, $i:expr, $x:ident $($xs: ident)*) => { + assert_eq!($map.map[$i], $x); + map_channel_tests!(__EACH__ $map, $i+1, $($xs)* ); + }; + } + + map_channel_tests! { + map_channel_mono, MONO => [ + PA_CHANNEL_POSITION_MONO + ], + map_channel_mono_lfe, MONO_LFE => [ + PA_CHANNEL_POSITION_FRONT_CENTER, + PA_CHANNEL_POSITION_LFE + ], + map_channel_stereo, STEREO => [ + PA_CHANNEL_POSITION_FRONT_LEFT, + PA_CHANNEL_POSITION_FRONT_RIGHT + ], + map_channel_stereo_lfe, STEREO_LFE => [ + PA_CHANNEL_POSITION_FRONT_LEFT, + PA_CHANNEL_POSITION_FRONT_RIGHT, + PA_CHANNEL_POSITION_LFE + ], + map_channel_3f, _3F => [ + PA_CHANNEL_POSITION_FRONT_LEFT, + PA_CHANNEL_POSITION_FRONT_RIGHT, + PA_CHANNEL_POSITION_FRONT_CENTER + ], + map_channel_3f_lfe, _3F_LFE => [ + PA_CHANNEL_POSITION_FRONT_LEFT, + PA_CHANNEL_POSITION_FRONT_RIGHT, + PA_CHANNEL_POSITION_FRONT_CENTER, + PA_CHANNEL_POSITION_LFE + ], + map_channel_2f1, _2F1 => [ + PA_CHANNEL_POSITION_FRONT_LEFT, + PA_CHANNEL_POSITION_FRONT_RIGHT, + PA_CHANNEL_POSITION_REAR_CENTER + ], + map_channel_2f1_lfe, _2F1_LFE => [ + PA_CHANNEL_POSITION_FRONT_LEFT, + PA_CHANNEL_POSITION_FRONT_RIGHT, + PA_CHANNEL_POSITION_LFE, + PA_CHANNEL_POSITION_REAR_CENTER + ], + map_channel_3f1, _3F1 => [ + PA_CHANNEL_POSITION_FRONT_LEFT, + PA_CHANNEL_POSITION_FRONT_RIGHT, + PA_CHANNEL_POSITION_FRONT_CENTER, + PA_CHANNEL_POSITION_REAR_CENTER + ], + map_channel_3f1_lfe, _3F1_LFE =>[ + PA_CHANNEL_POSITION_FRONT_LEFT, + PA_CHANNEL_POSITION_FRONT_RIGHT, + PA_CHANNEL_POSITION_FRONT_CENTER, + PA_CHANNEL_POSITION_LFE, + PA_CHANNEL_POSITION_REAR_CENTER + ], + map_channel_2f2, _2F2 => [ + PA_CHANNEL_POSITION_FRONT_LEFT, + PA_CHANNEL_POSITION_FRONT_RIGHT, + PA_CHANNEL_POSITION_SIDE_LEFT, + PA_CHANNEL_POSITION_SIDE_RIGHT + ], + map_channel_2f2_lfe, _2F2_LFE => [ + PA_CHANNEL_POSITION_FRONT_LEFT, + PA_CHANNEL_POSITION_FRONT_RIGHT, + PA_CHANNEL_POSITION_LFE, + PA_CHANNEL_POSITION_SIDE_LEFT, + PA_CHANNEL_POSITION_SIDE_RIGHT + ], + map_channel_quad, QUAD => [ + PA_CHANNEL_POSITION_FRONT_LEFT, + PA_CHANNEL_POSITION_FRONT_RIGHT, + PA_CHANNEL_POSITION_REAR_LEFT, + PA_CHANNEL_POSITION_REAR_RIGHT + ], + map_channel_quad_lfe, QUAD_LFE => [ + PA_CHANNEL_POSITION_FRONT_LEFT, + PA_CHANNEL_POSITION_FRONT_RIGHT, + PA_CHANNEL_POSITION_LFE, + PA_CHANNEL_POSITION_REAR_LEFT, + PA_CHANNEL_POSITION_REAR_RIGHT + ], + map_channel_3f2, _3F2 => [ + PA_CHANNEL_POSITION_FRONT_LEFT, + PA_CHANNEL_POSITION_FRONT_RIGHT, + PA_CHANNEL_POSITION_FRONT_CENTER, + PA_CHANNEL_POSITION_SIDE_LEFT, + PA_CHANNEL_POSITION_SIDE_RIGHT + ], + map_channel_3f2_lfe, _3F2_LFE => [ + PA_CHANNEL_POSITION_FRONT_LEFT, + PA_CHANNEL_POSITION_FRONT_RIGHT, + PA_CHANNEL_POSITION_FRONT_CENTER, + PA_CHANNEL_POSITION_LFE, + PA_CHANNEL_POSITION_SIDE_LEFT, + PA_CHANNEL_POSITION_SIDE_RIGHT + ], + map_channel_3f2_back, _3F2_BACK => [ + PA_CHANNEL_POSITION_FRONT_LEFT, + PA_CHANNEL_POSITION_FRONT_RIGHT, + PA_CHANNEL_POSITION_FRONT_CENTER, + PA_CHANNEL_POSITION_REAR_LEFT, + PA_CHANNEL_POSITION_REAR_RIGHT + ], + map_channel_3f2_lfe_back, _3F2_LFE_BACK => [ + PA_CHANNEL_POSITION_FRONT_LEFT, + PA_CHANNEL_POSITION_FRONT_RIGHT, + PA_CHANNEL_POSITION_FRONT_CENTER, + PA_CHANNEL_POSITION_LFE, + PA_CHANNEL_POSITION_REAR_LEFT, + PA_CHANNEL_POSITION_REAR_RIGHT + ], + map_channel_3f3r_lfe, _3F3R_LFE => [ + PA_CHANNEL_POSITION_FRONT_LEFT, + PA_CHANNEL_POSITION_FRONT_RIGHT, + PA_CHANNEL_POSITION_FRONT_CENTER, + PA_CHANNEL_POSITION_LFE, + PA_CHANNEL_POSITION_REAR_CENTER, + PA_CHANNEL_POSITION_SIDE_LEFT, + PA_CHANNEL_POSITION_SIDE_RIGHT + ], + map_channel_3f4_lfe, _3F4_LFE => [ + PA_CHANNEL_POSITION_FRONT_LEFT, + PA_CHANNEL_POSITION_FRONT_RIGHT, + PA_CHANNEL_POSITION_FRONT_CENTER, + PA_CHANNEL_POSITION_LFE, + PA_CHANNEL_POSITION_REAR_LEFT, + PA_CHANNEL_POSITION_REAR_RIGHT, + PA_CHANNEL_POSITION_SIDE_LEFT, + PA_CHANNEL_POSITION_SIDE_RIGHT + ] + } +} diff --git a/third_party/rust/cubeb-pulse/src/capi.rs b/third_party/rust/cubeb-pulse/src/capi.rs new file mode 100644 index 0000000000..49eaa7a1a1 --- /dev/null +++ b/third_party/rust/cubeb-pulse/src/capi.rs @@ -0,0 +1,21 @@ +// Copyright © 2017-2018 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details. + +use backend::PulseContext; +use cubeb_backend::{capi, ffi}; +use std::os::raw::{c_char, c_int}; + +/// # Safety +/// +/// Entry point from C code. This function is unsafe because it dereferences +/// the given `c` and `context_name` pointers. The caller should ensure those +/// pointers are valid. +#[no_mangle] +pub unsafe extern "C" fn pulse_rust_init( + c: *mut *mut ffi::cubeb, + context_name: *const c_char, +) -> c_int { + capi::capi_init::<PulseContext>(c, context_name) +} diff --git a/third_party/rust/cubeb-pulse/src/lib.rs b/third_party/rust/cubeb-pulse/src/lib.rs new file mode 100644 index 0000000000..42ec20c077 --- /dev/null +++ b/third_party/rust/cubeb-pulse/src/lib.rs @@ -0,0 +1,18 @@ +//! Cubeb backend interface to Pulse Audio + +// Copyright © 2017-2018 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details. + +#[macro_use] +extern crate cubeb_backend; +extern crate pulse; +extern crate pulse_ffi; +extern crate ringbuf; +extern crate semver; + +mod backend; +mod capi; + +pub use capi::pulse_rust_init; |