summaryrefslogtreecommitdiffstats
path: root/third_party/rust/audioipc2
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 19:33:14 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 19:33:14 +0000
commit36d22d82aa202bb199967e9512281e9a53db42c9 (patch)
tree105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/audioipc2
parentInitial commit. (diff)
downloadfirefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.tar.xz
firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.zip
Adding upstream version 115.7.0esr.upstream/115.7.0esr
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/audioipc2')
-rw-r--r--third_party/rust/audioipc2/.cargo-checksum.json1
-rw-r--r--third_party/rust/audioipc2/Cargo.toml82
-rw-r--r--third_party/rust/audioipc2/benches/serialization.rs93
-rw-r--r--third_party/rust/audioipc2/build.rs7
-rw-r--r--third_party/rust/audioipc2/src/codec.rs203
-rw-r--r--third_party/rust/audioipc2/src/errors.rs18
-rw-r--r--third_party/rust/audioipc2/src/ipccore.rs918
-rw-r--r--third_party/rust/audioipc2/src/lib.rs214
-rw-r--r--third_party/rust/audioipc2/src/messages.rs632
-rw-r--r--third_party/rust/audioipc2/src/rpccore.rs470
-rw-r--r--third_party/rust/audioipc2/src/shm.rs334
-rw-r--r--third_party/rust/audioipc2/src/sys/mod.rs77
-rw-r--r--third_party/rust/audioipc2/src/sys/unix/cmsg.rs104
-rw-r--r--third_party/rust/audioipc2/src/sys/unix/cmsghdr.c23
-rw-r--r--third_party/rust/audioipc2/src/sys/unix/mod.rs126
-rw-r--r--third_party/rust/audioipc2/src/sys/unix/msg.rs82
-rw-r--r--third_party/rust/audioipc2/src/sys/windows/mod.rs102
17 files changed, 3486 insertions, 0 deletions
diff --git a/third_party/rust/audioipc2/.cargo-checksum.json b/third_party/rust/audioipc2/.cargo-checksum.json
new file mode 100644
index 0000000000..96d8f77888
--- /dev/null
+++ b/third_party/rust/audioipc2/.cargo-checksum.json
@@ -0,0 +1 @@
+{"files":{"Cargo.toml":"6d3b3004351e3313ef74472ab6b3f96d59a785b3d873698ca4caf67eb3d47aab","benches/serialization.rs":"d56855d868dab6aa22c8b03a61084535351b76c94b68d8b1d20764e352fe473f","build.rs":"65df9a97c6cdaa3faf72581f04ac289197b0b1797d69d22c1796e957ff1089e2","src/codec.rs":"38408b512d935cd7889a03b25dd14b36083ec4e6d2fcabd636182cf45e3d50bc","src/errors.rs":"67a4a994d0724397657581cde153bdfc05ce86e7efc467f23fafc8f64df80fa4","src/ipccore.rs":"ba339eebdc2d8a6d0cb9b294344809e63e404e220fda643b63a8a3ff63a755e8","src/lib.rs":"9b107cb52081eeea3fa742d30361db70f7138baa423dfe21d37dcf5087afc338","src/messages.rs":"452362da2cace9a0f2e3134c190ecb6a9997f8be4036cde06643e17c6c238240","src/rpccore.rs":"025b6614f1c42b96b0a8e74fd7881032d338c66e0d67ec0af70f910a9e30ebe1","src/shm.rs":"1d88f19606899e3e477865d6b84bbe3e272f51618a1c2d57b6dab03a4787cde3","src/sys/mod.rs":"e6fa1d260abf093e1f7b50185195e2d3aee0eb8c9774c6f253953b5896d838f3","src/sys/unix/cmsg.rs":"22349b3df39b51b9c414da363313c92d41b02a623753ffcca6f59613e8f79eb2","src/sys/unix/cmsghdr.c":"d7344b3dc15cdce410c68669b848bb81f7fe36362cd3699668cb613fa05180f8","src/sys/unix/mod.rs":"59835f0d5509940078b1820a54f49fc5514adeb3e45e7d21e3ab917431da2e74","src/sys/unix/msg.rs":"0e297d73bae9414184f85c2209cca0a3fde6d999a3f1d3f42faa3f56b6d57233","src/sys/windows/mod.rs":"3441a3212c6d44443a5975621d9594b0c841e5a7f113aa1b108a080330df2b77"},"package":null} \ No newline at end of file
diff --git a/third_party/rust/audioipc2/Cargo.toml b/third_party/rust/audioipc2/Cargo.toml
new file mode 100644
index 0000000000..98de8c77a6
--- /dev/null
+++ b/third_party/rust/audioipc2/Cargo.toml
@@ -0,0 +1,82 @@
+# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO
+#
+# When uploading crates to the registry Cargo will automatically
+# "normalize" Cargo.toml files for maximal compatibility
+# with all versions of Cargo and also rewrite `path` dependencies
+# to registry (e.g., crates.io) dependencies.
+#
+# If you are reading this file be aware that the original Cargo.toml
+# will likely look very different (and much more reasonable).
+# See Cargo.toml.orig for the original contents.
+
+[package]
+edition = "2018"
+name = "audioipc2"
+version = "0.5.0"
+authors = [
+ "Matthew Gregan <kinetik@flim.org>",
+ "Dan Glastonbury <dan.glastonbury@gmail.com>",
+]
+description = "Remote Cubeb IPC"
+license = "ISC"
+
+[[bench]]
+name = "serialization"
+harness = false
+
+[dependencies]
+bincode = "1.3"
+byteorder = "1"
+bytes = "1"
+crossbeam-queue = "0.3"
+cubeb = "0.10"
+log = "0.4"
+scopeguard = "1.1.0"
+serde = "1"
+serde_bytes = "0.11"
+serde_derive = "1"
+slab = "0.4"
+
+[dependencies.error-chain]
+version = "0.12.0"
+default-features = false
+
+[dependencies.mio]
+version = "0.8"
+features = [
+ "os-poll",
+ "net",
+ "os-ext",
+]
+
+[dev-dependencies]
+env_logger = "0.9"
+
+[dev-dependencies.criterion]
+version = "0.3"
+features = ["html_reports"]
+
+[build-dependencies]
+cc = "1.0"
+
+[target."cfg(target_os = \"android\")".dependencies]
+ashmem = "0.1.2"
+
+[target."cfg(target_os = \"linux\")".dependencies.audio_thread_priority]
+version = "0.26.1"
+default-features = false
+
+[target."cfg(unix)".dependencies]
+arrayvec = "0.7"
+iovec = "0.1"
+libc = "0.2"
+memmap2 = "0.5"
+
+[target."cfg(windows)".dependencies.winapi]
+version = "0.3"
+features = [
+ "combaseapi",
+ "handleapi",
+ "memoryapi",
+ "objbase",
+]
diff --git a/third_party/rust/audioipc2/benches/serialization.rs b/third_party/rust/audioipc2/benches/serialization.rs
new file mode 100644
index 0000000000..39f770a939
--- /dev/null
+++ b/third_party/rust/audioipc2/benches/serialization.rs
@@ -0,0 +1,93 @@
+use audioipc::codec::{Codec, LengthDelimitedCodec};
+use audioipc::messages::DeviceInfo;
+use audioipc::ClientMessage;
+use audioipc2 as audioipc;
+use bytes::BytesMut;
+use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
+
+fn bench(c: &mut Criterion, name: &str, msg: impl Fn() -> ClientMessage) {
+ let mut codec: LengthDelimitedCodec<ClientMessage, ClientMessage> =
+ LengthDelimitedCodec::default();
+ let mut buf = BytesMut::with_capacity(8192);
+ c.bench_function(&format!("encode/{}", name), |b| {
+ b.iter_batched(
+ || msg(),
+ |msg| {
+ codec.encode(msg, &mut buf).unwrap();
+ buf.clear();
+ },
+ BatchSize::SmallInput,
+ )
+ });
+
+ let mut codec: LengthDelimitedCodec<ClientMessage, ClientMessage> =
+ LengthDelimitedCodec::default();
+ let mut buf = BytesMut::with_capacity(8192);
+ codec.encode(msg(), &mut buf).unwrap();
+ c.bench_function(&format!("decode/{}", name), |b| {
+ b.iter_batched_ref(
+ || buf.clone(),
+ |buf| {
+ codec.decode(buf).unwrap().unwrap();
+ },
+ BatchSize::SmallInput,
+ )
+ });
+
+ let mut codec: LengthDelimitedCodec<ClientMessage, ClientMessage> =
+ LengthDelimitedCodec::default();
+ let mut buf = BytesMut::with_capacity(8192);
+ c.bench_function(&format!("roundtrip/{}", name), |b| {
+ b.iter_batched(
+ || msg(),
+ |msg| {
+ codec.encode(msg, &mut buf).unwrap();
+ codec.decode(&mut buf).unwrap().unwrap();
+ },
+ BatchSize::SmallInput,
+ )
+ });
+}
+
+pub fn criterion_benchmark(c: &mut Criterion) {
+ bench(c, "tiny", || ClientMessage::ClientConnected);
+ bench(c, "small", || ClientMessage::StreamPosition(0));
+ bench(c, "medium", || {
+ ClientMessage::ContextEnumeratedDevices(make_device_vec(2))
+ });
+ bench(c, "large", || {
+ ClientMessage::ContextEnumeratedDevices(make_device_vec(20))
+ });
+ bench(c, "huge", || {
+ ClientMessage::ContextEnumeratedDevices(make_device_vec(128))
+ });
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
+
+fn make_device_vec(n: usize) -> Vec<DeviceInfo> {
+ let mut devices = Vec::with_capacity(n);
+ for i in 0..n {
+ let device = DeviceInfo {
+ devid: i,
+ device_id: Some(vec![0u8; 64]),
+ friendly_name: Some(vec![0u8; 64]),
+ group_id: Some(vec![0u8; 64]),
+ vendor_name: Some(vec![0u8; 64]),
+ device_type: 0,
+ state: 0,
+ preferred: 0,
+ format: 0,
+ default_format: 0,
+ max_channels: 0,
+ default_rate: 0,
+ max_rate: 0,
+ min_rate: 0,
+ latency_lo: 0,
+ latency_hi: 0,
+ };
+ devices.push(device);
+ }
+ devices
+}
diff --git a/third_party/rust/audioipc2/build.rs b/third_party/rust/audioipc2/build.rs
new file mode 100644
index 0000000000..453dafad96
--- /dev/null
+++ b/third_party/rust/audioipc2/build.rs
@@ -0,0 +1,7 @@
+fn main() {
+ if std::env::var_os("CARGO_CFG_UNIX").is_some() {
+ cc::Build::new()
+ .file("src/sys/unix/cmsghdr.c")
+ .compile("cmsghdr");
+ }
+}
diff --git a/third_party/rust/audioipc2/src/codec.rs b/third_party/rust/audioipc2/src/codec.rs
new file mode 100644
index 0000000000..2c61faa6ea
--- /dev/null
+++ b/third_party/rust/audioipc2/src/codec.rs
@@ -0,0 +1,203 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details
+
+//! `Encoder`s and `Decoder`s from items to/from `BytesMut` buffers.
+
+// The assert in LengthDelimitedCodec::decode triggers this clippy warning but
+// requires upgrading the workspace to Rust 2021 to resolve.
+// This should be fixed in Rust 1.68, after which the following `allow` can be deleted.
+#![allow(clippy::uninlined_format_args)]
+
+use bincode::{self, Options};
+use byteorder::{ByteOrder, LittleEndian};
+use bytes::{Buf, BufMut, BytesMut};
+use serde::de::DeserializeOwned;
+use serde::ser::Serialize;
+use std::convert::TryInto;
+use std::fmt::Debug;
+use std::io;
+use std::marker::PhantomData;
+use std::mem::size_of;
+
+////////////////////////////////////////////////////////////////////////////////
+// Split buffer into size delimited frames - This appears more complicated than
+// might be necessary due to handling the possibility of messages being split
+// across reads.
+
+pub trait Codec {
+ /// The type of items to be encoded into byte buffer
+ type In;
+
+ /// The type of items to be returned by decoding from byte buffer
+ type Out;
+
+ /// Attempts to decode a frame from the provided buffer of bytes.
+ fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Out>>;
+
+ /// A default method available to be called when there are no more bytes
+ /// available to be read from the I/O.
+ fn decode_eof(&mut self, buf: &mut BytesMut) -> io::Result<Self::Out> {
+ match self.decode(buf)? {
+ Some(frame) => Ok(frame),
+ None => Err(io::Error::new(
+ io::ErrorKind::Other,
+ "bytes remaining on stream",
+ )),
+ }
+ }
+
+ /// Encodes a frame into the buffer provided.
+ fn encode(&mut self, msg: Self::In, buf: &mut BytesMut) -> io::Result<()>;
+}
+
+/// Codec based upon bincode serialization
+///
+/// Messages that have been serialized using bincode are prefixed with
+/// the length of the message to aid in deserialization, so that it's
+/// known if enough data has been received to decode a complete
+/// message.
+pub struct LengthDelimitedCodec<In, Out> {
+ state: State,
+ encode_buf: Vec<u8>,
+ __in: PhantomData<In>,
+ __out: PhantomData<Out>,
+}
+
+enum State {
+ Length,
+ Data(u32),
+}
+
+const MAX_MESSAGE_LEN: u32 = 1024 * 1024;
+const MAGIC: u64 = 0xa4d1_019c_c910_1d4a;
+const HEADER_LEN: usize = size_of::<u32>() + size_of::<u64>();
+
+impl<In, Out> Default for LengthDelimitedCodec<In, Out> {
+ fn default() -> Self {
+ Self {
+ state: State::Length,
+ encode_buf: Vec::with_capacity(crate::ipccore::IPC_CLIENT_BUFFER_SIZE),
+ __in: PhantomData,
+ __out: PhantomData,
+ }
+ }
+}
+
+impl<In, Out> LengthDelimitedCodec<In, Out> {
+ // Lengths are encoded as little endian u32
+ fn decode_length(buf: &mut BytesMut) -> Option<u32> {
+ if buf.len() < HEADER_LEN {
+ // Not enough data
+ return None;
+ }
+
+ let magic = LittleEndian::read_u64(&buf[0..8]);
+ assert_eq!(magic, MAGIC);
+
+ // Consume the length field
+ let n = LittleEndian::read_u32(&buf[8..12]);
+ buf.advance(HEADER_LEN);
+ Some(n)
+ }
+
+ fn decode_data(buf: &mut BytesMut, n: u32) -> io::Result<Option<Out>>
+ where
+ Out: DeserializeOwned + Debug,
+ {
+ let n = n.try_into().unwrap();
+
+ // At this point, the buffer has already had the required capacity
+ // reserved. All there is to do is read.
+ if buf.len() < n {
+ return Ok(None);
+ }
+
+ trace!("Attempting to decode");
+ let msg = bincode::options()
+ .with_limit(MAX_MESSAGE_LEN as u64)
+ .deserialize::<Out>(&buf[..n])
+ .map_err(|e| match *e {
+ bincode::ErrorKind::Io(e) => e,
+ _ => io::Error::new(io::ErrorKind::Other, *e),
+ })?;
+ buf.advance(n);
+
+ trace!("... Decoded {:?}", msg);
+ Ok(Some(msg))
+ }
+}
+
+impl<In, Out> Codec for LengthDelimitedCodec<In, Out>
+where
+ In: Serialize + Debug,
+ Out: DeserializeOwned + Debug,
+{
+ type In = In;
+ type Out = Out;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Out>> {
+ let n = match self.state {
+ State::Length => {
+ match Self::decode_length(buf) {
+ Some(n) => {
+ assert!(
+ n <= MAX_MESSAGE_LEN,
+ "assertion failed: {} <= {}",
+ n,
+ MAX_MESSAGE_LEN
+ );
+ self.state = State::Data(n);
+
+ // Ensure that the buffer has enough space to read the
+ // incoming payload
+ buf.reserve(n.try_into().unwrap());
+
+ n
+ }
+ None => return Ok(None),
+ }
+ }
+ State::Data(n) => n,
+ };
+
+ match Self::decode_data(buf, n)? {
+ Some(data) => {
+ // Update the decode state
+ self.state = State::Length;
+
+ // Make sure the buffer has enough space to read the next length header.
+ buf.reserve(HEADER_LEN);
+
+ Ok(Some(data))
+ }
+ None => Ok(None),
+ }
+ }
+
+ fn encode(&mut self, item: Self::In, buf: &mut BytesMut) -> io::Result<()> {
+ trace!("Attempting to encode");
+
+ self.encode_buf.clear();
+ if let Err(e) = bincode::options()
+ .with_limit(MAX_MESSAGE_LEN as u64)
+ .serialize_into::<_, Self::In>(&mut self.encode_buf, &item)
+ {
+ trace!("message encode failed: {:?}", *e);
+ match *e {
+ bincode::ErrorKind::Io(e) => return Err(e),
+ _ => return Err(io::Error::new(io::ErrorKind::Other, *e)),
+ }
+ }
+
+ let encoded_len = self.encode_buf.len();
+ assert!(encoded_len <= MAX_MESSAGE_LEN as usize);
+ buf.reserve(encoded_len + HEADER_LEN);
+ buf.put_u64_le(MAGIC);
+ buf.put_u32_le(encoded_len.try_into().unwrap());
+ buf.extend_from_slice(&self.encode_buf);
+
+ Ok(())
+ }
+}
diff --git a/third_party/rust/audioipc2/src/errors.rs b/third_party/rust/audioipc2/src/errors.rs
new file mode 100644
index 0000000000..f4f3e32f5f
--- /dev/null
+++ b/third_party/rust/audioipc2/src/errors.rs
@@ -0,0 +1,18 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details.
+
+error_chain! {
+ // Maybe replace with chain_err to improve the error info.
+ foreign_links {
+ Bincode(bincode::Error);
+ Io(std::io::Error);
+ Cubeb(cubeb::Error);
+ }
+
+ // Replace bail!(str) with explicit errors.
+ errors {
+ Disconnected
+ }
+}
diff --git a/third_party/rust/audioipc2/src/ipccore.rs b/third_party/rust/audioipc2/src/ipccore.rs
new file mode 100644
index 0000000000..16468b9015
--- /dev/null
+++ b/third_party/rust/audioipc2/src/ipccore.rs
@@ -0,0 +1,918 @@
+// Copyright © 2021 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details
+
+use std::io::{self, Result};
+use std::sync::{mpsc, Arc};
+use std::thread;
+
+use mio::{event::Event, Events, Interest, Poll, Registry, Token, Waker};
+use slab::Slab;
+
+use crate::messages::AssociateHandleForMessage;
+use crate::rpccore::{
+ make_client, make_server, Client, Handler, Proxy, RequestQueue, RequestQueueSender, Server,
+};
+use crate::{
+ codec::Codec,
+ codec::LengthDelimitedCodec,
+ sys::{self, RecvMsg, SendMsg},
+};
+
+use serde::{de::DeserializeOwned, Serialize};
+use std::fmt::Debug;
+
+const WAKE_TOKEN: Token = Token(!0);
+
+thread_local!(static IN_EVENTLOOP: std::cell::RefCell<Option<thread::ThreadId>> = std::cell::RefCell::new(None));
+
+fn assert_not_in_event_loop_thread() {
+ IN_EVENTLOOP.with(|b| {
+ assert_ne!(*b.borrow(), Some(thread::current().id()));
+ });
+}
+
+// Requests sent by an EventLoopHandle to be serviced by
+// the handle's associated EventLoop.
+enum Request {
+ // See EventLoop::add_connection
+ AddConnection(
+ sys::Pipe,
+ Box<dyn Driver + Send>,
+ mpsc::Sender<Result<Token>>,
+ ),
+ // See EventLoop::shutdown
+ Shutdown,
+ // See EventLoop::wake_connection
+ WakeConnection(Token),
+}
+
+// EventLoopHandle is a cloneable external reference
+// to a running EventLoop, allowing registration of
+// new client and server connections, in addition to
+// requesting the EventLoop shut down cleanly.
+#[derive(Clone, Debug)]
+pub struct EventLoopHandle {
+ waker: Arc<Waker>,
+ requests: RequestQueueSender<Request>,
+}
+
+impl EventLoopHandle {
+ pub fn bind_client<C: Client + 'static>(
+ &self,
+ connection: sys::Pipe,
+ ) -> Result<Proxy<<C as Client>::ServerMessage, <C as Client>::ClientMessage>>
+ where
+ <C as Client>::ServerMessage: Serialize + Debug + AssociateHandleForMessage + Send,
+ <C as Client>::ClientMessage: DeserializeOwned + Debug + AssociateHandleForMessage + Send,
+ {
+ let (handler, mut proxy) = make_client::<C>()?;
+ let driver = Box::new(FramedDriver::new(handler));
+ let r = self.add_connection(connection, driver);
+ trace!("EventLoop::bind_client {:?}", r);
+ r.map(|token| {
+ proxy.connect_event_loop(self.clone(), token);
+ proxy
+ })
+ }
+
+ pub fn bind_server<S: Server + Send + 'static>(
+ &self,
+ server: S,
+ connection: sys::Pipe,
+ ) -> Result<()>
+ where
+ <S as Server>::ServerMessage: DeserializeOwned + Debug + AssociateHandleForMessage + Send,
+ <S as Server>::ClientMessage: Serialize + Debug + AssociateHandleForMessage + Send,
+ {
+ let handler = make_server::<S>(server);
+ let driver = Box::new(FramedDriver::new(handler));
+ let r = self.add_connection(connection, driver);
+ trace!("EventLoop::bind_server {:?}", r);
+ r.map(|_| ())
+ }
+
+ // Register a new connection with associated driver on the EventLoop.
+ // TODO: Since this is called from a Gecko main thread, make this non-blocking wrt. the EventLoop.
+ fn add_connection(
+ &self,
+ connection: sys::Pipe,
+ driver: Box<dyn Driver + Send>,
+ ) -> Result<Token> {
+ assert_not_in_event_loop_thread();
+ let (tx, rx) = mpsc::channel();
+ self.requests
+ .push(Request::AddConnection(connection, driver, tx))
+ .map_err(|_| {
+ debug!("EventLoopHandle::add_connection send failed");
+ io::ErrorKind::ConnectionAborted
+ })?;
+ self.waker.wake()?;
+ rx.recv().map_err(|_| {
+ debug!("EventLoopHandle::add_connection recv failed");
+ io::ErrorKind::ConnectionAborted
+ })?
+ }
+
+ // Signal EventLoop to shutdown. Causes EventLoop::poll to return Ok(false).
+ fn shutdown(&self) -> Result<()> {
+ self.requests.push(Request::Shutdown).map_err(|_| {
+ debug!("EventLoopHandle::shutdown send failed");
+ io::ErrorKind::ConnectionAborted
+ })?;
+ self.waker.wake()
+ }
+
+ // Signal EventLoop to wake connection specified by `token` for processing.
+ pub(crate) fn wake_connection(&self, token: Token) {
+ if self.requests.push(Request::WakeConnection(token)).is_ok() {
+ self.waker.wake().expect("wake failed");
+ }
+ }
+}
+
+// EventLoop owns all registered connections, and is responsible for calling each connection's
+// `handle_event` or `handle_wake` any time a readiness or wake event associated with that connection is
+// produced.
+struct EventLoop {
+ poll: Poll,
+ events: Events,
+ waker: Arc<Waker>,
+ name: String,
+ connections: Slab<Connection>,
+ requests: Arc<RequestQueue<Request>>,
+}
+
+const EVENT_LOOP_INITIAL_CLIENTS: usize = 64; // Initial client allocation, exceeding this will cause the connection slab to grow.
+const EVENT_LOOP_EVENTS_PER_ITERATION: usize = 256; // Number of events per poll() step, arbitrary limit.
+
+impl EventLoop {
+ fn new(name: String) -> Result<EventLoop> {
+ let poll = Poll::new()?;
+ let waker = Arc::new(Waker::new(poll.registry(), WAKE_TOKEN)?);
+ let eventloop = EventLoop {
+ poll,
+ events: Events::with_capacity(EVENT_LOOP_EVENTS_PER_ITERATION),
+ waker,
+ name,
+ connections: Slab::with_capacity(EVENT_LOOP_INITIAL_CLIENTS),
+ requests: Arc::new(RequestQueue::new(EVENT_LOOP_INITIAL_CLIENTS)),
+ };
+
+ Ok(eventloop)
+ }
+
+ // Return a cloneable handle for controlling the EventLoop externally.
+ fn handle(&mut self) -> EventLoopHandle {
+ EventLoopHandle {
+ waker: self.waker.clone(),
+ requests: self.requests.new_sender(),
+ }
+ }
+
+ // Register a connection and driver.
+ fn add_connection(
+ &mut self,
+ connection: sys::Pipe,
+ driver: Box<dyn Driver + Send>,
+ ) -> Result<Token> {
+ if self.connections.len() == self.connections.capacity() {
+ trace!("{}: connection slab full, insert will allocate", self.name);
+ }
+ let entry = self.connections.vacant_entry();
+ let token = Token(entry.key());
+ assert_ne!(token, WAKE_TOKEN);
+ let connection = Connection::new(connection, token, driver, self.poll.registry())?;
+ debug!("{}: {:?}: new connection", self.name, token);
+ entry.insert(connection);
+ Ok(token)
+ }
+
+ // Step EventLoop once. Call this in a loop from a dedicated thread.
+ // Returns false if EventLoop is shutting down.
+ // Each step may call `handle_event` on any registered connection that
+ // has received readiness events from the poll wakeup.
+ fn poll(&mut self) -> Result<bool> {
+ loop {
+ let r = self.poll.poll(&mut self.events, None);
+ match r {
+ Ok(()) => break,
+ Err(ref e) if interrupted(e) => continue,
+ Err(e) => return Err(e),
+ }
+ }
+
+ for event in self.events.iter() {
+ match event.token() {
+ WAKE_TOKEN => {
+ debug!("{}: WAKE: wake event, will process requests", self.name);
+ }
+ token => {
+ debug!("{}: {:?}: connection event: {:?}", self.name, token, event);
+ let done = if let Some(connection) = self.connections.get_mut(token.0) {
+ match connection.handle_event(event, self.poll.registry()) {
+ Ok(done) => {
+ debug!("{}: connection {:?} done={}", self.name, token, done);
+ done
+ }
+ Err(e) => {
+ debug!("{}: {:?}: connection error: {:?}", self.name, token, e);
+ true
+ }
+ }
+ } else {
+ // Spurious event, log and ignore.
+ debug!(
+ "{}: {:?}: token not found in slab: {:?}",
+ self.name, token, event
+ );
+ false
+ };
+ if done {
+ debug!("{}: {:?}: done, removing", self.name, token);
+ let mut connection = self.connections.remove(token.0);
+ if let Err(e) = connection.shutdown(self.poll.registry()) {
+ debug!(
+ "{}: EventLoop drop - closing connection for {:?} failed: {:?}",
+ self.name, token, e
+ );
+ }
+ }
+ }
+ }
+ }
+
+ // If the waker was signalled there may be pending requests to process.
+ while let Some(req) = self.requests.pop() {
+ match req {
+ Request::AddConnection(pipe, driver, tx) => {
+ debug!("{}: EventLoop: handling add_connection", self.name);
+ let r = self.add_connection(pipe, driver);
+ tx.send(r).expect("EventLoop::add_connection");
+ }
+ Request::Shutdown => {
+ debug!("{}: EventLoop: handling shutdown", self.name);
+ return Ok(false);
+ }
+ Request::WakeConnection(token) => {
+ debug!(
+ "{}: EventLoop: handling wake_connection {:?}",
+ self.name, token
+ );
+ let done = if let Some(connection) = self.connections.get_mut(token.0) {
+ match connection.handle_wake(self.poll.registry()) {
+ Ok(done) => done,
+ Err(e) => {
+ debug!("{}: {:?}: connection error: {:?}", self.name, token, e);
+ true
+ }
+ }
+ } else {
+ // Spurious wake, log and ignore.
+ debug!(
+ "{}: {:?}: token not found in slab: wake_connection",
+ self.name, token
+ );
+ false
+ };
+ if done {
+ debug!("{}: {:?}: done (wake), removing", self.name, token);
+ let mut connection = self.connections.remove(token.0);
+ if let Err(e) = connection.shutdown(self.poll.registry()) {
+ debug!(
+ "{}: EventLoop drop - closing connection for {:?} failed: {:?}",
+ self.name, token, e
+ );
+ }
+ }
+ }
+ }
+ }
+
+ Ok(true)
+ }
+}
+
+impl Drop for EventLoop {
+ fn drop(&mut self) {
+ debug!("{}: EventLoop drop", self.name);
+ for (token, connection) in &mut self.connections {
+ debug!(
+ "{}: EventLoop drop - closing connection for {:?}",
+ self.name, token
+ );
+ if let Err(e) = connection.shutdown(self.poll.registry()) {
+ debug!(
+ "{}: EventLoop drop - closing connection for {:?} failed: {:?}",
+ self.name, token, e
+ );
+ }
+ }
+ debug!("{}: EventLoop drop done", self.name);
+ }
+}
+
+// Connection wraps an interprocess connection (Pipe) and manages
+// receiving inbound and sending outbound buffers (and associated handles, if any).
+// The associated driver is responsible for message framing and serialization.
+struct Connection {
+ io: sys::Pipe,
+ token: Token,
+ interest: Option<Interest>,
+ inbound: sys::ConnectionBuffer,
+ outbound: sys::ConnectionBuffer,
+ driver: Box<dyn Driver + Send>,
+}
+
+pub(crate) const IPC_CLIENT_BUFFER_SIZE: usize = 16384;
+
+impl Connection {
+ fn new(
+ mut io: sys::Pipe,
+ token: Token,
+ driver: Box<dyn Driver + Send>,
+ registry: &Registry,
+ ) -> Result<Connection> {
+ let interest = Interest::READABLE;
+ registry.register(&mut io, token, interest)?;
+ Ok(Connection {
+ io,
+ token,
+ interest: Some(interest),
+ inbound: sys::ConnectionBuffer::with_capacity(IPC_CLIENT_BUFFER_SIZE),
+ outbound: sys::ConnectionBuffer::with_capacity(IPC_CLIENT_BUFFER_SIZE),
+ driver,
+ })
+ }
+
+ fn shutdown(&mut self, registry: &Registry) -> Result<()> {
+ trace!(
+ "{:?}: connection shutdown interest={:?}",
+ self.token,
+ self.interest
+ );
+ let r = self.io.shutdown();
+ trace!("{:?}: connection shutdown r={:?}", self.token, r);
+ self.interest = None;
+ registry.deregister(&mut self.io)
+ }
+
+ // Connections are always interested in READABLE. clear_readable is only
+ // called when the connection is in the process of shutting down.
+ fn clear_readable(&mut self, registry: &Registry) -> Result<()> {
+ self.update_registration(
+ registry,
+ self.interest.and_then(|i| i.remove(Interest::READABLE)),
+ )
+ }
+
+ // Connections toggle WRITABLE based on the state of the `outbound` buffer.
+ fn set_writable(&mut self, registry: &Registry) -> Result<()> {
+ self.update_registration(
+ registry,
+ Some(
+ self.interest
+ .map_or_else(|| Interest::WRITABLE, |i| i.add(Interest::WRITABLE)),
+ ),
+ )
+ }
+
+ fn clear_writable(&mut self, registry: &Registry) -> Result<()> {
+ self.update_registration(
+ registry,
+ self.interest.and_then(|i| i.remove(Interest::WRITABLE)),
+ )
+ }
+
+ // Update connection registration with the current readiness event interests.
+ fn update_registration(
+ &mut self,
+ registry: &Registry,
+ new_interest: Option<Interest>,
+ ) -> Result<()> {
+ // Note: Updating registration always triggers a writable event with NamedPipes, so
+ // it's important to skip updating registration when the set of interests hasn't changed.
+ if new_interest != self.interest {
+ trace!(
+ "{:?}: updating readiness registration old={:?} new={:?}",
+ self.token,
+ self.interest,
+ new_interest
+ );
+ self.interest = new_interest;
+ if let Some(interest) = self.interest {
+ registry.reregister(&mut self.io, self.token, interest)?;
+ } else {
+ registry.deregister(&mut self.io)?;
+ }
+ }
+ Ok(())
+ }
+
+ // Handle readiness event. Errors returned are fatal for this connection, resulting in removal from the EventLoop connection list.
+ // The EventLoop will call this for any connection that has received an event.
+ fn handle_event(&mut self, event: &Event, registry: &Registry) -> Result<bool> {
+ debug!("{:?}: handling event {:?}", self.token, event);
+ assert_eq!(self.token, event.token());
+ let done = if event.is_readable() {
+ self.recv_inbound()?
+ } else {
+ trace!("{:?}: not readable", self.token);
+ false
+ };
+ self.flush_outbound()?;
+ if self.send_outbound(registry)? {
+ // Hit EOF during send
+ return Ok(true);
+ }
+ debug!(
+ "{:?}: handling event done (recv done={}, outbound={})",
+ self.token,
+ done,
+ self.outbound.is_empty()
+ );
+ let done = done && self.outbound.is_empty();
+ // If driver is done and outbound is clear, unregister connection.
+ if done {
+ trace!("{:?}: driver done, clearing read interest", self.token);
+ self.clear_readable(registry)?;
+ }
+ Ok(done)
+ }
+
+ // Handle wake event. Errors returned are fatal for this connection, resulting in removal from the EventLoop connection list.
+ // The EventLoop will call this to clear the outbound buffer for any connection that has received a wake event.
+ fn handle_wake(&mut self, registry: &Registry) -> Result<bool> {
+ debug!("{:?}: handling wake", self.token);
+ self.flush_outbound()?;
+ if self.send_outbound(registry)? {
+ // Hit EOF during send
+ return Ok(true);
+ }
+ debug!("{:?}: handling wake done", self.token);
+ Ok(false)
+ }
+
+ fn recv_inbound(&mut self) -> Result<bool> {
+ // If the connection is readable, read into inbound and pass to driver for processing until all ready data
+ // has been consumed.
+ loop {
+ trace!("{:?}: pre-recv inbound: {:?}", self.token, self.inbound);
+ let r = self.io.recv_msg(&mut self.inbound);
+ match r {
+ Ok(0) => {
+ trace!(
+ "{:?}: recv EOF unprocessed inbound={}",
+ self.token,
+ self.inbound.is_empty()
+ );
+ return Ok(true);
+ }
+ Ok(n) => {
+ trace!("{:?}: recv bytes: {}, process_inbound", self.token, n);
+ let r = self.driver.process_inbound(&mut self.inbound);
+ trace!("{:?}: process_inbound done: {:?}", self.token, r);
+ match r {
+ Ok(done) => {
+ if done {
+ return Ok(true);
+ }
+ }
+ Err(e) => {
+ debug!(
+ "{:?}: process_inbound error: {:?} unprocessed inbound={}",
+ self.token,
+ e,
+ self.inbound.is_empty()
+ );
+ return Err(e);
+ }
+ }
+ }
+ Err(ref e) if would_block(e) => {
+ trace!("{:?}: recv would_block: {:?}", self.token, e);
+ return Ok(false);
+ }
+ Err(ref e) if interrupted(e) => {
+ trace!("{:?}: recv interrupted: {:?}", self.token, e);
+ continue;
+ }
+ Err(e) => {
+ debug!("{:?}: recv error: {:?}", self.token, e);
+ return Err(e);
+ }
+ }
+ }
+ }
+
+ fn flush_outbound(&mut self) -> Result<()> {
+ // Enqueue outbound messages to the outbound buffer, then try to write out to connection.
+ // There may be outbound messages even if there was no inbound processing, so always attempt
+ // to enqueue and flush.
+ trace!("{:?}: flush_outbound", self.token);
+ let r = self.driver.flush_outbound(&mut self.outbound);
+ trace!("{:?}: flush_outbound done: {:?}", self.token, r);
+ if let Err(e) = r {
+ debug!("{:?}: flush_outbound error: {:?}", self.token, e);
+ return Err(e);
+ }
+ Ok(())
+ }
+
+ fn send_outbound(&mut self, registry: &Registry) -> Result<bool> {
+ // Attempt to flush outbound buffer. If the connection's write buffer is full, register for WRITABLE
+ // and complete flushing when associated notitication arrives later.
+ while !self.outbound.is_empty() {
+ let r = self.io.send_msg(&mut self.outbound);
+ match r {
+ Ok(0) => {
+ trace!("{:?}: send EOF", self.token);
+ return Ok(true);
+ }
+ Ok(n) => {
+ trace!("{:?}: send bytes: {}", self.token, n);
+ }
+ Err(ref e) if would_block(e) => {
+ trace!(
+ "{:?}: send would_block: {:?}, setting write interest",
+ self.token,
+ e
+ );
+ // Register for write events.
+ self.set_writable(registry)?;
+ break;
+ }
+ Err(ref e) if interrupted(e) => {
+ trace!("{:?}: send interrupted: {:?}", self.token, e);
+ continue;
+ }
+ Err(e) => {
+ debug!("{:?}: send error: {:?}", self.token, e);
+ return Err(e);
+ }
+ }
+ trace!("{:?}: post-send: outbound {:?}", self.token, self.outbound);
+ }
+ // Outbound buffer flushed, clear registration for WRITABLE.
+ if self.outbound.is_empty() {
+ trace!("{:?}: outbound empty, clearing write interest", self.token);
+ self.clear_writable(registry)?;
+ }
+ Ok(false)
+ }
+}
+
+impl Drop for Connection {
+ fn drop(&mut self) {
+ debug!("{:?}: Connection drop", self.token);
+ }
+}
+
+fn would_block(err: &std::io::Error) -> bool {
+ err.kind() == std::io::ErrorKind::WouldBlock
+}
+
+fn interrupted(err: &std::io::Error) -> bool {
+ err.kind() == std::io::ErrorKind::Interrupted
+}
+
+// Driver only has a single implementation, but must be hidden behind a Trait object to
+// hide the varying FramedDriver sizes (due to different `T` values).
+trait Driver {
+ // Handle inbound messages. Returns true if Driver is done; this will trigger Connection removal and cleanup.
+ fn process_inbound(&mut self, inbound: &mut sys::ConnectionBuffer) -> Result<bool>;
+
+ // Write outbound messages to `outbound`.
+ fn flush_outbound(&mut self, outbound: &mut sys::ConnectionBuffer) -> Result<()>;
+}
+
+// Length-delimited connection framing and (de)serialization is handled by the inbound and outbound processing.
+// Handlers can then process message Requests and Responses without knowledge of serialization or
+// handle remoting.
+impl<T> Driver for FramedDriver<T>
+where
+ T: Handler,
+ T::In: DeserializeOwned + Debug + AssociateHandleForMessage,
+ T::Out: Serialize + Debug + AssociateHandleForMessage,
+{
+ // Caller passes `inbound` data, this function will trim any complete messages from `inbound` and pass them to the handler for processing.
+ fn process_inbound(&mut self, inbound: &mut sys::ConnectionBuffer) -> Result<bool> {
+ debug!("process_inbound: {:?}", inbound);
+
+ // Repeatedly call `decode` as long as it produces items, passing each produced item to the handler to action.
+ #[allow(unused_mut)]
+ while let Some(mut item) = self.codec.decode(&mut inbound.buf)? {
+ if item.has_associated_handle() {
+ // On Unix, dequeue a handle from the connection and update the item's handle.
+ #[cfg(unix)]
+ {
+ let new = inbound
+ .pop_handle()
+ .expect("inbound handle expected for item");
+ unsafe { item.set_local_handle(new.take()) };
+ }
+ // On Windows, the deserialized item contains the correct handle value, so
+ // convert it to an owned handle on the item.
+ #[cfg(windows)]
+ {
+ assert!(inbound.pop_handle().is_none());
+ unsafe { item.set_local_handle() };
+ }
+ }
+
+ self.handler.consume(item)?;
+ }
+
+ Ok(false)
+ }
+
+ // Caller will try to write `outbound` to associated connection, queuing any data that can't be transmitted immediately.
+ fn flush_outbound(&mut self, outbound: &mut sys::ConnectionBuffer) -> Result<()> {
+ debug!("flush_outbound: {:?}", outbound.buf);
+
+ // Repeatedly grab outgoing items from the handler, passing each to `encode` for serialization into `outbound`.
+ while let Some(mut item) = self.handler.produce()? {
+ let handle = if item.has_associated_handle() {
+ #[allow(unused_mut)]
+ let mut handle = item.take_handle();
+ // On Windows, the handle is transferred by duplicating it into the target remote process.
+ #[cfg(windows)]
+ unsafe {
+ item.set_remote_handle(handle.send_to_target()?);
+ }
+ Some(handle)
+ } else {
+ None
+ };
+
+ self.codec.encode(item, &mut outbound.buf)?;
+ if let Some(handle) = handle {
+ // `outbound` retains ownership of the handle until the associated
+ // encoded item in `outbound.buf` is sent to the remote process.
+ outbound.push_handle(handle);
+ }
+ }
+ Ok(())
+ }
+}
+
+struct FramedDriver<T: Handler> {
+ codec: LengthDelimitedCodec<T::Out, T::In>,
+ handler: T,
+}
+
+impl<T: Handler> FramedDriver<T> {
+ fn new(handler: T) -> FramedDriver<T> {
+ FramedDriver {
+ codec: Default::default(),
+ handler,
+ }
+ }
+}
+
+#[derive(Debug)]
+pub struct EventLoopThread {
+ thread: Option<thread::JoinHandle<Result<()>>>,
+ name: String,
+ handle: EventLoopHandle,
+}
+
+impl EventLoopThread {
+ pub fn new<F1, F2>(
+ name: String,
+ stack_size: Option<usize>,
+ after_start: F1,
+ before_stop: F2,
+ ) -> Result<Self>
+ where
+ F1: Fn() + Send + 'static,
+ F2: Fn() + Send + 'static,
+ {
+ let mut event_loop = EventLoop::new(name.clone())?;
+ let handle = event_loop.handle();
+
+ let builder = thread::Builder::new()
+ .name(name.clone())
+ .stack_size(stack_size.unwrap_or(64 * 4096));
+
+ let thread = builder.spawn(move || {
+ trace!("{}: event loop thread enter", event_loop.name);
+ after_start();
+ let _thread_exit_guard = scopeguard::guard((), |_| before_stop());
+
+ let r = loop {
+ let start = std::time::Instant::now();
+ let r = event_loop.poll();
+ trace!(
+ "{}: event loop poll r={:?}, took={}μs",
+ event_loop.name,
+ r,
+ start.elapsed().as_micros()
+ );
+ match r {
+ Ok(true) => continue,
+ _ => break r,
+ }
+ };
+
+ trace!("{}: event loop thread exit", event_loop.name);
+ r.map(|_| ())
+ })?;
+
+ Ok(EventLoopThread {
+ thread: Some(thread),
+ name,
+ handle,
+ })
+ }
+
+ pub fn handle(&self) -> &EventLoopHandle {
+ &self.handle
+ }
+}
+
+impl Drop for EventLoopThread {
+ // Shut down event loop and executor thread. Blocks until complete.
+ fn drop(&mut self) {
+ trace!("{}: EventLoopThread shutdown", self.name);
+ if let Err(e) = self.handle.shutdown() {
+ debug!("{}: initiating shutdown failed: {:?}", self.name, e);
+ }
+ let thread = self.thread.take().expect("event loop thread");
+ if let Err(e) = thread.join() {
+ error!("{}: EventLoopThread failed: {:?}", self.name, e);
+ }
+ trace!("{}: EventLoopThread shutdown done", self.name);
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use serde_derive::{Deserialize, Serialize};
+
+ use super::*;
+
+ #[derive(Debug, Serialize, Deserialize, PartialEq)]
+ enum TestServerMessage {
+ TestRequest,
+ }
+ impl AssociateHandleForMessage for TestServerMessage {}
+
+ struct TestServerImpl {}
+
+ impl Server for TestServerImpl {
+ type ServerMessage = TestServerMessage;
+ type ClientMessage = TestClientMessage;
+
+ fn process(&mut self, req: Self::ServerMessage) -> Self::ClientMessage {
+ assert_eq!(req, TestServerMessage::TestRequest);
+ TestClientMessage::TestResponse
+ }
+ }
+
+ #[derive(Debug, Serialize, Deserialize, PartialEq)]
+ enum TestClientMessage {
+ TestResponse,
+ }
+
+ impl AssociateHandleForMessage for TestClientMessage {}
+
+ struct TestClientImpl {}
+
+ impl Client for TestClientImpl {
+ type ServerMessage = TestServerMessage;
+ type ClientMessage = TestClientMessage;
+ }
+
+ fn init() {
+ let _ = env_logger::builder().is_test(true).try_init();
+ }
+
+ fn setup() -> (
+ EventLoopThread,
+ EventLoopThread,
+ Proxy<TestServerMessage, TestClientMessage>,
+ ) {
+ // Server setup and registration.
+ let server = EventLoopThread::new("test-server".to_string(), None, || {}, || {})
+ .expect("server EventLoopThread");
+ let server_handle = server.handle();
+
+ let (server_pipe, client_pipe) = sys::make_pipe_pair().expect("server make_pipe_pair");
+ server_handle
+ .bind_server(TestServerImpl {}, server_pipe)
+ .expect("server bind_server");
+
+ // Client setup and registration.
+ let client = EventLoopThread::new("test-client".to_string(), None, || {}, || {})
+ .expect("client EventLoopThread");
+ let client_handle = client.handle();
+
+ let client_pipe = unsafe { sys::Pipe::from_raw_handle(client_pipe) };
+ let client_proxy = client_handle
+ .bind_client::<TestClientImpl>(client_pipe)
+ .expect("client bind_client");
+
+ (server, client, client_proxy)
+ }
+
+ // Verify basic EventLoopThread functionality works. Create a server and client EventLoopThread, then send
+ // a single message from the client to the server and wait for the expected response.
+ #[test]
+ fn basic() {
+ init();
+ let (server, client, client_proxy) = setup();
+
+ // RPC message from client to server.
+ let response = client_proxy.call(TestServerMessage::TestRequest);
+ let response = response.expect("client response");
+ assert_eq!(response, TestClientMessage::TestResponse);
+
+ // Explicit shutdown.
+ drop(client);
+ drop(server);
+ }
+
+ // Same as `basic`, but shut down server before client.
+ #[test]
+ fn basic_reverse_drop_order() {
+ init();
+ let (server, client, client_proxy) = setup();
+
+ // RPC message from client to server.
+ let response = client_proxy.call(TestServerMessage::TestRequest);
+ let response = response.expect("client response");
+ assert_eq!(response, TestClientMessage::TestResponse);
+
+ // Explicit shutdown.
+ drop(server);
+ drop(client);
+ }
+
+ #[test]
+ fn dead_server() {
+ init();
+ let (server, _client, client_proxy) = setup();
+ drop(server);
+
+ let response = client_proxy.call(TestServerMessage::TestRequest);
+ response.expect_err("sending on closed channel");
+ }
+
+ #[test]
+ fn dead_client() {
+ init();
+ let (_server, client, client_proxy) = setup();
+ drop(client);
+
+ let response = client_proxy.call(TestServerMessage::TestRequest);
+ response.expect_err("sending on a closed channel");
+ }
+
+ #[test]
+ fn disconnected_handle() {
+ init();
+ let server = EventLoopThread::new("test-server".to_string(), None, || {}, || {})
+ .expect("server EventLoopThread");
+ let server_handle = server.handle().clone();
+ drop(server);
+
+ server_handle
+ .shutdown()
+ .expect_err("sending on closed channel");
+ }
+
+ #[test]
+ fn clone_after_drop() {
+ init();
+ let (server, client, client_proxy) = setup();
+ drop(server);
+ drop(client);
+
+ let clone = client_proxy.clone();
+ let response = clone.call(TestServerMessage::TestRequest);
+ response.expect_err("sending to a dropped ClientHandler");
+ }
+
+ #[test]
+ fn basic_event_loop_thread_callbacks() {
+ init();
+ let (start_tx, start_rx) = mpsc::channel();
+ let (stop_tx, stop_rx) = mpsc::channel();
+
+ let elt = EventLoopThread::new(
+ "test-thread-callbacks".to_string(),
+ None,
+ move || start_tx.send(()).unwrap(),
+ move || stop_tx.send(()).unwrap(),
+ )
+ .expect("server EventLoopThread");
+
+ start_rx.recv().expect("after_start callback done");
+
+ drop(elt);
+
+ stop_rx.recv().expect("before_stop callback done");
+ }
+}
diff --git a/third_party/rust/audioipc2/src/lib.rs b/third_party/rust/audioipc2/src/lib.rs
new file mode 100644
index 0000000000..eb6d843ba4
--- /dev/null
+++ b/third_party/rust/audioipc2/src/lib.rs
@@ -0,0 +1,214 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details
+
+#![warn(unused_extern_crates)]
+#![recursion_limit = "1024"]
+#[macro_use]
+extern crate error_chain;
+#[macro_use]
+extern crate log;
+
+pub mod codec;
+#[allow(deprecated)]
+pub mod errors;
+pub mod messages;
+pub mod shm;
+
+pub mod ipccore;
+pub mod rpccore;
+pub mod sys;
+
+pub use crate::messages::{ClientMessage, ServerMessage};
+
+#[cfg(unix)]
+use std::os::unix::io::IntoRawFd;
+#[cfg(windows)]
+use std::os::windows::io::IntoRawHandle;
+
+use std::io::Result;
+
+// This must match the definition of
+// ipc::FileDescriptor::PlatformHandleType in Gecko.
+#[cfg(windows)]
+pub type PlatformHandleType = std::os::windows::raw::HANDLE;
+#[cfg(unix)]
+pub type PlatformHandleType = libc::c_int;
+
+// This stands in for RawFd/RawHandle.
+#[derive(Debug)]
+pub struct PlatformHandle(PlatformHandleType);
+
+#[cfg(unix)]
+pub const INVALID_HANDLE_VALUE: PlatformHandleType = -1isize as PlatformHandleType;
+
+#[cfg(windows)]
+pub const INVALID_HANDLE_VALUE: PlatformHandleType = winapi::um::handleapi::INVALID_HANDLE_VALUE;
+
+#[cfg(unix)]
+fn valid_handle(handle: PlatformHandleType) -> bool {
+ handle >= 0
+}
+
+#[cfg(windows)]
+fn valid_handle(handle: PlatformHandleType) -> bool {
+ handle != INVALID_HANDLE_VALUE && !handle.is_null()
+}
+
+impl PlatformHandle {
+ pub fn new(raw: PlatformHandleType) -> PlatformHandle {
+ assert!(valid_handle(raw));
+ PlatformHandle(raw)
+ }
+
+ #[cfg(windows)]
+ pub fn from<T: IntoRawHandle>(from: T) -> PlatformHandle {
+ PlatformHandle::new(from.into_raw_handle())
+ }
+
+ #[cfg(unix)]
+ pub fn from<T: IntoRawFd>(from: T) -> PlatformHandle {
+ PlatformHandle::new(from.into_raw_fd())
+ }
+
+ #[allow(clippy::missing_safety_doc)]
+ pub unsafe fn into_raw(self) -> PlatformHandleType {
+ let handle = self.0;
+ std::mem::forget(self);
+ handle
+ }
+
+ #[cfg(unix)]
+ pub fn duplicate(h: PlatformHandleType) -> Result<PlatformHandle> {
+ unsafe {
+ let newfd = libc::dup(h);
+ if !valid_handle(newfd) {
+ return Err(std::io::Error::last_os_error());
+ }
+ Ok(PlatformHandle::from(newfd))
+ }
+ }
+
+ #[allow(clippy::missing_safety_doc)]
+ #[cfg(windows)]
+ pub unsafe fn duplicate(h: PlatformHandleType) -> Result<PlatformHandle> {
+ let dup = duplicate_platform_handle(h, None)?;
+ Ok(PlatformHandle::new(dup))
+ }
+}
+
+impl Drop for PlatformHandle {
+ fn drop(&mut self) {
+ unsafe { close_platform_handle(self.0) }
+ }
+}
+
+#[cfg(unix)]
+unsafe fn close_platform_handle(handle: PlatformHandleType) {
+ libc::close(handle);
+}
+
+#[cfg(windows)]
+unsafe fn close_platform_handle(handle: PlatformHandleType) {
+ winapi::um::handleapi::CloseHandle(handle);
+}
+
+#[cfg(windows)]
+use winapi::shared::minwindef::{DWORD, FALSE};
+#[cfg(windows)]
+use winapi::um::{handleapi, processthreadsapi, winnt};
+
+// Duplicate `source_handle` to `target_pid`. Returns the value of the new handle inside the target process.
+// If `target_pid` is `None`, `source_handle` is duplicated in the current process.
+#[cfg(windows)]
+pub(crate) unsafe fn duplicate_platform_handle(
+ source_handle: PlatformHandleType,
+ target_pid: Option<DWORD>,
+) -> Result<PlatformHandleType> {
+ let source_process = processthreadsapi::GetCurrentProcess();
+ let target_process = if let Some(pid) = target_pid {
+ let target = processthreadsapi::OpenProcess(winnt::PROCESS_DUP_HANDLE, FALSE, pid);
+ if !valid_handle(target) {
+ return Err(std::io::Error::new(
+ std::io::ErrorKind::Other,
+ "invalid target process",
+ ));
+ }
+ Some(target)
+ } else {
+ None
+ };
+
+ let mut target_handle = std::ptr::null_mut();
+ let ok = handleapi::DuplicateHandle(
+ source_process,
+ source_handle,
+ target_process.unwrap_or(source_process),
+ &mut target_handle,
+ 0,
+ FALSE,
+ winnt::DUPLICATE_SAME_ACCESS,
+ );
+ if let Some(target) = target_process {
+ handleapi::CloseHandle(target);
+ };
+ if ok == FALSE {
+ return Err(std::io::Error::new(
+ std::io::ErrorKind::Other,
+ "DuplicateHandle failed",
+ ));
+ }
+ Ok(target_handle)
+}
+
+// Close `target_handle_to_close` inside target process `target_pid` using a
+// special invocation of `DuplicateHandle`. See
+// https://docs.microsoft.com/en-us/windows/win32/api/handleapi/nf-handleapi-duplicatehandle#:~:text=Normally%20the%20target,dwOptions%20to%20DUPLICATE_CLOSE_SOURCE.
+#[cfg(windows)]
+pub(crate) unsafe fn close_target_handle(
+ target_handle_to_close: PlatformHandleType,
+ target_pid: DWORD,
+) -> Result<()> {
+ let target_process =
+ processthreadsapi::OpenProcess(winnt::PROCESS_DUP_HANDLE, FALSE, target_pid);
+ if !valid_handle(target_process) {
+ return Err(std::io::Error::new(
+ std::io::ErrorKind::Other,
+ "invalid target process",
+ ));
+ }
+
+ let ok = handleapi::DuplicateHandle(
+ target_process,
+ target_handle_to_close,
+ std::ptr::null_mut(),
+ std::ptr::null_mut(),
+ 0,
+ FALSE,
+ winnt::DUPLICATE_CLOSE_SOURCE,
+ );
+ handleapi::CloseHandle(target_process);
+ if ok == FALSE {
+ return Err(std::io::Error::new(
+ std::io::ErrorKind::Other,
+ "DuplicateHandle failed",
+ ));
+ }
+ Ok(())
+}
+
+#[cfg(windows)]
+pub fn server_platform_init() {
+ use winapi::shared::winerror;
+ use winapi::um::combaseapi;
+ use winapi::um::objbase;
+
+ unsafe {
+ let r = combaseapi::CoInitializeEx(std::ptr::null_mut(), objbase::COINIT_MULTITHREADED);
+ assert!(winerror::SUCCEEDED(r));
+ }
+}
+
+#[cfg(unix)]
+pub fn server_platform_init() {}
diff --git a/third_party/rust/audioipc2/src/messages.rs b/third_party/rust/audioipc2/src/messages.rs
new file mode 100644
index 0000000000..853f056bae
--- /dev/null
+++ b/third_party/rust/audioipc2/src/messages.rs
@@ -0,0 +1,632 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details
+
+use crate::PlatformHandle;
+use crate::PlatformHandleType;
+use crate::INVALID_HANDLE_VALUE;
+#[cfg(target_os = "linux")]
+use audio_thread_priority::RtPriorityThreadInfo;
+use cubeb::{self, ffi};
+use serde_derive::Deserialize;
+use serde_derive::Serialize;
+use std::ffi::{CStr, CString};
+use std::os::raw::{c_char, c_int, c_uint};
+use std::ptr;
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct Device {
+ #[serde(with = "serde_bytes")]
+ pub output_name: Option<Vec<u8>>,
+ #[serde(with = "serde_bytes")]
+ pub input_name: Option<Vec<u8>>,
+}
+
+impl<'a> From<&'a cubeb::DeviceRef> for Device {
+ fn from(info: &'a cubeb::DeviceRef) -> Self {
+ Self {
+ output_name: info.output_name_bytes().map(|s| s.to_vec()),
+ input_name: info.input_name_bytes().map(|s| s.to_vec()),
+ }
+ }
+}
+
+impl From<ffi::cubeb_device> for Device {
+ fn from(info: ffi::cubeb_device) -> Self {
+ Self {
+ output_name: dup_str(info.output_name),
+ input_name: dup_str(info.input_name),
+ }
+ }
+}
+
+impl From<Device> for ffi::cubeb_device {
+ fn from(info: Device) -> Self {
+ Self {
+ output_name: opt_str(info.output_name),
+ input_name: opt_str(info.input_name),
+ }
+ }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct DeviceInfo {
+ pub devid: usize,
+ #[serde(with = "serde_bytes")]
+ pub device_id: Option<Vec<u8>>,
+ #[serde(with = "serde_bytes")]
+ pub friendly_name: Option<Vec<u8>>,
+ #[serde(with = "serde_bytes")]
+ pub group_id: Option<Vec<u8>>,
+ #[serde(with = "serde_bytes")]
+ pub vendor_name: Option<Vec<u8>>,
+
+ pub device_type: ffi::cubeb_device_type,
+ pub state: ffi::cubeb_device_state,
+ pub preferred: ffi::cubeb_device_pref,
+
+ pub format: ffi::cubeb_device_fmt,
+ pub default_format: ffi::cubeb_device_fmt,
+ pub max_channels: u32,
+ pub default_rate: u32,
+ pub max_rate: u32,
+ pub min_rate: u32,
+
+ pub latency_lo: u32,
+ pub latency_hi: u32,
+}
+
+impl<'a> From<&'a cubeb::DeviceInfoRef> for DeviceInfo {
+ fn from(info: &'a cubeb::DeviceInfoRef) -> Self {
+ let info = unsafe { &*info.as_ptr() };
+ DeviceInfo {
+ devid: info.devid as _,
+ device_id: dup_str(info.device_id),
+ friendly_name: dup_str(info.friendly_name),
+ group_id: dup_str(info.group_id),
+ vendor_name: dup_str(info.vendor_name),
+
+ device_type: info.device_type,
+ state: info.state,
+ preferred: info.preferred,
+
+ format: info.format,
+ default_format: info.default_format,
+ max_channels: info.max_channels,
+ default_rate: info.default_rate,
+ max_rate: info.max_rate,
+ min_rate: info.min_rate,
+
+ latency_lo: info.latency_lo,
+ latency_hi: info.latency_hi,
+ }
+ }
+}
+
+impl From<DeviceInfo> for ffi::cubeb_device_info {
+ fn from(info: DeviceInfo) -> Self {
+ ffi::cubeb_device_info {
+ devid: info.devid as _,
+ device_id: opt_str(info.device_id),
+ friendly_name: opt_str(info.friendly_name),
+ group_id: opt_str(info.group_id),
+ vendor_name: opt_str(info.vendor_name),
+
+ device_type: info.device_type,
+ state: info.state,
+ preferred: info.preferred,
+
+ format: info.format,
+ default_format: info.default_format,
+ max_channels: info.max_channels,
+ default_rate: info.default_rate,
+ max_rate: info.max_rate,
+ min_rate: info.min_rate,
+
+ latency_lo: info.latency_lo,
+ latency_hi: info.latency_hi,
+ }
+ }
+}
+
+#[repr(C)]
+#[derive(Clone, Copy, Debug, Deserialize, Serialize)]
+pub struct StreamParams {
+ pub format: ffi::cubeb_sample_format,
+ pub rate: c_uint,
+ pub channels: c_uint,
+ pub layout: ffi::cubeb_channel_layout,
+ pub prefs: ffi::cubeb_stream_prefs,
+}
+
+impl From<&cubeb::StreamParamsRef> for StreamParams {
+ fn from(x: &cubeb::StreamParamsRef) -> StreamParams {
+ unsafe { *(x.as_ptr() as *mut StreamParams) }
+ }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct StreamCreateParams {
+ pub input_stream_params: Option<StreamParams>,
+ pub output_stream_params: Option<StreamParams>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct StreamInitParams {
+ #[serde(with = "serde_bytes")]
+ pub stream_name: Option<Vec<u8>>,
+ pub input_device: usize,
+ pub input_stream_params: Option<StreamParams>,
+ pub output_device: usize,
+ pub output_stream_params: Option<StreamParams>,
+ pub latency_frames: u32,
+}
+
+fn dup_str(s: *const c_char) -> Option<Vec<u8>> {
+ if s.is_null() {
+ None
+ } else {
+ let vec: Vec<u8> = unsafe { CStr::from_ptr(s) }.to_bytes().to_vec();
+ Some(vec)
+ }
+}
+
+fn opt_str(v: Option<Vec<u8>>) -> *mut c_char {
+ match v {
+ Some(v) => match CString::new(v) {
+ Ok(s) => s.into_raw(),
+ Err(_) => {
+ debug!("Failed to convert bytes to CString");
+ ptr::null_mut()
+ }
+ },
+ None => ptr::null_mut(),
+ }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct StreamCreate {
+ pub token: usize,
+ pub shm_handle: SerializableHandle,
+ pub shm_area_size: usize,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct RegisterDeviceCollectionChanged {
+ pub platform_handle: SerializableHandle,
+}
+
+// Client -> Server messages.
+// TODO: Callbacks should be different messages types so
+// ServerConn::process_msg doesn't have a catch-all case.
+#[derive(Debug, Serialize, Deserialize)]
+pub enum ServerMessage {
+ ClientConnect(u32),
+ ClientDisconnect,
+
+ ContextGetBackendId,
+ ContextGetMaxChannelCount,
+ ContextGetMinLatency(StreamParams),
+ ContextGetPreferredSampleRate,
+ ContextGetDeviceEnumeration(ffi::cubeb_device_type),
+ ContextSetupDeviceCollectionCallback,
+ ContextRegisterDeviceCollectionChanged(ffi::cubeb_device_type, bool),
+
+ StreamCreate(StreamCreateParams),
+ StreamInit(usize, StreamInitParams),
+ StreamDestroy(usize),
+
+ StreamStart(usize),
+ StreamStop(usize),
+ StreamGetPosition(usize),
+ StreamGetLatency(usize),
+ StreamGetInputLatency(usize),
+ StreamSetVolume(usize, f32),
+ StreamSetName(usize, CString),
+ StreamGetCurrentDevice(usize),
+ StreamRegisterDeviceChangeCallback(usize, bool),
+
+ #[cfg(target_os = "linux")]
+ PromoteThreadToRealTime([u8; std::mem::size_of::<RtPriorityThreadInfo>()]),
+}
+
+// Server -> Client messages.
+// TODO: Streams need id.
+#[derive(Debug, Serialize, Deserialize)]
+pub enum ClientMessage {
+ ClientConnected,
+ ClientDisconnected,
+
+ ContextBackendId(String),
+ ContextMaxChannelCount(u32),
+ ContextMinLatency(u32),
+ ContextPreferredSampleRate(u32),
+ ContextEnumeratedDevices(Vec<DeviceInfo>),
+ ContextSetupDeviceCollectionCallback(RegisterDeviceCollectionChanged),
+ ContextRegisteredDeviceCollectionChanged,
+
+ StreamCreated(StreamCreate),
+ StreamInitialized(SerializableHandle),
+ StreamDestroyed,
+
+ StreamStarted,
+ StreamStopped,
+ StreamPosition(u64),
+ StreamLatency(u32),
+ StreamInputLatency(u32),
+ StreamVolumeSet,
+ StreamNameSet,
+ StreamCurrentDevice(Device),
+ StreamRegisterDeviceChangeCallback,
+
+ #[cfg(target_os = "linux")]
+ ThreadPromoted,
+
+ Error(c_int),
+}
+
+#[derive(Debug, Deserialize, Serialize)]
+pub enum CallbackReq {
+ Data {
+ nframes: isize,
+ input_frame_size: usize,
+ output_frame_size: usize,
+ },
+ State(ffi::cubeb_state),
+ DeviceChange,
+}
+
+#[derive(Debug, Deserialize, Serialize)]
+pub enum CallbackResp {
+ Data(isize),
+ State,
+ DeviceChange,
+ Error(c_int),
+}
+
+#[derive(Debug, Deserialize, Serialize)]
+pub enum DeviceCollectionReq {
+ DeviceChange(ffi::cubeb_device_type),
+}
+
+#[derive(Debug, Deserialize, Serialize)]
+pub enum DeviceCollectionResp {
+ DeviceChange,
+}
+
+// Represents a platform handle in various transitional states during serialization and remoting.
+// The process of serializing and remoting handles and the ownership during various states differs
+// between Windows and Unix. SerializableHandle changes during IPC as follows:
+//
+// 1. Created in the initial state `Owned`, with a valid `target_pid`.
+// 2. Ownership is transferred out for processing during IPC send, becoming `Empty` temporarily.
+// See `AssociateHandleForMessage::take_handle`.
+// - Windows: DuplicateHandle transfers the handle to the remote process.
+// This produces a new handle value in the local process representing the remote handle.
+// This value must be sent to the remote, so `AssociateHandleForMessage::set_remote_handle`
+// is used to transform the handle into a `SerializableValue`.
+// - Unix: sendmsg transfers the handle to the remote process. The handle is left `Empty`.
+// (Note: this occurs later, when the serialized message buffer is sent)
+// 3. Message containing `SerializableValue` or `Empty` (depending on handle processing in step 2)
+// is serialized and sent via IPC.
+// 4. Message received and deserialized in target process.
+// - Windows: `AssociateHandleForMessage::set_local_handle converts the received `SerializableValue` into `Owned`, ready for use.
+// - Unix: Handle (with a new value in the target process) is received out-of-band via `recvmsg`
+// and converted to `Owned` via `AssociateHandleForMessage::set_local_handle`.
+#[derive(Debug)]
+pub enum SerializableHandle {
+ // Owned handle, with optional target_pid on sending side.
+ Owned(PlatformHandle, Option<u32>),
+ // Transitional IPC states:
+ SerializableValue(PlatformHandleType), // Windows
+ Empty, // Unix
+}
+
+// PlatformHandle is non-Send and contains a pointer (HANDLE) on Windows.
+#[allow(clippy::non_send_fields_in_send_ty)]
+unsafe impl Send for SerializableHandle {}
+
+impl SerializableHandle {
+ pub fn new(handle: PlatformHandle, target_pid: u32) -> SerializableHandle {
+ SerializableHandle::Owned(handle, Some(target_pid))
+ }
+
+ // Called on the receiving side to take ownership of the handle.
+ pub fn take_handle(&mut self) -> PlatformHandle {
+ match std::mem::replace(self, SerializableHandle::Empty) {
+ SerializableHandle::Owned(handle, target_pid) => {
+ assert!(target_pid.is_none());
+ handle
+ }
+ _ => panic!("take_handle called in invalid state"),
+ }
+ }
+
+ // Called on the sending side to take ownership of the handle for
+ // handling platform-specific remoting.
+ fn take_handle_for_send(&mut self) -> RemoteHandle {
+ match std::mem::replace(self, SerializableHandle::Empty) {
+ SerializableHandle::Owned(handle, target_pid) => unsafe {
+ RemoteHandle::new(
+ handle.into_raw(),
+ target_pid.expect("target process required"),
+ )
+ },
+ _ => panic!("take_handle_for_send called in invalid state"),
+ }
+ }
+
+ fn new_owned(handle: PlatformHandleType) -> SerializableHandle {
+ SerializableHandle::Owned(PlatformHandle::new(handle), None)
+ }
+
+ #[cfg(windows)]
+ fn make_owned(&mut self) {
+ if let SerializableHandle::SerializableValue(handle) = self {
+ *self = SerializableHandle::new_owned(*handle);
+ } else {
+ panic!("make_owned called in invalid state")
+ }
+ }
+
+ fn new_serializable_value(handle: PlatformHandleType) -> SerializableHandle {
+ SerializableHandle::SerializableValue(handle)
+ }
+
+ fn get_serializable_value(&self) -> PlatformHandleType {
+ match *self {
+ SerializableHandle::SerializableValue(handle) => handle,
+ SerializableHandle::Empty => INVALID_HANDLE_VALUE,
+ _ => panic!("get_remote_handle called in invalid state"),
+ }
+ }
+}
+
+// Raw handle values are serialized as i64. Additional handling external to (de)serialization is required during IPC
+// send/receive to convert these raw values into valid handles.
+impl serde::Serialize for SerializableHandle {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: serde::Serializer,
+ {
+ let handle = self.get_serializable_value();
+ serializer.serialize_i64(handle as i64)
+ }
+}
+
+impl<'de> serde::Deserialize<'de> for SerializableHandle {
+ fn deserialize<D>(deserializer: D) -> Result<SerializableHandle, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ deserializer.deserialize_i64(SerializableHandleVisitor)
+ }
+}
+
+struct SerializableHandleVisitor;
+impl serde::de::Visitor<'_> for SerializableHandleVisitor {
+ type Value = SerializableHandle;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ formatter.write_str("an integer between -2^63 and 2^63")
+ }
+
+ fn visit_i64<E>(self, value: i64) -> Result<Self::Value, E>
+ where
+ E: serde::de::Error,
+ {
+ Ok(SerializableHandle::new_serializable_value(
+ value as PlatformHandleType,
+ ))
+ }
+}
+
+// Represents a PlatformHandle in-flight between processes.
+// On Unix platforms, this is just a plain owned Handle, closed on drop.
+// On Windows, `RemoteHandle` also retains ownership of the `target_handle`
+// in the `target` process. Once the handle has been successfully sent
+// to the remote, the sender should call `mark_sent()` to relinquish
+// ownership of `target_handle` in the remote.
+#[derive(Debug)]
+pub struct RemoteHandle {
+ pub(crate) handle: PlatformHandleType,
+ #[cfg(windows)]
+ pub(crate) target: u32,
+ #[cfg(windows)]
+ pub(crate) target_handle: Option<PlatformHandleType>,
+}
+
+impl RemoteHandle {
+ #[allow(clippy::missing_safety_doc)]
+ pub unsafe fn new(handle: PlatformHandleType, _target: u32) -> Self {
+ RemoteHandle {
+ handle,
+ #[cfg(windows)]
+ target: _target,
+ #[cfg(windows)]
+ target_handle: None,
+ }
+ }
+
+ #[cfg(windows)]
+ pub fn mark_sent(&mut self) {
+ self.target_handle.take();
+ }
+
+ #[cfg(windows)]
+ #[allow(clippy::missing_safety_doc)]
+ pub unsafe fn send_to_target(&mut self) -> std::io::Result<PlatformHandleType> {
+ let target_handle = crate::duplicate_platform_handle(self.handle, Some(self.target))?;
+ self.target_handle = Some(target_handle);
+ Ok(target_handle)
+ }
+
+ #[cfg(unix)]
+ #[allow(clippy::missing_safety_doc)]
+ pub unsafe fn take(self) -> PlatformHandleType {
+ let h = self.handle;
+ std::mem::forget(self);
+ h
+ }
+}
+
+impl Drop for RemoteHandle {
+ fn drop(&mut self) {
+ unsafe {
+ crate::close_platform_handle(self.handle);
+ }
+ #[cfg(windows)]
+ unsafe {
+ if let Some(target_handle) = self.target_handle {
+ if let Err(e) = crate::close_target_handle(target_handle, self.target) {
+ trace!("RemoteHandle failed to close target handle: {:?}", e);
+ }
+ }
+ }
+ }
+}
+
+unsafe impl Send for RemoteHandle {}
+
+pub trait AssociateHandleForMessage {
+ // True if this item has an associated handle attached for remoting.
+ fn has_associated_handle(&self) -> bool {
+ false
+ }
+
+ // Take ownership of the associated handle, leaving the item's
+ // associated handle empty.
+ fn take_handle(&mut self) -> RemoteHandle {
+ panic!("take_handle called on item without associated handle");
+ }
+
+ #[allow(clippy::missing_safety_doc)]
+ // Replace an empty associated handle with a non-owning serializable value
+ // indicating the value of the handle in the remote process.
+ #[cfg(windows)]
+ unsafe fn set_remote_handle(&mut self, _: PlatformHandleType) {
+ panic!("set_remote_handle called on item without associated handle");
+ }
+
+ #[allow(clippy::missing_safety_doc)]
+ // Replace a serialized associated handle value with an owned local handle.
+ #[cfg(windows)]
+ unsafe fn set_local_handle(&mut self) {
+ panic!("set_local_handle called on item without associated handle");
+ }
+
+ #[allow(clippy::missing_safety_doc)]
+ // Replace an empty associated handle with an owned local handle.
+ #[cfg(unix)]
+ unsafe fn set_local_handle(&mut self, _: PlatformHandleType) {
+ panic!("set_local_handle called on item without associated handle");
+ }
+}
+
+impl AssociateHandleForMessage for ClientMessage {
+ fn has_associated_handle(&self) -> bool {
+ matches!(
+ *self,
+ ClientMessage::StreamCreated(_)
+ | ClientMessage::StreamInitialized(_)
+ | ClientMessage::ContextSetupDeviceCollectionCallback(_)
+ )
+ }
+
+ fn take_handle(&mut self) -> RemoteHandle {
+ match *self {
+ ClientMessage::StreamCreated(ref mut data) => data.shm_handle.take_handle_for_send(),
+ ClientMessage::StreamInitialized(ref mut data) => data.take_handle_for_send(),
+ ClientMessage::ContextSetupDeviceCollectionCallback(ref mut data) => {
+ data.platform_handle.take_handle_for_send()
+ }
+ _ => panic!("take_handle called on item without associated handle"),
+ }
+ }
+
+ #[cfg(windows)]
+ unsafe fn set_remote_handle(&mut self, handle: PlatformHandleType) {
+ match *self {
+ ClientMessage::StreamCreated(ref mut data) => {
+ data.shm_handle = SerializableHandle::new_serializable_value(handle);
+ }
+ ClientMessage::StreamInitialized(ref mut data) => {
+ *data = SerializableHandle::new_serializable_value(handle);
+ }
+ ClientMessage::ContextSetupDeviceCollectionCallback(ref mut data) => {
+ data.platform_handle = SerializableHandle::new_serializable_value(handle);
+ }
+ _ => panic!("set_remote_handle called on item without associated handle"),
+ }
+ }
+
+ #[cfg(windows)]
+ unsafe fn set_local_handle(&mut self) {
+ match *self {
+ ClientMessage::StreamCreated(ref mut data) => data.shm_handle.make_owned(),
+ ClientMessage::StreamInitialized(ref mut data) => data.make_owned(),
+ ClientMessage::ContextSetupDeviceCollectionCallback(ref mut data) => {
+ data.platform_handle.make_owned()
+ }
+ _ => panic!("set_local_handle called on item without associated handle"),
+ }
+ }
+
+ #[cfg(unix)]
+ unsafe fn set_local_handle(&mut self, handle: PlatformHandleType) {
+ match *self {
+ ClientMessage::StreamCreated(ref mut data) => {
+ data.shm_handle = SerializableHandle::new_owned(handle);
+ }
+ ClientMessage::StreamInitialized(ref mut data) => {
+ *data = SerializableHandle::new_owned(handle);
+ }
+ ClientMessage::ContextSetupDeviceCollectionCallback(ref mut data) => {
+ data.platform_handle = SerializableHandle::new_owned(handle);
+ }
+ _ => panic!("set_local_handle called on item without associated handle"),
+ }
+ }
+}
+
+impl AssociateHandleForMessage for ServerMessage {}
+
+impl AssociateHandleForMessage for DeviceCollectionReq {}
+impl AssociateHandleForMessage for DeviceCollectionResp {}
+
+impl AssociateHandleForMessage for CallbackReq {}
+impl AssociateHandleForMessage for CallbackResp {}
+
+#[cfg(test)]
+mod test {
+ use super::StreamParams;
+ use cubeb::ffi;
+ use std::mem;
+
+ #[test]
+ fn stream_params_size_check() {
+ assert_eq!(
+ mem::size_of::<StreamParams>(),
+ mem::size_of::<ffi::cubeb_stream_params>()
+ )
+ }
+
+ #[test]
+ fn stream_params_from() {
+ let raw = ffi::cubeb_stream_params {
+ format: ffi::CUBEB_SAMPLE_FLOAT32BE,
+ rate: 96_000,
+ channels: 32,
+ layout: ffi::CUBEB_LAYOUT_3F1_LFE,
+ prefs: ffi::CUBEB_STREAM_PREF_LOOPBACK,
+ };
+ let wrapped = ::cubeb::StreamParams::from(raw);
+ let params = StreamParams::from(wrapped.as_ref());
+ assert_eq!(params.format, raw.format);
+ assert_eq!(params.rate, raw.rate);
+ assert_eq!(params.channels, raw.channels);
+ assert_eq!(params.layout, raw.layout);
+ assert_eq!(params.prefs, raw.prefs);
+ }
+}
diff --git a/third_party/rust/audioipc2/src/rpccore.rs b/third_party/rust/audioipc2/src/rpccore.rs
new file mode 100644
index 0000000000..69ed2e8cf0
--- /dev/null
+++ b/third_party/rust/audioipc2/src/rpccore.rs
@@ -0,0 +1,470 @@
+// Copyright © 2021 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details
+
+use crossbeam_queue::ArrayQueue;
+use mio::Token;
+use std::cell::UnsafeCell;
+use std::collections::VecDeque;
+use std::io::{self, Error, ErrorKind, Result};
+use std::marker::PhantomPinned;
+use std::mem::ManuallyDrop;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::{Arc, Weak};
+
+use crate::ipccore::EventLoopHandle;
+
+// This provides a safe-ish method for a thread to allocate
+// stack storage space for a result, then pass a (wrapped)
+// pointer to that location to another thread via
+// a CompletionWriter to eventually store a result into.
+struct Completion<T> {
+ item: UnsafeCell<Option<T>>,
+ writer: AtomicBool,
+ _pin: PhantomPinned, // disable rustc's no-alias
+}
+
+impl<T> Completion<T> {
+ fn new() -> Self {
+ Completion {
+ item: UnsafeCell::new(None),
+ writer: AtomicBool::new(false),
+ _pin: PhantomPinned,
+ }
+ }
+
+ // Wait until the writer completes, then return the result.
+ // This is intended to be a single-use function, once the writer
+ // has completed any further attempts to wait will return None.
+ fn wait(&self) -> Option<T> {
+ // Wait for the writer to complete or be dropped.
+ while self.writer.load(Ordering::Acquire) {
+ std::thread::park();
+ }
+ unsafe { (*self.item.get()).take() }
+ }
+
+ // Create a writer for the other thread to store the
+ // expected result into.
+ fn writer(&self) -> CompletionWriter<T> {
+ assert!(!self.writer.load(Ordering::Relaxed));
+ self.writer.store(true, Ordering::Release);
+ CompletionWriter {
+ ptr: self as *const _ as *mut _,
+ waiter: std::thread::current(),
+ }
+ }
+}
+
+impl<T> Drop for Completion<T> {
+ fn drop(&mut self) {
+ // Wait for the outstanding writer to complete before
+ // dropping, since the CompletionWriter references
+ // memory owned by this object.
+ while self.writer.load(Ordering::Acquire) {
+ std::thread::park();
+ }
+ }
+}
+
+struct CompletionWriter<T> {
+ ptr: *mut Completion<T>, // Points to a Completion on another thread's stack
+ waiter: std::thread::Thread, // Identifies thread waiting for completion
+}
+
+impl<T> CompletionWriter<T> {
+ fn set(self, value: T) {
+ // Store the result into the Completion's memory.
+ // Since `set` consumes `self`, rely on `Drop` to
+ // mark the writer as done and wake the Completion's
+ // thread.
+ unsafe {
+ assert!((*self.ptr).writer.load(Ordering::Relaxed));
+ *(*self.ptr).item.get() = Some(value);
+ }
+ }
+}
+
+impl<T> Drop for CompletionWriter<T> {
+ fn drop(&mut self) {
+ // Mark writer as complete - if `set` was not called,
+ // the waiter will receive `None`.
+ unsafe {
+ (*self.ptr).writer.store(false, Ordering::Release);
+ }
+ // Wake the Completion's thread.
+ self.waiter.unpark();
+ }
+}
+
+// Safety: CompletionWriter holds a pointer to a Completion
+// residing on another thread's stack. The Completion always
+// waits for an outstanding writer if present, and CompletionWriter
+// releases the waiter and wakes the Completion's thread on drop,
+// so this pointer will always be live for the duration of a
+// CompletionWriter.
+unsafe impl<T> Send for CompletionWriter<T> {}
+
+// RPC message handler. Implemented by ClientHandler (for Client)
+// and ServerHandler (for Server).
+pub(crate) trait Handler {
+ type In;
+ type Out;
+
+ // Consume a request
+ fn consume(&mut self, request: Self::In) -> Result<()>;
+
+ // Produce a response
+ fn produce(&mut self) -> Result<Option<Self::Out>>;
+}
+
+// Client RPC definition. This supplies the expected message
+// request and response types.
+pub trait Client {
+ type ServerMessage;
+ type ClientMessage;
+}
+
+// Server RPC definition. This supplies the expected message
+// request and response types. `process` is passed inbound RPC
+// requests by the ServerHandler to be responded to by the server.
+pub trait Server {
+ type ServerMessage;
+ type ClientMessage;
+
+ fn process(&mut self, req: Self::ServerMessage) -> Self::ClientMessage;
+}
+
+// RPC Client Proxy implementation.
+type ProxyRequest<Request, Response> = (Request, CompletionWriter<Response>);
+
+// RPC Proxy that may be `clone`d for use by multiple owners/threads.
+// A Proxy `call` arranges for the supplied request to be transmitted
+// to the associated Server via RPC and blocks awaiting the response
+// via the associated `Completion`.
+// A ClientHandler normally lives until the last Proxy is dropped, but if the ClientHandler
+// encounters an internal error, `requests` will fail to upgrade, allowing
+// the proxy to report an error.
+#[derive(Debug)]
+pub struct Proxy<Request, Response> {
+ handle: Option<(EventLoopHandle, Token)>,
+ requests: ManuallyDrop<RequestQueueSender<ProxyRequest<Request, Response>>>,
+}
+
+impl<Request, Response> Proxy<Request, Response> {
+ fn new(requests: RequestQueueSender<ProxyRequest<Request, Response>>) -> Self {
+ Self {
+ handle: None,
+ requests: ManuallyDrop::new(requests),
+ }
+ }
+
+ pub fn call(&self, request: Request) -> Result<Response> {
+ let response = Completion::new();
+ self.requests.push((request, response.writer()))?;
+ self.wake_connection();
+ match response.wait() {
+ Some(resp) => Ok(resp),
+ None => Err(Error::new(ErrorKind::Other, "proxy recv error")),
+ }
+ }
+
+ pub(crate) fn connect_event_loop(&mut self, handle: EventLoopHandle, token: Token) {
+ self.handle = Some((handle, token));
+ }
+
+ fn wake_connection(&self) {
+ let (handle, token) = self
+ .handle
+ .as_ref()
+ .expect("proxy not connected to event loop");
+ handle.wake_connection(*token);
+ }
+}
+
+impl<Request, Response> Clone for Proxy<Request, Response> {
+ fn clone(&self) -> Self {
+ let mut clone = Self::new((*self.requests).clone());
+ let (handle, token) = self
+ .handle
+ .as_ref()
+ .expect("proxy not connected to event loop");
+ clone.connect_event_loop(handle.clone(), *token);
+ clone
+ }
+}
+
+impl<Request, Response> Drop for Proxy<Request, Response> {
+ fn drop(&mut self) {
+ trace!("Proxy drop, waking EventLoop");
+ // Must drop `requests` before waking the connection, otherwise
+ // the wake may be processed before the (last) weak reference is
+ // dropped.
+ let last_proxy = self.requests.live_proxies();
+ unsafe {
+ ManuallyDrop::drop(&mut self.requests);
+ }
+ if last_proxy == 1 && self.handle.is_some() {
+ self.wake_connection()
+ }
+ }
+}
+
+const RPC_CLIENT_INITIAL_PROXIES: usize = 32; // Initial proxy pre-allocation per client.
+
+// Client-specific Handler implementation.
+// The IPC EventLoop Driver calls this to execute client-specific
+// RPC handling. Serialized messages sent via a Proxy are queued
+// for transmission when `produce` is called.
+// Deserialized messages are passed via `consume` to
+// trigger response completion by sending the response via a channel
+// connected to a ProxyResponse.
+pub(crate) struct ClientHandler<C: Client> {
+ in_flight: VecDeque<CompletionWriter<C::ClientMessage>>,
+ requests: Arc<RequestQueue<ProxyRequest<C::ServerMessage, C::ClientMessage>>>,
+}
+
+impl<C: Client> ClientHandler<C> {
+ fn new(
+ requests: Arc<RequestQueue<ProxyRequest<C::ServerMessage, C::ClientMessage>>>,
+ ) -> ClientHandler<C> {
+ ClientHandler::<C> {
+ in_flight: VecDeque::with_capacity(RPC_CLIENT_INITIAL_PROXIES),
+ requests,
+ }
+ }
+}
+
+impl<C: Client> Handler for ClientHandler<C> {
+ type In = C::ClientMessage;
+ type Out = C::ServerMessage;
+
+ fn consume(&mut self, response: Self::In) -> Result<()> {
+ trace!("ClientHandler::consume");
+ if let Some(response_writer) = self.in_flight.pop_front() {
+ response_writer.set(response);
+ } else {
+ return Err(Error::new(ErrorKind::Other, "request/response mismatch"));
+ }
+
+ Ok(())
+ }
+
+ fn produce(&mut self) -> Result<Option<Self::Out>> {
+ trace!("ClientHandler::produce");
+
+ // If the weak count is zero, no proxies are attached and
+ // no further proxies can be attached since every proxy
+ // after the initial one is cloned from an existing instance.
+ self.requests.check_live_proxies()?;
+ // Try to get a new message
+ match self.requests.pop() {
+ Some((request, response_writer)) => {
+ trace!(" --> received request");
+ self.in_flight.push_back(response_writer);
+ Ok(Some(request))
+ }
+ None => {
+ trace!(" --> no request");
+ Ok(None)
+ }
+ }
+ }
+}
+
+#[derive(Debug)]
+pub(crate) struct RequestQueue<T> {
+ queue: ArrayQueue<T>,
+}
+
+impl<T> RequestQueue<T> {
+ pub(crate) fn new(size: usize) -> Self {
+ RequestQueue {
+ queue: ArrayQueue::new(size),
+ }
+ }
+
+ pub(crate) fn pop(&self) -> Option<T> {
+ self.queue.pop()
+ }
+
+ pub(crate) fn new_sender(self: &Arc<Self>) -> RequestQueueSender<T> {
+ RequestQueueSender {
+ inner: Arc::downgrade(self),
+ }
+ }
+
+ pub(crate) fn check_live_proxies(self: &Arc<Self>) -> Result<()> {
+ if Arc::weak_count(self) == 0 {
+ return Err(io::ErrorKind::ConnectionAborted.into());
+ }
+ Ok(())
+ }
+}
+
+pub(crate) struct RequestQueueSender<T> {
+ inner: Weak<RequestQueue<T>>,
+}
+
+impl<T> RequestQueueSender<T> {
+ pub(crate) fn push(&self, request: T) -> Result<()> {
+ if let Some(consumer) = self.inner.upgrade() {
+ if consumer.queue.push(request).is_err() {
+ debug!("Proxy[{:p}]: call failed - CH::requests full", self);
+ return Err(io::ErrorKind::ConnectionAborted.into());
+ }
+ return Ok(());
+ }
+ debug!("Proxy[{:p}]: call failed - CH::requests dropped", self);
+ Err(Error::new(ErrorKind::Other, "proxy send error"))
+ }
+
+ pub(crate) fn live_proxies(&self) -> usize {
+ Weak::weak_count(&self.inner)
+ }
+}
+
+impl<T> Clone for RequestQueueSender<T> {
+ fn clone(&self) -> Self {
+ Self {
+ inner: self.inner.clone(),
+ }
+ }
+}
+
+impl<T> std::fmt::Debug for RequestQueueSender<T> {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("RequestQueueProducer")
+ .field("inner", &self.inner.as_ptr())
+ .finish()
+ }
+}
+
+#[allow(clippy::type_complexity)]
+pub(crate) fn make_client<C: Client>(
+) -> Result<(ClientHandler<C>, Proxy<C::ServerMessage, C::ClientMessage>)> {
+ let requests = Arc::new(RequestQueue::new(RPC_CLIENT_INITIAL_PROXIES));
+ let proxy_req = requests.new_sender();
+ let handler = ClientHandler::new(requests);
+
+ Ok((handler, Proxy::new(proxy_req)))
+}
+
+// Server-specific Handler implementation.
+// The IPC EventLoop Driver calls this to execute server-specific
+// RPC handling. Deserialized messages are passed via `consume` to the
+// associated `server` for processing. Server responses are then queued
+// for RPC to the associated client when `produce` is called.
+pub(crate) struct ServerHandler<S: Server> {
+ server: S,
+ in_flight: VecDeque<S::ClientMessage>,
+}
+
+impl<S: Server> Handler for ServerHandler<S> {
+ type In = S::ServerMessage;
+ type Out = S::ClientMessage;
+
+ fn consume(&mut self, message: Self::In) -> Result<()> {
+ trace!("ServerHandler::consume");
+ let response = self.server.process(message);
+ self.in_flight.push_back(response);
+ Ok(())
+ }
+
+ fn produce(&mut self) -> Result<Option<Self::Out>> {
+ trace!("ServerHandler::produce");
+
+ // Return the ready response
+ match self.in_flight.pop_front() {
+ Some(res) => {
+ trace!(" --> received response");
+ Ok(Some(res))
+ }
+ None => {
+ trace!(" --> no response ready");
+ Ok(None)
+ }
+ }
+ }
+}
+
+const RPC_SERVER_INITIAL_CLIENTS: usize = 32; // Initial client allocation per server.
+
+pub(crate) fn make_server<S: Server>(server: S) -> ServerHandler<S> {
+ ServerHandler::<S> {
+ server,
+ in_flight: VecDeque::with_capacity(RPC_SERVER_INITIAL_CLIENTS),
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ #[test]
+ fn basic() {
+ let queue = Arc::new(RequestQueue::new(1));
+ let producer = queue.new_sender();
+ assert!(queue.pop().is_none());
+ producer.push(1).unwrap();
+ assert!(queue.pop().is_some());
+ assert!(queue.pop().is_none());
+ }
+
+ #[test]
+ fn queue_dropped() {
+ let queue = Arc::new(RequestQueue::new(1));
+ let producer = queue.new_sender();
+ drop(queue);
+ assert!(producer.push(1).is_err());
+ }
+
+ #[test]
+ fn queue_full() {
+ let queue = Arc::new(RequestQueue::new(1));
+ let producer = queue.new_sender();
+ producer.push(1).unwrap();
+ assert!(producer.push(2).is_err());
+ }
+
+ #[test]
+ fn queue_producer_clone() {
+ let queue = Arc::new(RequestQueue::new(1));
+ let producer = queue.new_sender();
+ let producer2 = producer.clone();
+ producer.push(1).unwrap();
+ assert!(producer2.push(2).is_err());
+ }
+
+ #[test]
+ fn queue_producer_drop() {
+ let queue = Arc::new(RequestQueue::new(1));
+ let producer = queue.new_sender();
+ let producer2 = producer.clone();
+ drop(producer);
+ assert!(producer2.push(2).is_ok());
+ }
+
+ #[test]
+ fn queue_producer_weak() {
+ let queue = Arc::new(RequestQueue::new(1));
+ let producer = queue.new_sender();
+ let producer2 = producer.clone();
+ drop(queue);
+ assert!(producer2.push(2).is_err());
+ }
+
+ #[test]
+ fn queue_producer_shutdown() {
+ let queue = Arc::new(RequestQueue::new(1));
+ let producer = queue.new_sender();
+ let producer2 = producer.clone();
+ producer.push(1).unwrap();
+ assert!(Arc::weak_count(&queue) == 2);
+ drop(producer);
+ assert!(Arc::weak_count(&queue) == 1);
+ drop(producer2);
+ assert!(Arc::weak_count(&queue) == 0);
+ }
+}
diff --git a/third_party/rust/audioipc2/src/shm.rs b/third_party/rust/audioipc2/src/shm.rs
new file mode 100644
index 0000000000..2aaa92ef36
--- /dev/null
+++ b/third_party/rust/audioipc2/src/shm.rs
@@ -0,0 +1,334 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details.
+
+#![allow(clippy::missing_safety_doc)]
+
+use crate::errors::*;
+use crate::PlatformHandle;
+use std::{convert::TryInto, ffi::c_void, slice};
+
+#[cfg(unix)]
+pub use unix::SharedMem;
+#[cfg(windows)]
+pub use windows::SharedMem;
+
+#[derive(Copy, Clone)]
+struct SharedMemView {
+ ptr: *mut c_void,
+ size: usize,
+}
+
+unsafe impl Send for SharedMemView {}
+
+impl SharedMemView {
+ pub unsafe fn get_slice(&self, size: usize) -> Result<&[u8]> {
+ let map = slice::from_raw_parts(self.ptr as _, self.size);
+ if size <= self.size {
+ Ok(&map[..size])
+ } else {
+ bail!("mmap size");
+ }
+ }
+
+ pub unsafe fn get_mut_slice(&mut self, size: usize) -> Result<&mut [u8]> {
+ let map = slice::from_raw_parts_mut(self.ptr as _, self.size);
+ if size <= self.size {
+ Ok(&mut map[..size])
+ } else {
+ bail!("mmap size")
+ }
+ }
+}
+
+#[cfg(unix)]
+mod unix {
+ use super::*;
+ use memmap2::{MmapMut, MmapOptions};
+ use std::fs::File;
+ use std::os::unix::io::{AsRawFd, FromRawFd};
+
+ #[cfg(target_os = "android")]
+ fn open_shm_file(_id: &str, size: usize) -> Result<File> {
+ unsafe {
+ let fd = ashmem::ASharedMemory_create(std::ptr::null(), size);
+ if fd >= 0 {
+ // Drop PROT_EXEC
+ let r = ashmem::ASharedMemory_setProt(fd, libc::PROT_READ | libc::PROT_WRITE);
+ assert_eq!(r, 0);
+ return Ok(File::from_raw_fd(fd.try_into().unwrap()));
+ }
+ Err(std::io::Error::last_os_error().into())
+ }
+ }
+
+ #[cfg(not(target_os = "android"))]
+ fn open_shm_file(id: &str, size: usize) -> Result<File> {
+ let file = open_shm_file_impl(id)?;
+ allocate_file(&file, size)?;
+ Ok(file)
+ }
+
+ #[cfg(not(target_os = "android"))]
+ fn open_shm_file_impl(id: &str) -> Result<File> {
+ use std::env::temp_dir;
+ use std::fs::{remove_file, OpenOptions};
+
+ let id_cstring = std::ffi::CString::new(id).unwrap();
+
+ #[cfg(target_os = "linux")]
+ {
+ unsafe {
+ let r = libc::syscall(libc::SYS_memfd_create, id_cstring.as_ptr(), 0);
+ if r >= 0 {
+ return Ok(File::from_raw_fd(r.try_into().unwrap()));
+ }
+ }
+
+ let mut path = std::path::PathBuf::from("/dev/shm");
+ path.push(id);
+
+ if let Ok(file) = OpenOptions::new()
+ .read(true)
+ .write(true)
+ .create_new(true)
+ .open(&path)
+ {
+ let _ = remove_file(&path);
+ return Ok(file);
+ }
+ }
+
+ unsafe {
+ let fd = libc::shm_open(
+ id_cstring.as_ptr(),
+ libc::O_RDWR | libc::O_CREAT | libc::O_EXCL,
+ 0o600,
+ );
+ if fd >= 0 {
+ libc::shm_unlink(id_cstring.as_ptr());
+ return Ok(File::from_raw_fd(fd));
+ }
+ }
+
+ let mut path = temp_dir();
+ path.push(id);
+
+ let file = OpenOptions::new()
+ .read(true)
+ .write(true)
+ .create_new(true)
+ .open(&path)?;
+
+ let _ = remove_file(&path);
+ Ok(file)
+ }
+
+ #[cfg(not(target_os = "android"))]
+ fn handle_enospc(s: &str) -> Result<()> {
+ let err = std::io::Error::last_os_error();
+ let errno = err.raw_os_error().unwrap_or(0);
+ assert_ne!(errno, 0);
+ debug!("allocate_file: {} failed errno={}", s, errno);
+ if errno == libc::ENOSPC {
+ return Err(err.into());
+ }
+ Ok(())
+ }
+
+ #[cfg(not(target_os = "android"))]
+ fn allocate_file(file: &File, size: usize) -> Result<()> {
+ // First, set the file size. This may create a sparse file on
+ // many systems, which can fail with SIGBUS when accessed via a
+ // mapping and the lazy backing allocation fails due to low disk
+ // space. To avoid this, try to force the entire file to be
+ // preallocated before mapping using OS-specific approaches below.
+
+ file.set_len(size.try_into().unwrap())?;
+
+ let fd = file.as_raw_fd();
+ let size: libc::off_t = size.try_into().unwrap();
+
+ // Try Linux-specific fallocate.
+ #[cfg(target_os = "linux")]
+ {
+ if unsafe { libc::fallocate(fd, 0, 0, size) } == 0 {
+ return Ok(());
+ }
+ handle_enospc("fallocate()")?;
+ }
+
+ // Try macOS-specific fcntl.
+ #[cfg(target_os = "macos")]
+ {
+ let params = libc::fstore_t {
+ fst_flags: libc::F_ALLOCATEALL,
+ fst_posmode: libc::F_PEOFPOSMODE,
+ fst_offset: 0,
+ fst_length: size,
+ fst_bytesalloc: 0,
+ };
+ if unsafe { libc::fcntl(fd, libc::F_PREALLOCATE, &params) } == 0 {
+ return Ok(());
+ }
+ handle_enospc("fcntl(F_PREALLOCATE)")?;
+ }
+
+ // Fall back to portable version, where available.
+ #[cfg(any(target_os = "linux", target_os = "freebsd", target_os = "dragonfly"))]
+ {
+ if unsafe { libc::posix_fallocate(fd, 0, size) } == 0 {
+ return Ok(());
+ }
+ handle_enospc("posix_fallocate()")?;
+ }
+
+ Ok(())
+ }
+
+ pub struct SharedMem {
+ file: File,
+ _mmap: MmapMut,
+ view: SharedMemView,
+ }
+
+ impl SharedMem {
+ pub fn new(id: &str, size: usize) -> Result<SharedMem> {
+ let file = open_shm_file(id, size)?;
+ let mut mmap = unsafe { MmapOptions::new().len(size).map_mut(&file)? };
+ assert_eq!(mmap.len(), size);
+ let view = SharedMemView {
+ ptr: mmap.as_mut_ptr() as _,
+ size,
+ };
+ Ok(SharedMem {
+ file,
+ _mmap: mmap,
+ view,
+ })
+ }
+
+ pub unsafe fn make_handle(&self) -> Result<PlatformHandle> {
+ PlatformHandle::duplicate(self.file.as_raw_fd()).map_err(|e| e.into())
+ }
+
+ pub unsafe fn from(handle: PlatformHandle, size: usize) -> Result<SharedMem> {
+ let file = File::from_raw_fd(handle.into_raw());
+ let mut mmap = MmapOptions::new().len(size).map_mut(&file)?;
+ assert_eq!(mmap.len(), size);
+ let view = SharedMemView {
+ ptr: mmap.as_mut_ptr() as _,
+ size,
+ };
+ Ok(SharedMem {
+ file,
+ _mmap: mmap,
+ view,
+ })
+ }
+
+ pub unsafe fn get_slice(&self, size: usize) -> Result<&[u8]> {
+ self.view.get_slice(size)
+ }
+
+ pub unsafe fn get_mut_slice(&mut self, size: usize) -> Result<&mut [u8]> {
+ self.view.get_mut_slice(size)
+ }
+
+ pub fn get_size(&self) -> usize {
+ self.view.size
+ }
+ }
+}
+
+#[cfg(windows)]
+mod windows {
+ use super::*;
+ use std::ptr;
+ use winapi::{
+ shared::{minwindef::DWORD, ntdef::HANDLE},
+ um::{
+ handleapi::CloseHandle,
+ memoryapi::{MapViewOfFile, UnmapViewOfFile, FILE_MAP_ALL_ACCESS},
+ winbase::CreateFileMappingA,
+ winnt::PAGE_READWRITE,
+ },
+ };
+
+ use crate::INVALID_HANDLE_VALUE;
+
+ pub struct SharedMem {
+ handle: HANDLE,
+ view: SharedMemView,
+ }
+
+ unsafe impl Send for SharedMem {}
+
+ impl Drop for SharedMem {
+ fn drop(&mut self) {
+ unsafe {
+ let ok = UnmapViewOfFile(self.view.ptr);
+ assert_ne!(ok, 0);
+ let ok = CloseHandle(self.handle);
+ assert_ne!(ok, 0);
+ }
+ }
+ }
+
+ impl SharedMem {
+ pub fn new(_id: &str, size: usize) -> Result<SharedMem> {
+ unsafe {
+ let handle = CreateFileMappingA(
+ INVALID_HANDLE_VALUE,
+ ptr::null_mut(),
+ PAGE_READWRITE,
+ (size as u64 >> 32).try_into().unwrap(),
+ (size as u64 & (DWORD::MAX as u64)).try_into().unwrap(),
+ ptr::null(),
+ );
+ if handle.is_null() {
+ return Err(std::io::Error::last_os_error().into());
+ }
+
+ let ptr = MapViewOfFile(handle, FILE_MAP_ALL_ACCESS, 0, 0, size);
+ if ptr.is_null() {
+ return Err(std::io::Error::last_os_error().into());
+ }
+
+ Ok(SharedMem {
+ handle,
+ view: SharedMemView { ptr, size },
+ })
+ }
+ }
+
+ pub unsafe fn make_handle(&self) -> Result<PlatformHandle> {
+ PlatformHandle::duplicate(self.handle).map_err(|e| e.into())
+ }
+
+ pub unsafe fn from(handle: PlatformHandle, size: usize) -> Result<SharedMem> {
+ let handle = handle.into_raw();
+ let ptr = MapViewOfFile(handle, FILE_MAP_ALL_ACCESS, 0, 0, size);
+ if ptr.is_null() {
+ return Err(std::io::Error::last_os_error().into());
+ }
+ Ok(SharedMem {
+ handle,
+ view: SharedMemView { ptr, size },
+ })
+ }
+
+ pub unsafe fn get_slice(&self, size: usize) -> Result<&[u8]> {
+ self.view.get_slice(size)
+ }
+
+ pub unsafe fn get_mut_slice(&mut self, size: usize) -> Result<&mut [u8]> {
+ self.view.get_mut_slice(size)
+ }
+
+ pub fn get_size(&self) -> usize {
+ self.view.size
+ }
+ }
+}
diff --git a/third_party/rust/audioipc2/src/sys/mod.rs b/third_party/rust/audioipc2/src/sys/mod.rs
new file mode 100644
index 0000000000..0bcfdaa15e
--- /dev/null
+++ b/third_party/rust/audioipc2/src/sys/mod.rs
@@ -0,0 +1,77 @@
+// Copyright © 2021 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details
+
+use std::{collections::VecDeque, io::Result};
+
+use bytes::BytesMut;
+use mio::{event::Source, Interest, Registry, Token};
+
+#[cfg(unix)]
+mod unix;
+use crate::messages::RemoteHandle;
+
+#[cfg(unix)]
+pub use self::unix::*;
+
+#[cfg(windows)]
+mod windows;
+#[cfg(windows)]
+pub use self::windows::*;
+
+impl Source for Pipe {
+ fn register(&mut self, registry: &Registry, token: Token, interests: Interest) -> Result<()> {
+ self.io.register(registry, token, interests)
+ }
+
+ fn reregister(&mut self, registry: &Registry, token: Token, interests: Interest) -> Result<()> {
+ self.io.reregister(registry, token, interests)
+ }
+
+ fn deregister(&mut self, registry: &Registry) -> Result<()> {
+ self.io.deregister(registry)
+ }
+}
+
+const HANDLE_QUEUE_LIMIT: usize = 16;
+
+#[derive(Debug)]
+pub struct ConnectionBuffer {
+ pub buf: BytesMut,
+ handles: VecDeque<RemoteHandle>,
+}
+
+impl ConnectionBuffer {
+ pub fn with_capacity(cap: usize) -> Self {
+ ConnectionBuffer {
+ buf: BytesMut::with_capacity(cap),
+ handles: VecDeque::with_capacity(HANDLE_QUEUE_LIMIT),
+ }
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.buf.is_empty()
+ }
+
+ pub fn push_handle(&mut self, handle: RemoteHandle) {
+ assert!(self.handles.len() < self.handles.capacity());
+ self.handles.push_back(handle)
+ }
+
+ pub fn pop_handle(&mut self) -> Option<RemoteHandle> {
+ self.handles.pop_front()
+ }
+}
+
+pub trait RecvMsg {
+ // Receive data from the associated connection. `recv_msg` expects the capacity of
+ // the `ConnectionBuffer` members have been adjusted appropriately by the caller.
+ fn recv_msg(&mut self, buf: &mut ConnectionBuffer) -> Result<usize>;
+}
+
+pub trait SendMsg {
+ // Send data on the associated connection. `send_msg` consumes and adjusts the length of the
+ // `ConnectionBuffer` members based on the size of the successful send operation.
+ fn send_msg(&mut self, buf: &mut ConnectionBuffer) -> Result<usize>;
+}
diff --git a/third_party/rust/audioipc2/src/sys/unix/cmsg.rs b/third_party/rust/audioipc2/src/sys/unix/cmsg.rs
new file mode 100644
index 0000000000..581df9a847
--- /dev/null
+++ b/third_party/rust/audioipc2/src/sys/unix/cmsg.rs
@@ -0,0 +1,104 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details
+
+use crate::sys::HANDLE_QUEUE_LIMIT;
+use bytes::{BufMut, BytesMut};
+use libc::{self, cmsghdr};
+use std::convert::TryInto;
+use std::os::unix::io::RawFd;
+use std::{mem, slice};
+
+trait AsBytes {
+ fn as_bytes(&self) -> &[u8];
+}
+
+impl<'a, T: Sized> AsBytes for &'a [T] {
+ fn as_bytes(&self) -> &[u8] {
+ // TODO: This should account for the alignment of T
+ let byte_count = self.len() * mem::size_of::<T>();
+ unsafe { slice::from_raw_parts(self.as_ptr() as *const _, byte_count) }
+ }
+}
+
+// Encode `handles` into a cmsghdr in `buf`.
+pub fn encode_handles(cmsg: &mut BytesMut, handles: &[RawFd]) {
+ assert!(handles.len() <= HANDLE_QUEUE_LIMIT);
+ let msg = handles.as_bytes();
+
+ let cmsg_space = space(msg.len());
+ assert!(cmsg.remaining_mut() >= cmsg_space);
+
+ // Some definitions of cmsghdr contain padding. Rather
+ // than try to keep an up-to-date #cfg list to handle
+ // that, just use a pre-zeroed struct to fill out any
+ // fields we don't care about.
+ let zeroed = unsafe { mem::zeroed() };
+ #[allow(clippy::needless_update)]
+ // `cmsg_len` is `usize` on some platforms, `u32` on others.
+ #[allow(clippy::useless_conversion)]
+ let cmsghdr = cmsghdr {
+ cmsg_len: len(msg.len()).try_into().unwrap(),
+ cmsg_level: libc::SOL_SOCKET,
+ cmsg_type: libc::SCM_RIGHTS,
+ ..zeroed
+ };
+
+ unsafe {
+ let cmsghdr_ptr = cmsg.chunk_mut().as_mut_ptr();
+ std::ptr::copy_nonoverlapping(
+ &cmsghdr as *const _ as *const _,
+ cmsghdr_ptr,
+ mem::size_of::<cmsghdr>(),
+ );
+ let cmsg_data_ptr = libc::CMSG_DATA(cmsghdr_ptr as _);
+ std::ptr::copy_nonoverlapping(msg.as_ptr(), cmsg_data_ptr, msg.len());
+ cmsg.advance_mut(cmsg_space);
+ }
+}
+
+// Decode `buf` containing a cmsghdr with one or more handle(s).
+pub fn decode_handles(buf: &mut BytesMut) -> arrayvec::ArrayVec<RawFd, HANDLE_QUEUE_LIMIT> {
+ let mut fds = arrayvec::ArrayVec::<RawFd, HANDLE_QUEUE_LIMIT>::new();
+
+ let cmsghdr_len = len(0);
+
+ if buf.len() < cmsghdr_len {
+ // No more entries---not enough data in `buf` for a
+ // complete message.
+ return fds;
+ }
+
+ let cmsg: &cmsghdr = unsafe { &*(buf.as_ptr() as *const _) };
+ #[allow(clippy::unnecessary_cast)] // `cmsg_len` type is platform-dependent.
+ let cmsg_len = cmsg.cmsg_len as usize;
+
+ match (cmsg.cmsg_level, cmsg.cmsg_type) {
+ (libc::SOL_SOCKET, libc::SCM_RIGHTS) => {
+ trace!("Found SCM_RIGHTS...");
+ let slice = &buf[cmsghdr_len..cmsg_len];
+ let slice = unsafe {
+ slice::from_raw_parts(
+ slice.as_ptr() as *const _,
+ slice.len() / mem::size_of::<i32>(),
+ )
+ };
+ fds.try_extend_from_slice(slice).unwrap();
+ }
+ (level, kind) => {
+ trace!("Skipping cmsg level, {}, type={}...", level, kind);
+ }
+ }
+
+ assert!(fds.len() <= HANDLE_QUEUE_LIMIT);
+ fds
+}
+
+fn len(len: usize) -> usize {
+ unsafe { libc::CMSG_LEN(len.try_into().unwrap()) as usize }
+}
+
+pub fn space(len: usize) -> usize {
+ unsafe { libc::CMSG_SPACE(len.try_into().unwrap()) as usize }
+}
diff --git a/third_party/rust/audioipc2/src/sys/unix/cmsghdr.c b/third_party/rust/audioipc2/src/sys/unix/cmsghdr.c
new file mode 100644
index 0000000000..82d7852867
--- /dev/null
+++ b/third_party/rust/audioipc2/src/sys/unix/cmsghdr.c
@@ -0,0 +1,23 @@
+#include <sys/socket.h>
+#include <inttypes.h>
+#include <string.h>
+
+const uint8_t*
+cmsghdr_bytes(size_t* size)
+{
+ int myfd = 0;
+
+ static union {
+ uint8_t buf[CMSG_SPACE(sizeof(myfd))];
+ struct cmsghdr align;
+ } u;
+
+ u.align.cmsg_len = CMSG_LEN(sizeof(myfd));
+ u.align.cmsg_level = SOL_SOCKET;
+ u.align.cmsg_type = SCM_RIGHTS;
+
+ memcpy(CMSG_DATA(&u.align), &myfd, sizeof(myfd));
+
+ *size = sizeof(u);
+ return (const uint8_t*)&u.buf;
+}
diff --git a/third_party/rust/audioipc2/src/sys/unix/mod.rs b/third_party/rust/audioipc2/src/sys/unix/mod.rs
new file mode 100644
index 0000000000..84f3f1edf2
--- /dev/null
+++ b/third_party/rust/audioipc2/src/sys/unix/mod.rs
@@ -0,0 +1,126 @@
+// Copyright © 2021 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details
+
+use std::io::Result;
+use std::os::unix::prelude::{AsRawFd, FromRawFd};
+
+use bytes::{Buf, BufMut, BytesMut};
+use iovec::IoVec;
+use mio::net::UnixStream;
+
+use crate::PlatformHandle;
+
+use super::{ConnectionBuffer, RecvMsg, SendMsg, HANDLE_QUEUE_LIMIT};
+
+pub mod cmsg;
+mod msg;
+
+pub struct Pipe {
+ pub(crate) io: UnixStream,
+ cmsg: BytesMut,
+}
+
+impl Pipe {
+ fn new(io: UnixStream) -> Self {
+ Pipe {
+ io,
+ cmsg: BytesMut::with_capacity(cmsg::space(
+ std::mem::size_of::<i32>() * HANDLE_QUEUE_LIMIT,
+ )),
+ }
+ }
+}
+
+// Create a connected "pipe" pair. The `Pipe` is the server end,
+// the `PlatformHandle` is the client end to be remoted.
+pub fn make_pipe_pair() -> Result<(Pipe, PlatformHandle)> {
+ let (server, client) = UnixStream::pair()?;
+ Ok((Pipe::new(server), PlatformHandle::from(client)))
+}
+
+impl Pipe {
+ #[allow(clippy::missing_safety_doc)]
+ pub unsafe fn from_raw_handle(handle: crate::PlatformHandle) -> Pipe {
+ Pipe::new(UnixStream::from_raw_fd(handle.into_raw()))
+ }
+
+ pub fn shutdown(&mut self) -> Result<()> {
+ self.io.shutdown(std::net::Shutdown::Both)
+ }
+}
+
+impl RecvMsg for Pipe {
+ // Receive data (and fds) from the associated connection. `recv_msg` expects the capacity of
+ // the `ConnectionBuffer` members has been adjusted appropriate by the caller.
+ fn recv_msg(&mut self, buf: &mut ConnectionBuffer) -> Result<usize> {
+ assert!(buf.buf.remaining_mut() > 0);
+ // TODO: MSG_CMSG_CLOEXEC not portable.
+ // TODO: MSG_NOSIGNAL not portable; macOS can set socket option SO_NOSIGPIPE instead.
+ #[cfg(target_os = "linux")]
+ let flags = libc::MSG_CMSG_CLOEXEC | libc::MSG_NOSIGNAL;
+ #[cfg(not(target_os = "linux"))]
+ let flags = 0;
+ let r = unsafe {
+ let chunk = buf.buf.chunk_mut();
+ let slice = std::slice::from_raw_parts_mut(chunk.as_mut_ptr(), chunk.len());
+ let mut iovec = [<&mut IoVec>::from(slice)];
+ msg::recv_msg_with_flags(
+ self.io.as_raw_fd(),
+ &mut iovec,
+ self.cmsg.chunk_mut(),
+ flags,
+ )
+ };
+ match r {
+ Ok((n, cmsg_n, msg_flags)) => unsafe {
+ trace!("recv_msg_with_flags flags={}", msg_flags);
+ buf.buf.advance_mut(n);
+ self.cmsg.advance_mut(cmsg_n);
+ let handles = cmsg::decode_handles(&mut self.cmsg);
+ self.cmsg.clear();
+ let unused = 0;
+ for h in handles {
+ buf.push_handle(super::RemoteHandle::new(h, unused));
+ }
+ Ok(n)
+ },
+ Err(e) => Err(e),
+ }
+ }
+}
+
+impl SendMsg for Pipe {
+ // Send data (and fds) on the associated connection. `send_msg` adjusts the length of the
+ // `ConnectionBuffer` members based on the size of the successful send operation.
+ fn send_msg(&mut self, buf: &mut ConnectionBuffer) -> Result<usize> {
+ assert!(!buf.buf.is_empty());
+ if !buf.handles.is_empty() {
+ let mut handles = [-1i32; HANDLE_QUEUE_LIMIT];
+ for (i, h) in buf.handles.iter().enumerate() {
+ handles[i] = h.handle;
+ }
+ cmsg::encode_handles(&mut self.cmsg, &handles[..buf.handles.len()]);
+ }
+ let r = {
+ // TODO: MSG_NOSIGNAL not portable; macOS can set socket option SO_NOSIGPIPE instead.
+ #[cfg(target_os = "linux")]
+ let flags = libc::MSG_NOSIGNAL;
+ #[cfg(not(target_os = "linux"))]
+ let flags = 0;
+ let iovec = [<&IoVec>::from(&buf.buf[..buf.buf.len()])];
+ msg::send_msg_with_flags(self.io.as_raw_fd(), &iovec, &self.cmsg, flags)
+ };
+ match r {
+ Ok(n) => {
+ buf.buf.advance(n);
+ // Discard sent handles.
+ while buf.handles.pop_front().is_some() {}
+ self.cmsg.clear();
+ Ok(n)
+ }
+ Err(e) => Err(e),
+ }
+ }
+}
diff --git a/third_party/rust/audioipc2/src/sys/unix/msg.rs b/third_party/rust/audioipc2/src/sys/unix/msg.rs
new file mode 100644
index 0000000000..c2cd353289
--- /dev/null
+++ b/third_party/rust/audioipc2/src/sys/unix/msg.rs
@@ -0,0 +1,82 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details.
+
+use bytes::buf::UninitSlice;
+use iovec::unix;
+use iovec::IoVec;
+use std::os::unix::io::RawFd;
+use std::{cmp, io, mem, ptr};
+
+fn cvt(r: libc::ssize_t) -> io::Result<usize> {
+ if r == -1 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(r as usize)
+ }
+}
+
+// Convert return of -1 into error message, handling retry on EINTR
+fn cvt_r<F: FnMut() -> libc::ssize_t>(mut f: F) -> io::Result<usize> {
+ loop {
+ match cvt(f()) {
+ Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
+ other => return other,
+ }
+ }
+}
+
+pub(crate) fn recv_msg_with_flags(
+ socket: RawFd,
+ bufs: &mut [&mut IoVec],
+ cmsg: &mut UninitSlice,
+ flags: libc::c_int,
+) -> io::Result<(usize, usize, libc::c_int)> {
+ let slice = unix::as_os_slice_mut(bufs);
+ let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len());
+ let (control, controllen) = if cmsg.len() == 0 {
+ (ptr::null_mut(), 0)
+ } else {
+ (cmsg.as_mut_ptr() as _, cmsg.len())
+ };
+
+ let mut msghdr: libc::msghdr = unsafe { mem::zeroed() };
+ msghdr.msg_name = ptr::null_mut();
+ msghdr.msg_namelen = 0;
+ msghdr.msg_iov = slice.as_mut_ptr();
+ msghdr.msg_iovlen = len as _;
+ msghdr.msg_control = control;
+ msghdr.msg_controllen = controllen as _;
+
+ let n = cvt_r(|| unsafe { libc::recvmsg(socket, &mut msghdr as *mut _, flags) })?;
+
+ #[allow(clippy::unnecessary_cast)] // `msg_controllen` type is platform-dependent.
+ let controllen = msghdr.msg_controllen as usize;
+ Ok((n, controllen, msghdr.msg_flags))
+}
+
+pub(crate) fn send_msg_with_flags(
+ socket: RawFd,
+ bufs: &[&IoVec],
+ cmsg: &[u8],
+ flags: libc::c_int,
+) -> io::Result<usize> {
+ let slice = unix::as_os_slice(bufs);
+ let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len());
+ let (control, controllen) = if cmsg.is_empty() {
+ (ptr::null_mut(), 0)
+ } else {
+ (cmsg.as_ptr() as *mut _, cmsg.len())
+ };
+
+ let mut msghdr: libc::msghdr = unsafe { mem::zeroed() };
+ msghdr.msg_name = ptr::null_mut();
+ msghdr.msg_namelen = 0;
+ msghdr.msg_iov = slice.as_ptr() as *mut _;
+ msghdr.msg_iovlen = len as _;
+ msghdr.msg_control = control;
+ msghdr.msg_controllen = controllen as _;
+
+ cvt_r(|| unsafe { libc::sendmsg(socket, &msghdr as *const _, flags) })
+}
diff --git a/third_party/rust/audioipc2/src/sys/windows/mod.rs b/third_party/rust/audioipc2/src/sys/windows/mod.rs
new file mode 100644
index 0000000000..973e9608d1
--- /dev/null
+++ b/third_party/rust/audioipc2/src/sys/windows/mod.rs
@@ -0,0 +1,102 @@
+// Copyright © 2021 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details
+
+use std::{
+ fs::OpenOptions,
+ io::{Read, Write},
+ os::windows::prelude::{FromRawHandle, OpenOptionsExt},
+ sync::atomic::{AtomicUsize, Ordering},
+};
+
+use std::io::Result;
+
+use bytes::{Buf, BufMut};
+use mio::windows::NamedPipe;
+use winapi::um::winbase::FILE_FLAG_OVERLAPPED;
+
+use crate::PlatformHandle;
+
+use super::{ConnectionBuffer, RecvMsg, SendMsg};
+
+pub struct Pipe {
+ pub(crate) io: NamedPipe,
+}
+
+// Create a connected "pipe" pair. The `Pipe` is the server end,
+// the `PlatformHandle` is the client end to be remoted.
+pub fn make_pipe_pair() -> Result<(Pipe, PlatformHandle)> {
+ let pipe_name = get_pipe_name();
+ let server = NamedPipe::new(&pipe_name)?;
+
+ let client = {
+ let mut opts = OpenOptions::new();
+ opts.read(true)
+ .write(true)
+ .custom_flags(FILE_FLAG_OVERLAPPED);
+ let file = opts.open(&pipe_name)?;
+ PlatformHandle::from(file)
+ };
+
+ Ok((Pipe::new(server), client))
+}
+
+static PIPE_ID: AtomicUsize = AtomicUsize::new(0);
+
+fn get_pipe_name() -> String {
+ let pid = std::process::id();
+ let pipe_id = PIPE_ID.fetch_add(1, Ordering::Relaxed);
+ format!("\\\\.\\pipe\\LOCAL\\cubeb-pipe-{pid}-{pipe_id}")
+}
+
+impl Pipe {
+ pub fn new(io: NamedPipe) -> Self {
+ Self { io }
+ }
+
+ #[allow(clippy::missing_safety_doc)]
+ pub unsafe fn from_raw_handle(handle: crate::PlatformHandle) -> Pipe {
+ Pipe::new(NamedPipe::from_raw_handle(handle.into_raw()))
+ }
+
+ pub fn shutdown(&mut self) -> Result<()> {
+ self.io.disconnect()
+ }
+}
+
+impl RecvMsg for Pipe {
+ // Receive data from the associated connection. `recv_msg` expects the capacity of
+ // the `ConnectionBuffer` members has been adjusted appropriate by the caller.
+ fn recv_msg(&mut self, buf: &mut ConnectionBuffer) -> Result<usize> {
+ assert!(buf.buf.remaining_mut() > 0);
+ let r = unsafe {
+ let chunk = buf.buf.chunk_mut();
+ let slice = std::slice::from_raw_parts_mut(chunk.as_mut_ptr(), chunk.len());
+ self.io.read(slice)
+ };
+ match r {
+ Ok(n) => unsafe {
+ buf.buf.advance_mut(n);
+ Ok(n)
+ },
+ e => e,
+ }
+ }
+}
+
+impl SendMsg for Pipe {
+ // Send data on the associated connection. `send_msg` adjusts the length of the
+ // `ConnectionBuffer` members based on the size of the successful send operation.
+ fn send_msg(&mut self, buf: &mut ConnectionBuffer) -> Result<usize> {
+ assert!(!buf.buf.is_empty());
+ let r = self.io.write(&buf.buf[..buf.buf.len()]);
+ if let Ok(n) = r {
+ buf.buf.advance(n);
+ while let Some(mut handle) = buf.pop_handle() {
+ handle.mark_sent()
+ }
+ }
+ r
+ }
+}