diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
commit | 36d22d82aa202bb199967e9512281e9a53db42c9 (patch) | |
tree | 105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/audioipc2/src | |
parent | Initial commit. (diff) | |
download | firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.tar.xz firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.zip |
Adding upstream version 115.7.0esr.upstream/115.7.0esrupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/audioipc2/src')
-rw-r--r-- | third_party/rust/audioipc2/src/codec.rs | 203 | ||||
-rw-r--r-- | third_party/rust/audioipc2/src/errors.rs | 18 | ||||
-rw-r--r-- | third_party/rust/audioipc2/src/ipccore.rs | 918 | ||||
-rw-r--r-- | third_party/rust/audioipc2/src/lib.rs | 214 | ||||
-rw-r--r-- | third_party/rust/audioipc2/src/messages.rs | 632 | ||||
-rw-r--r-- | third_party/rust/audioipc2/src/rpccore.rs | 470 | ||||
-rw-r--r-- | third_party/rust/audioipc2/src/shm.rs | 334 | ||||
-rw-r--r-- | third_party/rust/audioipc2/src/sys/mod.rs | 77 | ||||
-rw-r--r-- | third_party/rust/audioipc2/src/sys/unix/cmsg.rs | 104 | ||||
-rw-r--r-- | third_party/rust/audioipc2/src/sys/unix/cmsghdr.c | 23 | ||||
-rw-r--r-- | third_party/rust/audioipc2/src/sys/unix/mod.rs | 126 | ||||
-rw-r--r-- | third_party/rust/audioipc2/src/sys/unix/msg.rs | 82 | ||||
-rw-r--r-- | third_party/rust/audioipc2/src/sys/windows/mod.rs | 102 |
13 files changed, 3303 insertions, 0 deletions
diff --git a/third_party/rust/audioipc2/src/codec.rs b/third_party/rust/audioipc2/src/codec.rs new file mode 100644 index 0000000000..2c61faa6ea --- /dev/null +++ b/third_party/rust/audioipc2/src/codec.rs @@ -0,0 +1,203 @@ +// Copyright © 2017 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details + +//! `Encoder`s and `Decoder`s from items to/from `BytesMut` buffers. + +// The assert in LengthDelimitedCodec::decode triggers this clippy warning but +// requires upgrading the workspace to Rust 2021 to resolve. +// This should be fixed in Rust 1.68, after which the following `allow` can be deleted. +#![allow(clippy::uninlined_format_args)] + +use bincode::{self, Options}; +use byteorder::{ByteOrder, LittleEndian}; +use bytes::{Buf, BufMut, BytesMut}; +use serde::de::DeserializeOwned; +use serde::ser::Serialize; +use std::convert::TryInto; +use std::fmt::Debug; +use std::io; +use std::marker::PhantomData; +use std::mem::size_of; + +//////////////////////////////////////////////////////////////////////////////// +// Split buffer into size delimited frames - This appears more complicated than +// might be necessary due to handling the possibility of messages being split +// across reads. + +pub trait Codec { + /// The type of items to be encoded into byte buffer + type In; + + /// The type of items to be returned by decoding from byte buffer + type Out; + + /// Attempts to decode a frame from the provided buffer of bytes. + fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Out>>; + + /// A default method available to be called when there are no more bytes + /// available to be read from the I/O. + fn decode_eof(&mut self, buf: &mut BytesMut) -> io::Result<Self::Out> { + match self.decode(buf)? { + Some(frame) => Ok(frame), + None => Err(io::Error::new( + io::ErrorKind::Other, + "bytes remaining on stream", + )), + } + } + + /// Encodes a frame into the buffer provided. + fn encode(&mut self, msg: Self::In, buf: &mut BytesMut) -> io::Result<()>; +} + +/// Codec based upon bincode serialization +/// +/// Messages that have been serialized using bincode are prefixed with +/// the length of the message to aid in deserialization, so that it's +/// known if enough data has been received to decode a complete +/// message. +pub struct LengthDelimitedCodec<In, Out> { + state: State, + encode_buf: Vec<u8>, + __in: PhantomData<In>, + __out: PhantomData<Out>, +} + +enum State { + Length, + Data(u32), +} + +const MAX_MESSAGE_LEN: u32 = 1024 * 1024; +const MAGIC: u64 = 0xa4d1_019c_c910_1d4a; +const HEADER_LEN: usize = size_of::<u32>() + size_of::<u64>(); + +impl<In, Out> Default for LengthDelimitedCodec<In, Out> { + fn default() -> Self { + Self { + state: State::Length, + encode_buf: Vec::with_capacity(crate::ipccore::IPC_CLIENT_BUFFER_SIZE), + __in: PhantomData, + __out: PhantomData, + } + } +} + +impl<In, Out> LengthDelimitedCodec<In, Out> { + // Lengths are encoded as little endian u32 + fn decode_length(buf: &mut BytesMut) -> Option<u32> { + if buf.len() < HEADER_LEN { + // Not enough data + return None; + } + + let magic = LittleEndian::read_u64(&buf[0..8]); + assert_eq!(magic, MAGIC); + + // Consume the length field + let n = LittleEndian::read_u32(&buf[8..12]); + buf.advance(HEADER_LEN); + Some(n) + } + + fn decode_data(buf: &mut BytesMut, n: u32) -> io::Result<Option<Out>> + where + Out: DeserializeOwned + Debug, + { + let n = n.try_into().unwrap(); + + // At this point, the buffer has already had the required capacity + // reserved. All there is to do is read. + if buf.len() < n { + return Ok(None); + } + + trace!("Attempting to decode"); + let msg = bincode::options() + .with_limit(MAX_MESSAGE_LEN as u64) + .deserialize::<Out>(&buf[..n]) + .map_err(|e| match *e { + bincode::ErrorKind::Io(e) => e, + _ => io::Error::new(io::ErrorKind::Other, *e), + })?; + buf.advance(n); + + trace!("... Decoded {:?}", msg); + Ok(Some(msg)) + } +} + +impl<In, Out> Codec for LengthDelimitedCodec<In, Out> +where + In: Serialize + Debug, + Out: DeserializeOwned + Debug, +{ + type In = In; + type Out = Out; + + fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Out>> { + let n = match self.state { + State::Length => { + match Self::decode_length(buf) { + Some(n) => { + assert!( + n <= MAX_MESSAGE_LEN, + "assertion failed: {} <= {}", + n, + MAX_MESSAGE_LEN + ); + self.state = State::Data(n); + + // Ensure that the buffer has enough space to read the + // incoming payload + buf.reserve(n.try_into().unwrap()); + + n + } + None => return Ok(None), + } + } + State::Data(n) => n, + }; + + match Self::decode_data(buf, n)? { + Some(data) => { + // Update the decode state + self.state = State::Length; + + // Make sure the buffer has enough space to read the next length header. + buf.reserve(HEADER_LEN); + + Ok(Some(data)) + } + None => Ok(None), + } + } + + fn encode(&mut self, item: Self::In, buf: &mut BytesMut) -> io::Result<()> { + trace!("Attempting to encode"); + + self.encode_buf.clear(); + if let Err(e) = bincode::options() + .with_limit(MAX_MESSAGE_LEN as u64) + .serialize_into::<_, Self::In>(&mut self.encode_buf, &item) + { + trace!("message encode failed: {:?}", *e); + match *e { + bincode::ErrorKind::Io(e) => return Err(e), + _ => return Err(io::Error::new(io::ErrorKind::Other, *e)), + } + } + + let encoded_len = self.encode_buf.len(); + assert!(encoded_len <= MAX_MESSAGE_LEN as usize); + buf.reserve(encoded_len + HEADER_LEN); + buf.put_u64_le(MAGIC); + buf.put_u32_le(encoded_len.try_into().unwrap()); + buf.extend_from_slice(&self.encode_buf); + + Ok(()) + } +} diff --git a/third_party/rust/audioipc2/src/errors.rs b/third_party/rust/audioipc2/src/errors.rs new file mode 100644 index 0000000000..f4f3e32f5f --- /dev/null +++ b/third_party/rust/audioipc2/src/errors.rs @@ -0,0 +1,18 @@ +// Copyright © 2017 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details. + +error_chain! { + // Maybe replace with chain_err to improve the error info. + foreign_links { + Bincode(bincode::Error); + Io(std::io::Error); + Cubeb(cubeb::Error); + } + + // Replace bail!(str) with explicit errors. + errors { + Disconnected + } +} diff --git a/third_party/rust/audioipc2/src/ipccore.rs b/third_party/rust/audioipc2/src/ipccore.rs new file mode 100644 index 0000000000..16468b9015 --- /dev/null +++ b/third_party/rust/audioipc2/src/ipccore.rs @@ -0,0 +1,918 @@ +// Copyright © 2021 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details + +use std::io::{self, Result}; +use std::sync::{mpsc, Arc}; +use std::thread; + +use mio::{event::Event, Events, Interest, Poll, Registry, Token, Waker}; +use slab::Slab; + +use crate::messages::AssociateHandleForMessage; +use crate::rpccore::{ + make_client, make_server, Client, Handler, Proxy, RequestQueue, RequestQueueSender, Server, +}; +use crate::{ + codec::Codec, + codec::LengthDelimitedCodec, + sys::{self, RecvMsg, SendMsg}, +}; + +use serde::{de::DeserializeOwned, Serialize}; +use std::fmt::Debug; + +const WAKE_TOKEN: Token = Token(!0); + +thread_local!(static IN_EVENTLOOP: std::cell::RefCell<Option<thread::ThreadId>> = std::cell::RefCell::new(None)); + +fn assert_not_in_event_loop_thread() { + IN_EVENTLOOP.with(|b| { + assert_ne!(*b.borrow(), Some(thread::current().id())); + }); +} + +// Requests sent by an EventLoopHandle to be serviced by +// the handle's associated EventLoop. +enum Request { + // See EventLoop::add_connection + AddConnection( + sys::Pipe, + Box<dyn Driver + Send>, + mpsc::Sender<Result<Token>>, + ), + // See EventLoop::shutdown + Shutdown, + // See EventLoop::wake_connection + WakeConnection(Token), +} + +// EventLoopHandle is a cloneable external reference +// to a running EventLoop, allowing registration of +// new client and server connections, in addition to +// requesting the EventLoop shut down cleanly. +#[derive(Clone, Debug)] +pub struct EventLoopHandle { + waker: Arc<Waker>, + requests: RequestQueueSender<Request>, +} + +impl EventLoopHandle { + pub fn bind_client<C: Client + 'static>( + &self, + connection: sys::Pipe, + ) -> Result<Proxy<<C as Client>::ServerMessage, <C as Client>::ClientMessage>> + where + <C as Client>::ServerMessage: Serialize + Debug + AssociateHandleForMessage + Send, + <C as Client>::ClientMessage: DeserializeOwned + Debug + AssociateHandleForMessage + Send, + { + let (handler, mut proxy) = make_client::<C>()?; + let driver = Box::new(FramedDriver::new(handler)); + let r = self.add_connection(connection, driver); + trace!("EventLoop::bind_client {:?}", r); + r.map(|token| { + proxy.connect_event_loop(self.clone(), token); + proxy + }) + } + + pub fn bind_server<S: Server + Send + 'static>( + &self, + server: S, + connection: sys::Pipe, + ) -> Result<()> + where + <S as Server>::ServerMessage: DeserializeOwned + Debug + AssociateHandleForMessage + Send, + <S as Server>::ClientMessage: Serialize + Debug + AssociateHandleForMessage + Send, + { + let handler = make_server::<S>(server); + let driver = Box::new(FramedDriver::new(handler)); + let r = self.add_connection(connection, driver); + trace!("EventLoop::bind_server {:?}", r); + r.map(|_| ()) + } + + // Register a new connection with associated driver on the EventLoop. + // TODO: Since this is called from a Gecko main thread, make this non-blocking wrt. the EventLoop. + fn add_connection( + &self, + connection: sys::Pipe, + driver: Box<dyn Driver + Send>, + ) -> Result<Token> { + assert_not_in_event_loop_thread(); + let (tx, rx) = mpsc::channel(); + self.requests + .push(Request::AddConnection(connection, driver, tx)) + .map_err(|_| { + debug!("EventLoopHandle::add_connection send failed"); + io::ErrorKind::ConnectionAborted + })?; + self.waker.wake()?; + rx.recv().map_err(|_| { + debug!("EventLoopHandle::add_connection recv failed"); + io::ErrorKind::ConnectionAborted + })? + } + + // Signal EventLoop to shutdown. Causes EventLoop::poll to return Ok(false). + fn shutdown(&self) -> Result<()> { + self.requests.push(Request::Shutdown).map_err(|_| { + debug!("EventLoopHandle::shutdown send failed"); + io::ErrorKind::ConnectionAborted + })?; + self.waker.wake() + } + + // Signal EventLoop to wake connection specified by `token` for processing. + pub(crate) fn wake_connection(&self, token: Token) { + if self.requests.push(Request::WakeConnection(token)).is_ok() { + self.waker.wake().expect("wake failed"); + } + } +} + +// EventLoop owns all registered connections, and is responsible for calling each connection's +// `handle_event` or `handle_wake` any time a readiness or wake event associated with that connection is +// produced. +struct EventLoop { + poll: Poll, + events: Events, + waker: Arc<Waker>, + name: String, + connections: Slab<Connection>, + requests: Arc<RequestQueue<Request>>, +} + +const EVENT_LOOP_INITIAL_CLIENTS: usize = 64; // Initial client allocation, exceeding this will cause the connection slab to grow. +const EVENT_LOOP_EVENTS_PER_ITERATION: usize = 256; // Number of events per poll() step, arbitrary limit. + +impl EventLoop { + fn new(name: String) -> Result<EventLoop> { + let poll = Poll::new()?; + let waker = Arc::new(Waker::new(poll.registry(), WAKE_TOKEN)?); + let eventloop = EventLoop { + poll, + events: Events::with_capacity(EVENT_LOOP_EVENTS_PER_ITERATION), + waker, + name, + connections: Slab::with_capacity(EVENT_LOOP_INITIAL_CLIENTS), + requests: Arc::new(RequestQueue::new(EVENT_LOOP_INITIAL_CLIENTS)), + }; + + Ok(eventloop) + } + + // Return a cloneable handle for controlling the EventLoop externally. + fn handle(&mut self) -> EventLoopHandle { + EventLoopHandle { + waker: self.waker.clone(), + requests: self.requests.new_sender(), + } + } + + // Register a connection and driver. + fn add_connection( + &mut self, + connection: sys::Pipe, + driver: Box<dyn Driver + Send>, + ) -> Result<Token> { + if self.connections.len() == self.connections.capacity() { + trace!("{}: connection slab full, insert will allocate", self.name); + } + let entry = self.connections.vacant_entry(); + let token = Token(entry.key()); + assert_ne!(token, WAKE_TOKEN); + let connection = Connection::new(connection, token, driver, self.poll.registry())?; + debug!("{}: {:?}: new connection", self.name, token); + entry.insert(connection); + Ok(token) + } + + // Step EventLoop once. Call this in a loop from a dedicated thread. + // Returns false if EventLoop is shutting down. + // Each step may call `handle_event` on any registered connection that + // has received readiness events from the poll wakeup. + fn poll(&mut self) -> Result<bool> { + loop { + let r = self.poll.poll(&mut self.events, None); + match r { + Ok(()) => break, + Err(ref e) if interrupted(e) => continue, + Err(e) => return Err(e), + } + } + + for event in self.events.iter() { + match event.token() { + WAKE_TOKEN => { + debug!("{}: WAKE: wake event, will process requests", self.name); + } + token => { + debug!("{}: {:?}: connection event: {:?}", self.name, token, event); + let done = if let Some(connection) = self.connections.get_mut(token.0) { + match connection.handle_event(event, self.poll.registry()) { + Ok(done) => { + debug!("{}: connection {:?} done={}", self.name, token, done); + done + } + Err(e) => { + debug!("{}: {:?}: connection error: {:?}", self.name, token, e); + true + } + } + } else { + // Spurious event, log and ignore. + debug!( + "{}: {:?}: token not found in slab: {:?}", + self.name, token, event + ); + false + }; + if done { + debug!("{}: {:?}: done, removing", self.name, token); + let mut connection = self.connections.remove(token.0); + if let Err(e) = connection.shutdown(self.poll.registry()) { + debug!( + "{}: EventLoop drop - closing connection for {:?} failed: {:?}", + self.name, token, e + ); + } + } + } + } + } + + // If the waker was signalled there may be pending requests to process. + while let Some(req) = self.requests.pop() { + match req { + Request::AddConnection(pipe, driver, tx) => { + debug!("{}: EventLoop: handling add_connection", self.name); + let r = self.add_connection(pipe, driver); + tx.send(r).expect("EventLoop::add_connection"); + } + Request::Shutdown => { + debug!("{}: EventLoop: handling shutdown", self.name); + return Ok(false); + } + Request::WakeConnection(token) => { + debug!( + "{}: EventLoop: handling wake_connection {:?}", + self.name, token + ); + let done = if let Some(connection) = self.connections.get_mut(token.0) { + match connection.handle_wake(self.poll.registry()) { + Ok(done) => done, + Err(e) => { + debug!("{}: {:?}: connection error: {:?}", self.name, token, e); + true + } + } + } else { + // Spurious wake, log and ignore. + debug!( + "{}: {:?}: token not found in slab: wake_connection", + self.name, token + ); + false + }; + if done { + debug!("{}: {:?}: done (wake), removing", self.name, token); + let mut connection = self.connections.remove(token.0); + if let Err(e) = connection.shutdown(self.poll.registry()) { + debug!( + "{}: EventLoop drop - closing connection for {:?} failed: {:?}", + self.name, token, e + ); + } + } + } + } + } + + Ok(true) + } +} + +impl Drop for EventLoop { + fn drop(&mut self) { + debug!("{}: EventLoop drop", self.name); + for (token, connection) in &mut self.connections { + debug!( + "{}: EventLoop drop - closing connection for {:?}", + self.name, token + ); + if let Err(e) = connection.shutdown(self.poll.registry()) { + debug!( + "{}: EventLoop drop - closing connection for {:?} failed: {:?}", + self.name, token, e + ); + } + } + debug!("{}: EventLoop drop done", self.name); + } +} + +// Connection wraps an interprocess connection (Pipe) and manages +// receiving inbound and sending outbound buffers (and associated handles, if any). +// The associated driver is responsible for message framing and serialization. +struct Connection { + io: sys::Pipe, + token: Token, + interest: Option<Interest>, + inbound: sys::ConnectionBuffer, + outbound: sys::ConnectionBuffer, + driver: Box<dyn Driver + Send>, +} + +pub(crate) const IPC_CLIENT_BUFFER_SIZE: usize = 16384; + +impl Connection { + fn new( + mut io: sys::Pipe, + token: Token, + driver: Box<dyn Driver + Send>, + registry: &Registry, + ) -> Result<Connection> { + let interest = Interest::READABLE; + registry.register(&mut io, token, interest)?; + Ok(Connection { + io, + token, + interest: Some(interest), + inbound: sys::ConnectionBuffer::with_capacity(IPC_CLIENT_BUFFER_SIZE), + outbound: sys::ConnectionBuffer::with_capacity(IPC_CLIENT_BUFFER_SIZE), + driver, + }) + } + + fn shutdown(&mut self, registry: &Registry) -> Result<()> { + trace!( + "{:?}: connection shutdown interest={:?}", + self.token, + self.interest + ); + let r = self.io.shutdown(); + trace!("{:?}: connection shutdown r={:?}", self.token, r); + self.interest = None; + registry.deregister(&mut self.io) + } + + // Connections are always interested in READABLE. clear_readable is only + // called when the connection is in the process of shutting down. + fn clear_readable(&mut self, registry: &Registry) -> Result<()> { + self.update_registration( + registry, + self.interest.and_then(|i| i.remove(Interest::READABLE)), + ) + } + + // Connections toggle WRITABLE based on the state of the `outbound` buffer. + fn set_writable(&mut self, registry: &Registry) -> Result<()> { + self.update_registration( + registry, + Some( + self.interest + .map_or_else(|| Interest::WRITABLE, |i| i.add(Interest::WRITABLE)), + ), + ) + } + + fn clear_writable(&mut self, registry: &Registry) -> Result<()> { + self.update_registration( + registry, + self.interest.and_then(|i| i.remove(Interest::WRITABLE)), + ) + } + + // Update connection registration with the current readiness event interests. + fn update_registration( + &mut self, + registry: &Registry, + new_interest: Option<Interest>, + ) -> Result<()> { + // Note: Updating registration always triggers a writable event with NamedPipes, so + // it's important to skip updating registration when the set of interests hasn't changed. + if new_interest != self.interest { + trace!( + "{:?}: updating readiness registration old={:?} new={:?}", + self.token, + self.interest, + new_interest + ); + self.interest = new_interest; + if let Some(interest) = self.interest { + registry.reregister(&mut self.io, self.token, interest)?; + } else { + registry.deregister(&mut self.io)?; + } + } + Ok(()) + } + + // Handle readiness event. Errors returned are fatal for this connection, resulting in removal from the EventLoop connection list. + // The EventLoop will call this for any connection that has received an event. + fn handle_event(&mut self, event: &Event, registry: &Registry) -> Result<bool> { + debug!("{:?}: handling event {:?}", self.token, event); + assert_eq!(self.token, event.token()); + let done = if event.is_readable() { + self.recv_inbound()? + } else { + trace!("{:?}: not readable", self.token); + false + }; + self.flush_outbound()?; + if self.send_outbound(registry)? { + // Hit EOF during send + return Ok(true); + } + debug!( + "{:?}: handling event done (recv done={}, outbound={})", + self.token, + done, + self.outbound.is_empty() + ); + let done = done && self.outbound.is_empty(); + // If driver is done and outbound is clear, unregister connection. + if done { + trace!("{:?}: driver done, clearing read interest", self.token); + self.clear_readable(registry)?; + } + Ok(done) + } + + // Handle wake event. Errors returned are fatal for this connection, resulting in removal from the EventLoop connection list. + // The EventLoop will call this to clear the outbound buffer for any connection that has received a wake event. + fn handle_wake(&mut self, registry: &Registry) -> Result<bool> { + debug!("{:?}: handling wake", self.token); + self.flush_outbound()?; + if self.send_outbound(registry)? { + // Hit EOF during send + return Ok(true); + } + debug!("{:?}: handling wake done", self.token); + Ok(false) + } + + fn recv_inbound(&mut self) -> Result<bool> { + // If the connection is readable, read into inbound and pass to driver for processing until all ready data + // has been consumed. + loop { + trace!("{:?}: pre-recv inbound: {:?}", self.token, self.inbound); + let r = self.io.recv_msg(&mut self.inbound); + match r { + Ok(0) => { + trace!( + "{:?}: recv EOF unprocessed inbound={}", + self.token, + self.inbound.is_empty() + ); + return Ok(true); + } + Ok(n) => { + trace!("{:?}: recv bytes: {}, process_inbound", self.token, n); + let r = self.driver.process_inbound(&mut self.inbound); + trace!("{:?}: process_inbound done: {:?}", self.token, r); + match r { + Ok(done) => { + if done { + return Ok(true); + } + } + Err(e) => { + debug!( + "{:?}: process_inbound error: {:?} unprocessed inbound={}", + self.token, + e, + self.inbound.is_empty() + ); + return Err(e); + } + } + } + Err(ref e) if would_block(e) => { + trace!("{:?}: recv would_block: {:?}", self.token, e); + return Ok(false); + } + Err(ref e) if interrupted(e) => { + trace!("{:?}: recv interrupted: {:?}", self.token, e); + continue; + } + Err(e) => { + debug!("{:?}: recv error: {:?}", self.token, e); + return Err(e); + } + } + } + } + + fn flush_outbound(&mut self) -> Result<()> { + // Enqueue outbound messages to the outbound buffer, then try to write out to connection. + // There may be outbound messages even if there was no inbound processing, so always attempt + // to enqueue and flush. + trace!("{:?}: flush_outbound", self.token); + let r = self.driver.flush_outbound(&mut self.outbound); + trace!("{:?}: flush_outbound done: {:?}", self.token, r); + if let Err(e) = r { + debug!("{:?}: flush_outbound error: {:?}", self.token, e); + return Err(e); + } + Ok(()) + } + + fn send_outbound(&mut self, registry: &Registry) -> Result<bool> { + // Attempt to flush outbound buffer. If the connection's write buffer is full, register for WRITABLE + // and complete flushing when associated notitication arrives later. + while !self.outbound.is_empty() { + let r = self.io.send_msg(&mut self.outbound); + match r { + Ok(0) => { + trace!("{:?}: send EOF", self.token); + return Ok(true); + } + Ok(n) => { + trace!("{:?}: send bytes: {}", self.token, n); + } + Err(ref e) if would_block(e) => { + trace!( + "{:?}: send would_block: {:?}, setting write interest", + self.token, + e + ); + // Register for write events. + self.set_writable(registry)?; + break; + } + Err(ref e) if interrupted(e) => { + trace!("{:?}: send interrupted: {:?}", self.token, e); + continue; + } + Err(e) => { + debug!("{:?}: send error: {:?}", self.token, e); + return Err(e); + } + } + trace!("{:?}: post-send: outbound {:?}", self.token, self.outbound); + } + // Outbound buffer flushed, clear registration for WRITABLE. + if self.outbound.is_empty() { + trace!("{:?}: outbound empty, clearing write interest", self.token); + self.clear_writable(registry)?; + } + Ok(false) + } +} + +impl Drop for Connection { + fn drop(&mut self) { + debug!("{:?}: Connection drop", self.token); + } +} + +fn would_block(err: &std::io::Error) -> bool { + err.kind() == std::io::ErrorKind::WouldBlock +} + +fn interrupted(err: &std::io::Error) -> bool { + err.kind() == std::io::ErrorKind::Interrupted +} + +// Driver only has a single implementation, but must be hidden behind a Trait object to +// hide the varying FramedDriver sizes (due to different `T` values). +trait Driver { + // Handle inbound messages. Returns true if Driver is done; this will trigger Connection removal and cleanup. + fn process_inbound(&mut self, inbound: &mut sys::ConnectionBuffer) -> Result<bool>; + + // Write outbound messages to `outbound`. + fn flush_outbound(&mut self, outbound: &mut sys::ConnectionBuffer) -> Result<()>; +} + +// Length-delimited connection framing and (de)serialization is handled by the inbound and outbound processing. +// Handlers can then process message Requests and Responses without knowledge of serialization or +// handle remoting. +impl<T> Driver for FramedDriver<T> +where + T: Handler, + T::In: DeserializeOwned + Debug + AssociateHandleForMessage, + T::Out: Serialize + Debug + AssociateHandleForMessage, +{ + // Caller passes `inbound` data, this function will trim any complete messages from `inbound` and pass them to the handler for processing. + fn process_inbound(&mut self, inbound: &mut sys::ConnectionBuffer) -> Result<bool> { + debug!("process_inbound: {:?}", inbound); + + // Repeatedly call `decode` as long as it produces items, passing each produced item to the handler to action. + #[allow(unused_mut)] + while let Some(mut item) = self.codec.decode(&mut inbound.buf)? { + if item.has_associated_handle() { + // On Unix, dequeue a handle from the connection and update the item's handle. + #[cfg(unix)] + { + let new = inbound + .pop_handle() + .expect("inbound handle expected for item"); + unsafe { item.set_local_handle(new.take()) }; + } + // On Windows, the deserialized item contains the correct handle value, so + // convert it to an owned handle on the item. + #[cfg(windows)] + { + assert!(inbound.pop_handle().is_none()); + unsafe { item.set_local_handle() }; + } + } + + self.handler.consume(item)?; + } + + Ok(false) + } + + // Caller will try to write `outbound` to associated connection, queuing any data that can't be transmitted immediately. + fn flush_outbound(&mut self, outbound: &mut sys::ConnectionBuffer) -> Result<()> { + debug!("flush_outbound: {:?}", outbound.buf); + + // Repeatedly grab outgoing items from the handler, passing each to `encode` for serialization into `outbound`. + while let Some(mut item) = self.handler.produce()? { + let handle = if item.has_associated_handle() { + #[allow(unused_mut)] + let mut handle = item.take_handle(); + // On Windows, the handle is transferred by duplicating it into the target remote process. + #[cfg(windows)] + unsafe { + item.set_remote_handle(handle.send_to_target()?); + } + Some(handle) + } else { + None + }; + + self.codec.encode(item, &mut outbound.buf)?; + if let Some(handle) = handle { + // `outbound` retains ownership of the handle until the associated + // encoded item in `outbound.buf` is sent to the remote process. + outbound.push_handle(handle); + } + } + Ok(()) + } +} + +struct FramedDriver<T: Handler> { + codec: LengthDelimitedCodec<T::Out, T::In>, + handler: T, +} + +impl<T: Handler> FramedDriver<T> { + fn new(handler: T) -> FramedDriver<T> { + FramedDriver { + codec: Default::default(), + handler, + } + } +} + +#[derive(Debug)] +pub struct EventLoopThread { + thread: Option<thread::JoinHandle<Result<()>>>, + name: String, + handle: EventLoopHandle, +} + +impl EventLoopThread { + pub fn new<F1, F2>( + name: String, + stack_size: Option<usize>, + after_start: F1, + before_stop: F2, + ) -> Result<Self> + where + F1: Fn() + Send + 'static, + F2: Fn() + Send + 'static, + { + let mut event_loop = EventLoop::new(name.clone())?; + let handle = event_loop.handle(); + + let builder = thread::Builder::new() + .name(name.clone()) + .stack_size(stack_size.unwrap_or(64 * 4096)); + + let thread = builder.spawn(move || { + trace!("{}: event loop thread enter", event_loop.name); + after_start(); + let _thread_exit_guard = scopeguard::guard((), |_| before_stop()); + + let r = loop { + let start = std::time::Instant::now(); + let r = event_loop.poll(); + trace!( + "{}: event loop poll r={:?}, took={}μs", + event_loop.name, + r, + start.elapsed().as_micros() + ); + match r { + Ok(true) => continue, + _ => break r, + } + }; + + trace!("{}: event loop thread exit", event_loop.name); + r.map(|_| ()) + })?; + + Ok(EventLoopThread { + thread: Some(thread), + name, + handle, + }) + } + + pub fn handle(&self) -> &EventLoopHandle { + &self.handle + } +} + +impl Drop for EventLoopThread { + // Shut down event loop and executor thread. Blocks until complete. + fn drop(&mut self) { + trace!("{}: EventLoopThread shutdown", self.name); + if let Err(e) = self.handle.shutdown() { + debug!("{}: initiating shutdown failed: {:?}", self.name, e); + } + let thread = self.thread.take().expect("event loop thread"); + if let Err(e) = thread.join() { + error!("{}: EventLoopThread failed: {:?}", self.name, e); + } + trace!("{}: EventLoopThread shutdown done", self.name); + } +} + +#[cfg(test)] +mod test { + use serde_derive::{Deserialize, Serialize}; + + use super::*; + + #[derive(Debug, Serialize, Deserialize, PartialEq)] + enum TestServerMessage { + TestRequest, + } + impl AssociateHandleForMessage for TestServerMessage {} + + struct TestServerImpl {} + + impl Server for TestServerImpl { + type ServerMessage = TestServerMessage; + type ClientMessage = TestClientMessage; + + fn process(&mut self, req: Self::ServerMessage) -> Self::ClientMessage { + assert_eq!(req, TestServerMessage::TestRequest); + TestClientMessage::TestResponse + } + } + + #[derive(Debug, Serialize, Deserialize, PartialEq)] + enum TestClientMessage { + TestResponse, + } + + impl AssociateHandleForMessage for TestClientMessage {} + + struct TestClientImpl {} + + impl Client for TestClientImpl { + type ServerMessage = TestServerMessage; + type ClientMessage = TestClientMessage; + } + + fn init() { + let _ = env_logger::builder().is_test(true).try_init(); + } + + fn setup() -> ( + EventLoopThread, + EventLoopThread, + Proxy<TestServerMessage, TestClientMessage>, + ) { + // Server setup and registration. + let server = EventLoopThread::new("test-server".to_string(), None, || {}, || {}) + .expect("server EventLoopThread"); + let server_handle = server.handle(); + + let (server_pipe, client_pipe) = sys::make_pipe_pair().expect("server make_pipe_pair"); + server_handle + .bind_server(TestServerImpl {}, server_pipe) + .expect("server bind_server"); + + // Client setup and registration. + let client = EventLoopThread::new("test-client".to_string(), None, || {}, || {}) + .expect("client EventLoopThread"); + let client_handle = client.handle(); + + let client_pipe = unsafe { sys::Pipe::from_raw_handle(client_pipe) }; + let client_proxy = client_handle + .bind_client::<TestClientImpl>(client_pipe) + .expect("client bind_client"); + + (server, client, client_proxy) + } + + // Verify basic EventLoopThread functionality works. Create a server and client EventLoopThread, then send + // a single message from the client to the server and wait for the expected response. + #[test] + fn basic() { + init(); + let (server, client, client_proxy) = setup(); + + // RPC message from client to server. + let response = client_proxy.call(TestServerMessage::TestRequest); + let response = response.expect("client response"); + assert_eq!(response, TestClientMessage::TestResponse); + + // Explicit shutdown. + drop(client); + drop(server); + } + + // Same as `basic`, but shut down server before client. + #[test] + fn basic_reverse_drop_order() { + init(); + let (server, client, client_proxy) = setup(); + + // RPC message from client to server. + let response = client_proxy.call(TestServerMessage::TestRequest); + let response = response.expect("client response"); + assert_eq!(response, TestClientMessage::TestResponse); + + // Explicit shutdown. + drop(server); + drop(client); + } + + #[test] + fn dead_server() { + init(); + let (server, _client, client_proxy) = setup(); + drop(server); + + let response = client_proxy.call(TestServerMessage::TestRequest); + response.expect_err("sending on closed channel"); + } + + #[test] + fn dead_client() { + init(); + let (_server, client, client_proxy) = setup(); + drop(client); + + let response = client_proxy.call(TestServerMessage::TestRequest); + response.expect_err("sending on a closed channel"); + } + + #[test] + fn disconnected_handle() { + init(); + let server = EventLoopThread::new("test-server".to_string(), None, || {}, || {}) + .expect("server EventLoopThread"); + let server_handle = server.handle().clone(); + drop(server); + + server_handle + .shutdown() + .expect_err("sending on closed channel"); + } + + #[test] + fn clone_after_drop() { + init(); + let (server, client, client_proxy) = setup(); + drop(server); + drop(client); + + let clone = client_proxy.clone(); + let response = clone.call(TestServerMessage::TestRequest); + response.expect_err("sending to a dropped ClientHandler"); + } + + #[test] + fn basic_event_loop_thread_callbacks() { + init(); + let (start_tx, start_rx) = mpsc::channel(); + let (stop_tx, stop_rx) = mpsc::channel(); + + let elt = EventLoopThread::new( + "test-thread-callbacks".to_string(), + None, + move || start_tx.send(()).unwrap(), + move || stop_tx.send(()).unwrap(), + ) + .expect("server EventLoopThread"); + + start_rx.recv().expect("after_start callback done"); + + drop(elt); + + stop_rx.recv().expect("before_stop callback done"); + } +} diff --git a/third_party/rust/audioipc2/src/lib.rs b/third_party/rust/audioipc2/src/lib.rs new file mode 100644 index 0000000000..eb6d843ba4 --- /dev/null +++ b/third_party/rust/audioipc2/src/lib.rs @@ -0,0 +1,214 @@ +// Copyright © 2017 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details + +#![warn(unused_extern_crates)] +#![recursion_limit = "1024"] +#[macro_use] +extern crate error_chain; +#[macro_use] +extern crate log; + +pub mod codec; +#[allow(deprecated)] +pub mod errors; +pub mod messages; +pub mod shm; + +pub mod ipccore; +pub mod rpccore; +pub mod sys; + +pub use crate::messages::{ClientMessage, ServerMessage}; + +#[cfg(unix)] +use std::os::unix::io::IntoRawFd; +#[cfg(windows)] +use std::os::windows::io::IntoRawHandle; + +use std::io::Result; + +// This must match the definition of +// ipc::FileDescriptor::PlatformHandleType in Gecko. +#[cfg(windows)] +pub type PlatformHandleType = std::os::windows::raw::HANDLE; +#[cfg(unix)] +pub type PlatformHandleType = libc::c_int; + +// This stands in for RawFd/RawHandle. +#[derive(Debug)] +pub struct PlatformHandle(PlatformHandleType); + +#[cfg(unix)] +pub const INVALID_HANDLE_VALUE: PlatformHandleType = -1isize as PlatformHandleType; + +#[cfg(windows)] +pub const INVALID_HANDLE_VALUE: PlatformHandleType = winapi::um::handleapi::INVALID_HANDLE_VALUE; + +#[cfg(unix)] +fn valid_handle(handle: PlatformHandleType) -> bool { + handle >= 0 +} + +#[cfg(windows)] +fn valid_handle(handle: PlatformHandleType) -> bool { + handle != INVALID_HANDLE_VALUE && !handle.is_null() +} + +impl PlatformHandle { + pub fn new(raw: PlatformHandleType) -> PlatformHandle { + assert!(valid_handle(raw)); + PlatformHandle(raw) + } + + #[cfg(windows)] + pub fn from<T: IntoRawHandle>(from: T) -> PlatformHandle { + PlatformHandle::new(from.into_raw_handle()) + } + + #[cfg(unix)] + pub fn from<T: IntoRawFd>(from: T) -> PlatformHandle { + PlatformHandle::new(from.into_raw_fd()) + } + + #[allow(clippy::missing_safety_doc)] + pub unsafe fn into_raw(self) -> PlatformHandleType { + let handle = self.0; + std::mem::forget(self); + handle + } + + #[cfg(unix)] + pub fn duplicate(h: PlatformHandleType) -> Result<PlatformHandle> { + unsafe { + let newfd = libc::dup(h); + if !valid_handle(newfd) { + return Err(std::io::Error::last_os_error()); + } + Ok(PlatformHandle::from(newfd)) + } + } + + #[allow(clippy::missing_safety_doc)] + #[cfg(windows)] + pub unsafe fn duplicate(h: PlatformHandleType) -> Result<PlatformHandle> { + let dup = duplicate_platform_handle(h, None)?; + Ok(PlatformHandle::new(dup)) + } +} + +impl Drop for PlatformHandle { + fn drop(&mut self) { + unsafe { close_platform_handle(self.0) } + } +} + +#[cfg(unix)] +unsafe fn close_platform_handle(handle: PlatformHandleType) { + libc::close(handle); +} + +#[cfg(windows)] +unsafe fn close_platform_handle(handle: PlatformHandleType) { + winapi::um::handleapi::CloseHandle(handle); +} + +#[cfg(windows)] +use winapi::shared::minwindef::{DWORD, FALSE}; +#[cfg(windows)] +use winapi::um::{handleapi, processthreadsapi, winnt}; + +// Duplicate `source_handle` to `target_pid`. Returns the value of the new handle inside the target process. +// If `target_pid` is `None`, `source_handle` is duplicated in the current process. +#[cfg(windows)] +pub(crate) unsafe fn duplicate_platform_handle( + source_handle: PlatformHandleType, + target_pid: Option<DWORD>, +) -> Result<PlatformHandleType> { + let source_process = processthreadsapi::GetCurrentProcess(); + let target_process = if let Some(pid) = target_pid { + let target = processthreadsapi::OpenProcess(winnt::PROCESS_DUP_HANDLE, FALSE, pid); + if !valid_handle(target) { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "invalid target process", + )); + } + Some(target) + } else { + None + }; + + let mut target_handle = std::ptr::null_mut(); + let ok = handleapi::DuplicateHandle( + source_process, + source_handle, + target_process.unwrap_or(source_process), + &mut target_handle, + 0, + FALSE, + winnt::DUPLICATE_SAME_ACCESS, + ); + if let Some(target) = target_process { + handleapi::CloseHandle(target); + }; + if ok == FALSE { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "DuplicateHandle failed", + )); + } + Ok(target_handle) +} + +// Close `target_handle_to_close` inside target process `target_pid` using a +// special invocation of `DuplicateHandle`. See +// https://docs.microsoft.com/en-us/windows/win32/api/handleapi/nf-handleapi-duplicatehandle#:~:text=Normally%20the%20target,dwOptions%20to%20DUPLICATE_CLOSE_SOURCE. +#[cfg(windows)] +pub(crate) unsafe fn close_target_handle( + target_handle_to_close: PlatformHandleType, + target_pid: DWORD, +) -> Result<()> { + let target_process = + processthreadsapi::OpenProcess(winnt::PROCESS_DUP_HANDLE, FALSE, target_pid); + if !valid_handle(target_process) { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "invalid target process", + )); + } + + let ok = handleapi::DuplicateHandle( + target_process, + target_handle_to_close, + std::ptr::null_mut(), + std::ptr::null_mut(), + 0, + FALSE, + winnt::DUPLICATE_CLOSE_SOURCE, + ); + handleapi::CloseHandle(target_process); + if ok == FALSE { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "DuplicateHandle failed", + )); + } + Ok(()) +} + +#[cfg(windows)] +pub fn server_platform_init() { + use winapi::shared::winerror; + use winapi::um::combaseapi; + use winapi::um::objbase; + + unsafe { + let r = combaseapi::CoInitializeEx(std::ptr::null_mut(), objbase::COINIT_MULTITHREADED); + assert!(winerror::SUCCEEDED(r)); + } +} + +#[cfg(unix)] +pub fn server_platform_init() {} diff --git a/third_party/rust/audioipc2/src/messages.rs b/third_party/rust/audioipc2/src/messages.rs new file mode 100644 index 0000000000..853f056bae --- /dev/null +++ b/third_party/rust/audioipc2/src/messages.rs @@ -0,0 +1,632 @@ +// Copyright © 2017 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details + +use crate::PlatformHandle; +use crate::PlatformHandleType; +use crate::INVALID_HANDLE_VALUE; +#[cfg(target_os = "linux")] +use audio_thread_priority::RtPriorityThreadInfo; +use cubeb::{self, ffi}; +use serde_derive::Deserialize; +use serde_derive::Serialize; +use std::ffi::{CStr, CString}; +use std::os::raw::{c_char, c_int, c_uint}; +use std::ptr; + +#[derive(Debug, Serialize, Deserialize)] +pub struct Device { + #[serde(with = "serde_bytes")] + pub output_name: Option<Vec<u8>>, + #[serde(with = "serde_bytes")] + pub input_name: Option<Vec<u8>>, +} + +impl<'a> From<&'a cubeb::DeviceRef> for Device { + fn from(info: &'a cubeb::DeviceRef) -> Self { + Self { + output_name: info.output_name_bytes().map(|s| s.to_vec()), + input_name: info.input_name_bytes().map(|s| s.to_vec()), + } + } +} + +impl From<ffi::cubeb_device> for Device { + fn from(info: ffi::cubeb_device) -> Self { + Self { + output_name: dup_str(info.output_name), + input_name: dup_str(info.input_name), + } + } +} + +impl From<Device> for ffi::cubeb_device { + fn from(info: Device) -> Self { + Self { + output_name: opt_str(info.output_name), + input_name: opt_str(info.input_name), + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct DeviceInfo { + pub devid: usize, + #[serde(with = "serde_bytes")] + pub device_id: Option<Vec<u8>>, + #[serde(with = "serde_bytes")] + pub friendly_name: Option<Vec<u8>>, + #[serde(with = "serde_bytes")] + pub group_id: Option<Vec<u8>>, + #[serde(with = "serde_bytes")] + pub vendor_name: Option<Vec<u8>>, + + pub device_type: ffi::cubeb_device_type, + pub state: ffi::cubeb_device_state, + pub preferred: ffi::cubeb_device_pref, + + pub format: ffi::cubeb_device_fmt, + pub default_format: ffi::cubeb_device_fmt, + pub max_channels: u32, + pub default_rate: u32, + pub max_rate: u32, + pub min_rate: u32, + + pub latency_lo: u32, + pub latency_hi: u32, +} + +impl<'a> From<&'a cubeb::DeviceInfoRef> for DeviceInfo { + fn from(info: &'a cubeb::DeviceInfoRef) -> Self { + let info = unsafe { &*info.as_ptr() }; + DeviceInfo { + devid: info.devid as _, + device_id: dup_str(info.device_id), + friendly_name: dup_str(info.friendly_name), + group_id: dup_str(info.group_id), + vendor_name: dup_str(info.vendor_name), + + device_type: info.device_type, + state: info.state, + preferred: info.preferred, + + format: info.format, + default_format: info.default_format, + max_channels: info.max_channels, + default_rate: info.default_rate, + max_rate: info.max_rate, + min_rate: info.min_rate, + + latency_lo: info.latency_lo, + latency_hi: info.latency_hi, + } + } +} + +impl From<DeviceInfo> for ffi::cubeb_device_info { + fn from(info: DeviceInfo) -> Self { + ffi::cubeb_device_info { + devid: info.devid as _, + device_id: opt_str(info.device_id), + friendly_name: opt_str(info.friendly_name), + group_id: opt_str(info.group_id), + vendor_name: opt_str(info.vendor_name), + + device_type: info.device_type, + state: info.state, + preferred: info.preferred, + + format: info.format, + default_format: info.default_format, + max_channels: info.max_channels, + default_rate: info.default_rate, + max_rate: info.max_rate, + min_rate: info.min_rate, + + latency_lo: info.latency_lo, + latency_hi: info.latency_hi, + } + } +} + +#[repr(C)] +#[derive(Clone, Copy, Debug, Deserialize, Serialize)] +pub struct StreamParams { + pub format: ffi::cubeb_sample_format, + pub rate: c_uint, + pub channels: c_uint, + pub layout: ffi::cubeb_channel_layout, + pub prefs: ffi::cubeb_stream_prefs, +} + +impl From<&cubeb::StreamParamsRef> for StreamParams { + fn from(x: &cubeb::StreamParamsRef) -> StreamParams { + unsafe { *(x.as_ptr() as *mut StreamParams) } + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct StreamCreateParams { + pub input_stream_params: Option<StreamParams>, + pub output_stream_params: Option<StreamParams>, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct StreamInitParams { + #[serde(with = "serde_bytes")] + pub stream_name: Option<Vec<u8>>, + pub input_device: usize, + pub input_stream_params: Option<StreamParams>, + pub output_device: usize, + pub output_stream_params: Option<StreamParams>, + pub latency_frames: u32, +} + +fn dup_str(s: *const c_char) -> Option<Vec<u8>> { + if s.is_null() { + None + } else { + let vec: Vec<u8> = unsafe { CStr::from_ptr(s) }.to_bytes().to_vec(); + Some(vec) + } +} + +fn opt_str(v: Option<Vec<u8>>) -> *mut c_char { + match v { + Some(v) => match CString::new(v) { + Ok(s) => s.into_raw(), + Err(_) => { + debug!("Failed to convert bytes to CString"); + ptr::null_mut() + } + }, + None => ptr::null_mut(), + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct StreamCreate { + pub token: usize, + pub shm_handle: SerializableHandle, + pub shm_area_size: usize, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct RegisterDeviceCollectionChanged { + pub platform_handle: SerializableHandle, +} + +// Client -> Server messages. +// TODO: Callbacks should be different messages types so +// ServerConn::process_msg doesn't have a catch-all case. +#[derive(Debug, Serialize, Deserialize)] +pub enum ServerMessage { + ClientConnect(u32), + ClientDisconnect, + + ContextGetBackendId, + ContextGetMaxChannelCount, + ContextGetMinLatency(StreamParams), + ContextGetPreferredSampleRate, + ContextGetDeviceEnumeration(ffi::cubeb_device_type), + ContextSetupDeviceCollectionCallback, + ContextRegisterDeviceCollectionChanged(ffi::cubeb_device_type, bool), + + StreamCreate(StreamCreateParams), + StreamInit(usize, StreamInitParams), + StreamDestroy(usize), + + StreamStart(usize), + StreamStop(usize), + StreamGetPosition(usize), + StreamGetLatency(usize), + StreamGetInputLatency(usize), + StreamSetVolume(usize, f32), + StreamSetName(usize, CString), + StreamGetCurrentDevice(usize), + StreamRegisterDeviceChangeCallback(usize, bool), + + #[cfg(target_os = "linux")] + PromoteThreadToRealTime([u8; std::mem::size_of::<RtPriorityThreadInfo>()]), +} + +// Server -> Client messages. +// TODO: Streams need id. +#[derive(Debug, Serialize, Deserialize)] +pub enum ClientMessage { + ClientConnected, + ClientDisconnected, + + ContextBackendId(String), + ContextMaxChannelCount(u32), + ContextMinLatency(u32), + ContextPreferredSampleRate(u32), + ContextEnumeratedDevices(Vec<DeviceInfo>), + ContextSetupDeviceCollectionCallback(RegisterDeviceCollectionChanged), + ContextRegisteredDeviceCollectionChanged, + + StreamCreated(StreamCreate), + StreamInitialized(SerializableHandle), + StreamDestroyed, + + StreamStarted, + StreamStopped, + StreamPosition(u64), + StreamLatency(u32), + StreamInputLatency(u32), + StreamVolumeSet, + StreamNameSet, + StreamCurrentDevice(Device), + StreamRegisterDeviceChangeCallback, + + #[cfg(target_os = "linux")] + ThreadPromoted, + + Error(c_int), +} + +#[derive(Debug, Deserialize, Serialize)] +pub enum CallbackReq { + Data { + nframes: isize, + input_frame_size: usize, + output_frame_size: usize, + }, + State(ffi::cubeb_state), + DeviceChange, +} + +#[derive(Debug, Deserialize, Serialize)] +pub enum CallbackResp { + Data(isize), + State, + DeviceChange, + Error(c_int), +} + +#[derive(Debug, Deserialize, Serialize)] +pub enum DeviceCollectionReq { + DeviceChange(ffi::cubeb_device_type), +} + +#[derive(Debug, Deserialize, Serialize)] +pub enum DeviceCollectionResp { + DeviceChange, +} + +// Represents a platform handle in various transitional states during serialization and remoting. +// The process of serializing and remoting handles and the ownership during various states differs +// between Windows and Unix. SerializableHandle changes during IPC as follows: +// +// 1. Created in the initial state `Owned`, with a valid `target_pid`. +// 2. Ownership is transferred out for processing during IPC send, becoming `Empty` temporarily. +// See `AssociateHandleForMessage::take_handle`. +// - Windows: DuplicateHandle transfers the handle to the remote process. +// This produces a new handle value in the local process representing the remote handle. +// This value must be sent to the remote, so `AssociateHandleForMessage::set_remote_handle` +// is used to transform the handle into a `SerializableValue`. +// - Unix: sendmsg transfers the handle to the remote process. The handle is left `Empty`. +// (Note: this occurs later, when the serialized message buffer is sent) +// 3. Message containing `SerializableValue` or `Empty` (depending on handle processing in step 2) +// is serialized and sent via IPC. +// 4. Message received and deserialized in target process. +// - Windows: `AssociateHandleForMessage::set_local_handle converts the received `SerializableValue` into `Owned`, ready for use. +// - Unix: Handle (with a new value in the target process) is received out-of-band via `recvmsg` +// and converted to `Owned` via `AssociateHandleForMessage::set_local_handle`. +#[derive(Debug)] +pub enum SerializableHandle { + // Owned handle, with optional target_pid on sending side. + Owned(PlatformHandle, Option<u32>), + // Transitional IPC states: + SerializableValue(PlatformHandleType), // Windows + Empty, // Unix +} + +// PlatformHandle is non-Send and contains a pointer (HANDLE) on Windows. +#[allow(clippy::non_send_fields_in_send_ty)] +unsafe impl Send for SerializableHandle {} + +impl SerializableHandle { + pub fn new(handle: PlatformHandle, target_pid: u32) -> SerializableHandle { + SerializableHandle::Owned(handle, Some(target_pid)) + } + + // Called on the receiving side to take ownership of the handle. + pub fn take_handle(&mut self) -> PlatformHandle { + match std::mem::replace(self, SerializableHandle::Empty) { + SerializableHandle::Owned(handle, target_pid) => { + assert!(target_pid.is_none()); + handle + } + _ => panic!("take_handle called in invalid state"), + } + } + + // Called on the sending side to take ownership of the handle for + // handling platform-specific remoting. + fn take_handle_for_send(&mut self) -> RemoteHandle { + match std::mem::replace(self, SerializableHandle::Empty) { + SerializableHandle::Owned(handle, target_pid) => unsafe { + RemoteHandle::new( + handle.into_raw(), + target_pid.expect("target process required"), + ) + }, + _ => panic!("take_handle_for_send called in invalid state"), + } + } + + fn new_owned(handle: PlatformHandleType) -> SerializableHandle { + SerializableHandle::Owned(PlatformHandle::new(handle), None) + } + + #[cfg(windows)] + fn make_owned(&mut self) { + if let SerializableHandle::SerializableValue(handle) = self { + *self = SerializableHandle::new_owned(*handle); + } else { + panic!("make_owned called in invalid state") + } + } + + fn new_serializable_value(handle: PlatformHandleType) -> SerializableHandle { + SerializableHandle::SerializableValue(handle) + } + + fn get_serializable_value(&self) -> PlatformHandleType { + match *self { + SerializableHandle::SerializableValue(handle) => handle, + SerializableHandle::Empty => INVALID_HANDLE_VALUE, + _ => panic!("get_remote_handle called in invalid state"), + } + } +} + +// Raw handle values are serialized as i64. Additional handling external to (de)serialization is required during IPC +// send/receive to convert these raw values into valid handles. +impl serde::Serialize for SerializableHandle { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let handle = self.get_serializable_value(); + serializer.serialize_i64(handle as i64) + } +} + +impl<'de> serde::Deserialize<'de> for SerializableHandle { + fn deserialize<D>(deserializer: D) -> Result<SerializableHandle, D::Error> + where + D: serde::Deserializer<'de>, + { + deserializer.deserialize_i64(SerializableHandleVisitor) + } +} + +struct SerializableHandleVisitor; +impl serde::de::Visitor<'_> for SerializableHandleVisitor { + type Value = SerializableHandle; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("an integer between -2^63 and 2^63") + } + + fn visit_i64<E>(self, value: i64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(SerializableHandle::new_serializable_value( + value as PlatformHandleType, + )) + } +} + +// Represents a PlatformHandle in-flight between processes. +// On Unix platforms, this is just a plain owned Handle, closed on drop. +// On Windows, `RemoteHandle` also retains ownership of the `target_handle` +// in the `target` process. Once the handle has been successfully sent +// to the remote, the sender should call `mark_sent()` to relinquish +// ownership of `target_handle` in the remote. +#[derive(Debug)] +pub struct RemoteHandle { + pub(crate) handle: PlatformHandleType, + #[cfg(windows)] + pub(crate) target: u32, + #[cfg(windows)] + pub(crate) target_handle: Option<PlatformHandleType>, +} + +impl RemoteHandle { + #[allow(clippy::missing_safety_doc)] + pub unsafe fn new(handle: PlatformHandleType, _target: u32) -> Self { + RemoteHandle { + handle, + #[cfg(windows)] + target: _target, + #[cfg(windows)] + target_handle: None, + } + } + + #[cfg(windows)] + pub fn mark_sent(&mut self) { + self.target_handle.take(); + } + + #[cfg(windows)] + #[allow(clippy::missing_safety_doc)] + pub unsafe fn send_to_target(&mut self) -> std::io::Result<PlatformHandleType> { + let target_handle = crate::duplicate_platform_handle(self.handle, Some(self.target))?; + self.target_handle = Some(target_handle); + Ok(target_handle) + } + + #[cfg(unix)] + #[allow(clippy::missing_safety_doc)] + pub unsafe fn take(self) -> PlatformHandleType { + let h = self.handle; + std::mem::forget(self); + h + } +} + +impl Drop for RemoteHandle { + fn drop(&mut self) { + unsafe { + crate::close_platform_handle(self.handle); + } + #[cfg(windows)] + unsafe { + if let Some(target_handle) = self.target_handle { + if let Err(e) = crate::close_target_handle(target_handle, self.target) { + trace!("RemoteHandle failed to close target handle: {:?}", e); + } + } + } + } +} + +unsafe impl Send for RemoteHandle {} + +pub trait AssociateHandleForMessage { + // True if this item has an associated handle attached for remoting. + fn has_associated_handle(&self) -> bool { + false + } + + // Take ownership of the associated handle, leaving the item's + // associated handle empty. + fn take_handle(&mut self) -> RemoteHandle { + panic!("take_handle called on item without associated handle"); + } + + #[allow(clippy::missing_safety_doc)] + // Replace an empty associated handle with a non-owning serializable value + // indicating the value of the handle in the remote process. + #[cfg(windows)] + unsafe fn set_remote_handle(&mut self, _: PlatformHandleType) { + panic!("set_remote_handle called on item without associated handle"); + } + + #[allow(clippy::missing_safety_doc)] + // Replace a serialized associated handle value with an owned local handle. + #[cfg(windows)] + unsafe fn set_local_handle(&mut self) { + panic!("set_local_handle called on item without associated handle"); + } + + #[allow(clippy::missing_safety_doc)] + // Replace an empty associated handle with an owned local handle. + #[cfg(unix)] + unsafe fn set_local_handle(&mut self, _: PlatformHandleType) { + panic!("set_local_handle called on item without associated handle"); + } +} + +impl AssociateHandleForMessage for ClientMessage { + fn has_associated_handle(&self) -> bool { + matches!( + *self, + ClientMessage::StreamCreated(_) + | ClientMessage::StreamInitialized(_) + | ClientMessage::ContextSetupDeviceCollectionCallback(_) + ) + } + + fn take_handle(&mut self) -> RemoteHandle { + match *self { + ClientMessage::StreamCreated(ref mut data) => data.shm_handle.take_handle_for_send(), + ClientMessage::StreamInitialized(ref mut data) => data.take_handle_for_send(), + ClientMessage::ContextSetupDeviceCollectionCallback(ref mut data) => { + data.platform_handle.take_handle_for_send() + } + _ => panic!("take_handle called on item without associated handle"), + } + } + + #[cfg(windows)] + unsafe fn set_remote_handle(&mut self, handle: PlatformHandleType) { + match *self { + ClientMessage::StreamCreated(ref mut data) => { + data.shm_handle = SerializableHandle::new_serializable_value(handle); + } + ClientMessage::StreamInitialized(ref mut data) => { + *data = SerializableHandle::new_serializable_value(handle); + } + ClientMessage::ContextSetupDeviceCollectionCallback(ref mut data) => { + data.platform_handle = SerializableHandle::new_serializable_value(handle); + } + _ => panic!("set_remote_handle called on item without associated handle"), + } + } + + #[cfg(windows)] + unsafe fn set_local_handle(&mut self) { + match *self { + ClientMessage::StreamCreated(ref mut data) => data.shm_handle.make_owned(), + ClientMessage::StreamInitialized(ref mut data) => data.make_owned(), + ClientMessage::ContextSetupDeviceCollectionCallback(ref mut data) => { + data.platform_handle.make_owned() + } + _ => panic!("set_local_handle called on item without associated handle"), + } + } + + #[cfg(unix)] + unsafe fn set_local_handle(&mut self, handle: PlatformHandleType) { + match *self { + ClientMessage::StreamCreated(ref mut data) => { + data.shm_handle = SerializableHandle::new_owned(handle); + } + ClientMessage::StreamInitialized(ref mut data) => { + *data = SerializableHandle::new_owned(handle); + } + ClientMessage::ContextSetupDeviceCollectionCallback(ref mut data) => { + data.platform_handle = SerializableHandle::new_owned(handle); + } + _ => panic!("set_local_handle called on item without associated handle"), + } + } +} + +impl AssociateHandleForMessage for ServerMessage {} + +impl AssociateHandleForMessage for DeviceCollectionReq {} +impl AssociateHandleForMessage for DeviceCollectionResp {} + +impl AssociateHandleForMessage for CallbackReq {} +impl AssociateHandleForMessage for CallbackResp {} + +#[cfg(test)] +mod test { + use super::StreamParams; + use cubeb::ffi; + use std::mem; + + #[test] + fn stream_params_size_check() { + assert_eq!( + mem::size_of::<StreamParams>(), + mem::size_of::<ffi::cubeb_stream_params>() + ) + } + + #[test] + fn stream_params_from() { + let raw = ffi::cubeb_stream_params { + format: ffi::CUBEB_SAMPLE_FLOAT32BE, + rate: 96_000, + channels: 32, + layout: ffi::CUBEB_LAYOUT_3F1_LFE, + prefs: ffi::CUBEB_STREAM_PREF_LOOPBACK, + }; + let wrapped = ::cubeb::StreamParams::from(raw); + let params = StreamParams::from(wrapped.as_ref()); + assert_eq!(params.format, raw.format); + assert_eq!(params.rate, raw.rate); + assert_eq!(params.channels, raw.channels); + assert_eq!(params.layout, raw.layout); + assert_eq!(params.prefs, raw.prefs); + } +} diff --git a/third_party/rust/audioipc2/src/rpccore.rs b/third_party/rust/audioipc2/src/rpccore.rs new file mode 100644 index 0000000000..69ed2e8cf0 --- /dev/null +++ b/third_party/rust/audioipc2/src/rpccore.rs @@ -0,0 +1,470 @@ +// Copyright © 2021 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details + +use crossbeam_queue::ArrayQueue; +use mio::Token; +use std::cell::UnsafeCell; +use std::collections::VecDeque; +use std::io::{self, Error, ErrorKind, Result}; +use std::marker::PhantomPinned; +use std::mem::ManuallyDrop; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Weak}; + +use crate::ipccore::EventLoopHandle; + +// This provides a safe-ish method for a thread to allocate +// stack storage space for a result, then pass a (wrapped) +// pointer to that location to another thread via +// a CompletionWriter to eventually store a result into. +struct Completion<T> { + item: UnsafeCell<Option<T>>, + writer: AtomicBool, + _pin: PhantomPinned, // disable rustc's no-alias +} + +impl<T> Completion<T> { + fn new() -> Self { + Completion { + item: UnsafeCell::new(None), + writer: AtomicBool::new(false), + _pin: PhantomPinned, + } + } + + // Wait until the writer completes, then return the result. + // This is intended to be a single-use function, once the writer + // has completed any further attempts to wait will return None. + fn wait(&self) -> Option<T> { + // Wait for the writer to complete or be dropped. + while self.writer.load(Ordering::Acquire) { + std::thread::park(); + } + unsafe { (*self.item.get()).take() } + } + + // Create a writer for the other thread to store the + // expected result into. + fn writer(&self) -> CompletionWriter<T> { + assert!(!self.writer.load(Ordering::Relaxed)); + self.writer.store(true, Ordering::Release); + CompletionWriter { + ptr: self as *const _ as *mut _, + waiter: std::thread::current(), + } + } +} + +impl<T> Drop for Completion<T> { + fn drop(&mut self) { + // Wait for the outstanding writer to complete before + // dropping, since the CompletionWriter references + // memory owned by this object. + while self.writer.load(Ordering::Acquire) { + std::thread::park(); + } + } +} + +struct CompletionWriter<T> { + ptr: *mut Completion<T>, // Points to a Completion on another thread's stack + waiter: std::thread::Thread, // Identifies thread waiting for completion +} + +impl<T> CompletionWriter<T> { + fn set(self, value: T) { + // Store the result into the Completion's memory. + // Since `set` consumes `self`, rely on `Drop` to + // mark the writer as done and wake the Completion's + // thread. + unsafe { + assert!((*self.ptr).writer.load(Ordering::Relaxed)); + *(*self.ptr).item.get() = Some(value); + } + } +} + +impl<T> Drop for CompletionWriter<T> { + fn drop(&mut self) { + // Mark writer as complete - if `set` was not called, + // the waiter will receive `None`. + unsafe { + (*self.ptr).writer.store(false, Ordering::Release); + } + // Wake the Completion's thread. + self.waiter.unpark(); + } +} + +// Safety: CompletionWriter holds a pointer to a Completion +// residing on another thread's stack. The Completion always +// waits for an outstanding writer if present, and CompletionWriter +// releases the waiter and wakes the Completion's thread on drop, +// so this pointer will always be live for the duration of a +// CompletionWriter. +unsafe impl<T> Send for CompletionWriter<T> {} + +// RPC message handler. Implemented by ClientHandler (for Client) +// and ServerHandler (for Server). +pub(crate) trait Handler { + type In; + type Out; + + // Consume a request + fn consume(&mut self, request: Self::In) -> Result<()>; + + // Produce a response + fn produce(&mut self) -> Result<Option<Self::Out>>; +} + +// Client RPC definition. This supplies the expected message +// request and response types. +pub trait Client { + type ServerMessage; + type ClientMessage; +} + +// Server RPC definition. This supplies the expected message +// request and response types. `process` is passed inbound RPC +// requests by the ServerHandler to be responded to by the server. +pub trait Server { + type ServerMessage; + type ClientMessage; + + fn process(&mut self, req: Self::ServerMessage) -> Self::ClientMessage; +} + +// RPC Client Proxy implementation. +type ProxyRequest<Request, Response> = (Request, CompletionWriter<Response>); + +// RPC Proxy that may be `clone`d for use by multiple owners/threads. +// A Proxy `call` arranges for the supplied request to be transmitted +// to the associated Server via RPC and blocks awaiting the response +// via the associated `Completion`. +// A ClientHandler normally lives until the last Proxy is dropped, but if the ClientHandler +// encounters an internal error, `requests` will fail to upgrade, allowing +// the proxy to report an error. +#[derive(Debug)] +pub struct Proxy<Request, Response> { + handle: Option<(EventLoopHandle, Token)>, + requests: ManuallyDrop<RequestQueueSender<ProxyRequest<Request, Response>>>, +} + +impl<Request, Response> Proxy<Request, Response> { + fn new(requests: RequestQueueSender<ProxyRequest<Request, Response>>) -> Self { + Self { + handle: None, + requests: ManuallyDrop::new(requests), + } + } + + pub fn call(&self, request: Request) -> Result<Response> { + let response = Completion::new(); + self.requests.push((request, response.writer()))?; + self.wake_connection(); + match response.wait() { + Some(resp) => Ok(resp), + None => Err(Error::new(ErrorKind::Other, "proxy recv error")), + } + } + + pub(crate) fn connect_event_loop(&mut self, handle: EventLoopHandle, token: Token) { + self.handle = Some((handle, token)); + } + + fn wake_connection(&self) { + let (handle, token) = self + .handle + .as_ref() + .expect("proxy not connected to event loop"); + handle.wake_connection(*token); + } +} + +impl<Request, Response> Clone for Proxy<Request, Response> { + fn clone(&self) -> Self { + let mut clone = Self::new((*self.requests).clone()); + let (handle, token) = self + .handle + .as_ref() + .expect("proxy not connected to event loop"); + clone.connect_event_loop(handle.clone(), *token); + clone + } +} + +impl<Request, Response> Drop for Proxy<Request, Response> { + fn drop(&mut self) { + trace!("Proxy drop, waking EventLoop"); + // Must drop `requests` before waking the connection, otherwise + // the wake may be processed before the (last) weak reference is + // dropped. + let last_proxy = self.requests.live_proxies(); + unsafe { + ManuallyDrop::drop(&mut self.requests); + } + if last_proxy == 1 && self.handle.is_some() { + self.wake_connection() + } + } +} + +const RPC_CLIENT_INITIAL_PROXIES: usize = 32; // Initial proxy pre-allocation per client. + +// Client-specific Handler implementation. +// The IPC EventLoop Driver calls this to execute client-specific +// RPC handling. Serialized messages sent via a Proxy are queued +// for transmission when `produce` is called. +// Deserialized messages are passed via `consume` to +// trigger response completion by sending the response via a channel +// connected to a ProxyResponse. +pub(crate) struct ClientHandler<C: Client> { + in_flight: VecDeque<CompletionWriter<C::ClientMessage>>, + requests: Arc<RequestQueue<ProxyRequest<C::ServerMessage, C::ClientMessage>>>, +} + +impl<C: Client> ClientHandler<C> { + fn new( + requests: Arc<RequestQueue<ProxyRequest<C::ServerMessage, C::ClientMessage>>>, + ) -> ClientHandler<C> { + ClientHandler::<C> { + in_flight: VecDeque::with_capacity(RPC_CLIENT_INITIAL_PROXIES), + requests, + } + } +} + +impl<C: Client> Handler for ClientHandler<C> { + type In = C::ClientMessage; + type Out = C::ServerMessage; + + fn consume(&mut self, response: Self::In) -> Result<()> { + trace!("ClientHandler::consume"); + if let Some(response_writer) = self.in_flight.pop_front() { + response_writer.set(response); + } else { + return Err(Error::new(ErrorKind::Other, "request/response mismatch")); + } + + Ok(()) + } + + fn produce(&mut self) -> Result<Option<Self::Out>> { + trace!("ClientHandler::produce"); + + // If the weak count is zero, no proxies are attached and + // no further proxies can be attached since every proxy + // after the initial one is cloned from an existing instance. + self.requests.check_live_proxies()?; + // Try to get a new message + match self.requests.pop() { + Some((request, response_writer)) => { + trace!(" --> received request"); + self.in_flight.push_back(response_writer); + Ok(Some(request)) + } + None => { + trace!(" --> no request"); + Ok(None) + } + } + } +} + +#[derive(Debug)] +pub(crate) struct RequestQueue<T> { + queue: ArrayQueue<T>, +} + +impl<T> RequestQueue<T> { + pub(crate) fn new(size: usize) -> Self { + RequestQueue { + queue: ArrayQueue::new(size), + } + } + + pub(crate) fn pop(&self) -> Option<T> { + self.queue.pop() + } + + pub(crate) fn new_sender(self: &Arc<Self>) -> RequestQueueSender<T> { + RequestQueueSender { + inner: Arc::downgrade(self), + } + } + + pub(crate) fn check_live_proxies(self: &Arc<Self>) -> Result<()> { + if Arc::weak_count(self) == 0 { + return Err(io::ErrorKind::ConnectionAborted.into()); + } + Ok(()) + } +} + +pub(crate) struct RequestQueueSender<T> { + inner: Weak<RequestQueue<T>>, +} + +impl<T> RequestQueueSender<T> { + pub(crate) fn push(&self, request: T) -> Result<()> { + if let Some(consumer) = self.inner.upgrade() { + if consumer.queue.push(request).is_err() { + debug!("Proxy[{:p}]: call failed - CH::requests full", self); + return Err(io::ErrorKind::ConnectionAborted.into()); + } + return Ok(()); + } + debug!("Proxy[{:p}]: call failed - CH::requests dropped", self); + Err(Error::new(ErrorKind::Other, "proxy send error")) + } + + pub(crate) fn live_proxies(&self) -> usize { + Weak::weak_count(&self.inner) + } +} + +impl<T> Clone for RequestQueueSender<T> { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl<T> std::fmt::Debug for RequestQueueSender<T> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RequestQueueProducer") + .field("inner", &self.inner.as_ptr()) + .finish() + } +} + +#[allow(clippy::type_complexity)] +pub(crate) fn make_client<C: Client>( +) -> Result<(ClientHandler<C>, Proxy<C::ServerMessage, C::ClientMessage>)> { + let requests = Arc::new(RequestQueue::new(RPC_CLIENT_INITIAL_PROXIES)); + let proxy_req = requests.new_sender(); + let handler = ClientHandler::new(requests); + + Ok((handler, Proxy::new(proxy_req))) +} + +// Server-specific Handler implementation. +// The IPC EventLoop Driver calls this to execute server-specific +// RPC handling. Deserialized messages are passed via `consume` to the +// associated `server` for processing. Server responses are then queued +// for RPC to the associated client when `produce` is called. +pub(crate) struct ServerHandler<S: Server> { + server: S, + in_flight: VecDeque<S::ClientMessage>, +} + +impl<S: Server> Handler for ServerHandler<S> { + type In = S::ServerMessage; + type Out = S::ClientMessage; + + fn consume(&mut self, message: Self::In) -> Result<()> { + trace!("ServerHandler::consume"); + let response = self.server.process(message); + self.in_flight.push_back(response); + Ok(()) + } + + fn produce(&mut self) -> Result<Option<Self::Out>> { + trace!("ServerHandler::produce"); + + // Return the ready response + match self.in_flight.pop_front() { + Some(res) => { + trace!(" --> received response"); + Ok(Some(res)) + } + None => { + trace!(" --> no response ready"); + Ok(None) + } + } + } +} + +const RPC_SERVER_INITIAL_CLIENTS: usize = 32; // Initial client allocation per server. + +pub(crate) fn make_server<S: Server>(server: S) -> ServerHandler<S> { + ServerHandler::<S> { + server, + in_flight: VecDeque::with_capacity(RPC_SERVER_INITIAL_CLIENTS), + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn basic() { + let queue = Arc::new(RequestQueue::new(1)); + let producer = queue.new_sender(); + assert!(queue.pop().is_none()); + producer.push(1).unwrap(); + assert!(queue.pop().is_some()); + assert!(queue.pop().is_none()); + } + + #[test] + fn queue_dropped() { + let queue = Arc::new(RequestQueue::new(1)); + let producer = queue.new_sender(); + drop(queue); + assert!(producer.push(1).is_err()); + } + + #[test] + fn queue_full() { + let queue = Arc::new(RequestQueue::new(1)); + let producer = queue.new_sender(); + producer.push(1).unwrap(); + assert!(producer.push(2).is_err()); + } + + #[test] + fn queue_producer_clone() { + let queue = Arc::new(RequestQueue::new(1)); + let producer = queue.new_sender(); + let producer2 = producer.clone(); + producer.push(1).unwrap(); + assert!(producer2.push(2).is_err()); + } + + #[test] + fn queue_producer_drop() { + let queue = Arc::new(RequestQueue::new(1)); + let producer = queue.new_sender(); + let producer2 = producer.clone(); + drop(producer); + assert!(producer2.push(2).is_ok()); + } + + #[test] + fn queue_producer_weak() { + let queue = Arc::new(RequestQueue::new(1)); + let producer = queue.new_sender(); + let producer2 = producer.clone(); + drop(queue); + assert!(producer2.push(2).is_err()); + } + + #[test] + fn queue_producer_shutdown() { + let queue = Arc::new(RequestQueue::new(1)); + let producer = queue.new_sender(); + let producer2 = producer.clone(); + producer.push(1).unwrap(); + assert!(Arc::weak_count(&queue) == 2); + drop(producer); + assert!(Arc::weak_count(&queue) == 1); + drop(producer2); + assert!(Arc::weak_count(&queue) == 0); + } +} diff --git a/third_party/rust/audioipc2/src/shm.rs b/third_party/rust/audioipc2/src/shm.rs new file mode 100644 index 0000000000..2aaa92ef36 --- /dev/null +++ b/third_party/rust/audioipc2/src/shm.rs @@ -0,0 +1,334 @@ +// Copyright © 2017 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details. + +#![allow(clippy::missing_safety_doc)] + +use crate::errors::*; +use crate::PlatformHandle; +use std::{convert::TryInto, ffi::c_void, slice}; + +#[cfg(unix)] +pub use unix::SharedMem; +#[cfg(windows)] +pub use windows::SharedMem; + +#[derive(Copy, Clone)] +struct SharedMemView { + ptr: *mut c_void, + size: usize, +} + +unsafe impl Send for SharedMemView {} + +impl SharedMemView { + pub unsafe fn get_slice(&self, size: usize) -> Result<&[u8]> { + let map = slice::from_raw_parts(self.ptr as _, self.size); + if size <= self.size { + Ok(&map[..size]) + } else { + bail!("mmap size"); + } + } + + pub unsafe fn get_mut_slice(&mut self, size: usize) -> Result<&mut [u8]> { + let map = slice::from_raw_parts_mut(self.ptr as _, self.size); + if size <= self.size { + Ok(&mut map[..size]) + } else { + bail!("mmap size") + } + } +} + +#[cfg(unix)] +mod unix { + use super::*; + use memmap2::{MmapMut, MmapOptions}; + use std::fs::File; + use std::os::unix::io::{AsRawFd, FromRawFd}; + + #[cfg(target_os = "android")] + fn open_shm_file(_id: &str, size: usize) -> Result<File> { + unsafe { + let fd = ashmem::ASharedMemory_create(std::ptr::null(), size); + if fd >= 0 { + // Drop PROT_EXEC + let r = ashmem::ASharedMemory_setProt(fd, libc::PROT_READ | libc::PROT_WRITE); + assert_eq!(r, 0); + return Ok(File::from_raw_fd(fd.try_into().unwrap())); + } + Err(std::io::Error::last_os_error().into()) + } + } + + #[cfg(not(target_os = "android"))] + fn open_shm_file(id: &str, size: usize) -> Result<File> { + let file = open_shm_file_impl(id)?; + allocate_file(&file, size)?; + Ok(file) + } + + #[cfg(not(target_os = "android"))] + fn open_shm_file_impl(id: &str) -> Result<File> { + use std::env::temp_dir; + use std::fs::{remove_file, OpenOptions}; + + let id_cstring = std::ffi::CString::new(id).unwrap(); + + #[cfg(target_os = "linux")] + { + unsafe { + let r = libc::syscall(libc::SYS_memfd_create, id_cstring.as_ptr(), 0); + if r >= 0 { + return Ok(File::from_raw_fd(r.try_into().unwrap())); + } + } + + let mut path = std::path::PathBuf::from("/dev/shm"); + path.push(id); + + if let Ok(file) = OpenOptions::new() + .read(true) + .write(true) + .create_new(true) + .open(&path) + { + let _ = remove_file(&path); + return Ok(file); + } + } + + unsafe { + let fd = libc::shm_open( + id_cstring.as_ptr(), + libc::O_RDWR | libc::O_CREAT | libc::O_EXCL, + 0o600, + ); + if fd >= 0 { + libc::shm_unlink(id_cstring.as_ptr()); + return Ok(File::from_raw_fd(fd)); + } + } + + let mut path = temp_dir(); + path.push(id); + + let file = OpenOptions::new() + .read(true) + .write(true) + .create_new(true) + .open(&path)?; + + let _ = remove_file(&path); + Ok(file) + } + + #[cfg(not(target_os = "android"))] + fn handle_enospc(s: &str) -> Result<()> { + let err = std::io::Error::last_os_error(); + let errno = err.raw_os_error().unwrap_or(0); + assert_ne!(errno, 0); + debug!("allocate_file: {} failed errno={}", s, errno); + if errno == libc::ENOSPC { + return Err(err.into()); + } + Ok(()) + } + + #[cfg(not(target_os = "android"))] + fn allocate_file(file: &File, size: usize) -> Result<()> { + // First, set the file size. This may create a sparse file on + // many systems, which can fail with SIGBUS when accessed via a + // mapping and the lazy backing allocation fails due to low disk + // space. To avoid this, try to force the entire file to be + // preallocated before mapping using OS-specific approaches below. + + file.set_len(size.try_into().unwrap())?; + + let fd = file.as_raw_fd(); + let size: libc::off_t = size.try_into().unwrap(); + + // Try Linux-specific fallocate. + #[cfg(target_os = "linux")] + { + if unsafe { libc::fallocate(fd, 0, 0, size) } == 0 { + return Ok(()); + } + handle_enospc("fallocate()")?; + } + + // Try macOS-specific fcntl. + #[cfg(target_os = "macos")] + { + let params = libc::fstore_t { + fst_flags: libc::F_ALLOCATEALL, + fst_posmode: libc::F_PEOFPOSMODE, + fst_offset: 0, + fst_length: size, + fst_bytesalloc: 0, + }; + if unsafe { libc::fcntl(fd, libc::F_PREALLOCATE, ¶ms) } == 0 { + return Ok(()); + } + handle_enospc("fcntl(F_PREALLOCATE)")?; + } + + // Fall back to portable version, where available. + #[cfg(any(target_os = "linux", target_os = "freebsd", target_os = "dragonfly"))] + { + if unsafe { libc::posix_fallocate(fd, 0, size) } == 0 { + return Ok(()); + } + handle_enospc("posix_fallocate()")?; + } + + Ok(()) + } + + pub struct SharedMem { + file: File, + _mmap: MmapMut, + view: SharedMemView, + } + + impl SharedMem { + pub fn new(id: &str, size: usize) -> Result<SharedMem> { + let file = open_shm_file(id, size)?; + let mut mmap = unsafe { MmapOptions::new().len(size).map_mut(&file)? }; + assert_eq!(mmap.len(), size); + let view = SharedMemView { + ptr: mmap.as_mut_ptr() as _, + size, + }; + Ok(SharedMem { + file, + _mmap: mmap, + view, + }) + } + + pub unsafe fn make_handle(&self) -> Result<PlatformHandle> { + PlatformHandle::duplicate(self.file.as_raw_fd()).map_err(|e| e.into()) + } + + pub unsafe fn from(handle: PlatformHandle, size: usize) -> Result<SharedMem> { + let file = File::from_raw_fd(handle.into_raw()); + let mut mmap = MmapOptions::new().len(size).map_mut(&file)?; + assert_eq!(mmap.len(), size); + let view = SharedMemView { + ptr: mmap.as_mut_ptr() as _, + size, + }; + Ok(SharedMem { + file, + _mmap: mmap, + view, + }) + } + + pub unsafe fn get_slice(&self, size: usize) -> Result<&[u8]> { + self.view.get_slice(size) + } + + pub unsafe fn get_mut_slice(&mut self, size: usize) -> Result<&mut [u8]> { + self.view.get_mut_slice(size) + } + + pub fn get_size(&self) -> usize { + self.view.size + } + } +} + +#[cfg(windows)] +mod windows { + use super::*; + use std::ptr; + use winapi::{ + shared::{minwindef::DWORD, ntdef::HANDLE}, + um::{ + handleapi::CloseHandle, + memoryapi::{MapViewOfFile, UnmapViewOfFile, FILE_MAP_ALL_ACCESS}, + winbase::CreateFileMappingA, + winnt::PAGE_READWRITE, + }, + }; + + use crate::INVALID_HANDLE_VALUE; + + pub struct SharedMem { + handle: HANDLE, + view: SharedMemView, + } + + unsafe impl Send for SharedMem {} + + impl Drop for SharedMem { + fn drop(&mut self) { + unsafe { + let ok = UnmapViewOfFile(self.view.ptr); + assert_ne!(ok, 0); + let ok = CloseHandle(self.handle); + assert_ne!(ok, 0); + } + } + } + + impl SharedMem { + pub fn new(_id: &str, size: usize) -> Result<SharedMem> { + unsafe { + let handle = CreateFileMappingA( + INVALID_HANDLE_VALUE, + ptr::null_mut(), + PAGE_READWRITE, + (size as u64 >> 32).try_into().unwrap(), + (size as u64 & (DWORD::MAX as u64)).try_into().unwrap(), + ptr::null(), + ); + if handle.is_null() { + return Err(std::io::Error::last_os_error().into()); + } + + let ptr = MapViewOfFile(handle, FILE_MAP_ALL_ACCESS, 0, 0, size); + if ptr.is_null() { + return Err(std::io::Error::last_os_error().into()); + } + + Ok(SharedMem { + handle, + view: SharedMemView { ptr, size }, + }) + } + } + + pub unsafe fn make_handle(&self) -> Result<PlatformHandle> { + PlatformHandle::duplicate(self.handle).map_err(|e| e.into()) + } + + pub unsafe fn from(handle: PlatformHandle, size: usize) -> Result<SharedMem> { + let handle = handle.into_raw(); + let ptr = MapViewOfFile(handle, FILE_MAP_ALL_ACCESS, 0, 0, size); + if ptr.is_null() { + return Err(std::io::Error::last_os_error().into()); + } + Ok(SharedMem { + handle, + view: SharedMemView { ptr, size }, + }) + } + + pub unsafe fn get_slice(&self, size: usize) -> Result<&[u8]> { + self.view.get_slice(size) + } + + pub unsafe fn get_mut_slice(&mut self, size: usize) -> Result<&mut [u8]> { + self.view.get_mut_slice(size) + } + + pub fn get_size(&self) -> usize { + self.view.size + } + } +} diff --git a/third_party/rust/audioipc2/src/sys/mod.rs b/third_party/rust/audioipc2/src/sys/mod.rs new file mode 100644 index 0000000000..0bcfdaa15e --- /dev/null +++ b/third_party/rust/audioipc2/src/sys/mod.rs @@ -0,0 +1,77 @@ +// Copyright © 2021 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details + +use std::{collections::VecDeque, io::Result}; + +use bytes::BytesMut; +use mio::{event::Source, Interest, Registry, Token}; + +#[cfg(unix)] +mod unix; +use crate::messages::RemoteHandle; + +#[cfg(unix)] +pub use self::unix::*; + +#[cfg(windows)] +mod windows; +#[cfg(windows)] +pub use self::windows::*; + +impl Source for Pipe { + fn register(&mut self, registry: &Registry, token: Token, interests: Interest) -> Result<()> { + self.io.register(registry, token, interests) + } + + fn reregister(&mut self, registry: &Registry, token: Token, interests: Interest) -> Result<()> { + self.io.reregister(registry, token, interests) + } + + fn deregister(&mut self, registry: &Registry) -> Result<()> { + self.io.deregister(registry) + } +} + +const HANDLE_QUEUE_LIMIT: usize = 16; + +#[derive(Debug)] +pub struct ConnectionBuffer { + pub buf: BytesMut, + handles: VecDeque<RemoteHandle>, +} + +impl ConnectionBuffer { + pub fn with_capacity(cap: usize) -> Self { + ConnectionBuffer { + buf: BytesMut::with_capacity(cap), + handles: VecDeque::with_capacity(HANDLE_QUEUE_LIMIT), + } + } + + pub fn is_empty(&self) -> bool { + self.buf.is_empty() + } + + pub fn push_handle(&mut self, handle: RemoteHandle) { + assert!(self.handles.len() < self.handles.capacity()); + self.handles.push_back(handle) + } + + pub fn pop_handle(&mut self) -> Option<RemoteHandle> { + self.handles.pop_front() + } +} + +pub trait RecvMsg { + // Receive data from the associated connection. `recv_msg` expects the capacity of + // the `ConnectionBuffer` members have been adjusted appropriately by the caller. + fn recv_msg(&mut self, buf: &mut ConnectionBuffer) -> Result<usize>; +} + +pub trait SendMsg { + // Send data on the associated connection. `send_msg` consumes and adjusts the length of the + // `ConnectionBuffer` members based on the size of the successful send operation. + fn send_msg(&mut self, buf: &mut ConnectionBuffer) -> Result<usize>; +} diff --git a/third_party/rust/audioipc2/src/sys/unix/cmsg.rs b/third_party/rust/audioipc2/src/sys/unix/cmsg.rs new file mode 100644 index 0000000000..581df9a847 --- /dev/null +++ b/third_party/rust/audioipc2/src/sys/unix/cmsg.rs @@ -0,0 +1,104 @@ +// Copyright © 2017 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details + +use crate::sys::HANDLE_QUEUE_LIMIT; +use bytes::{BufMut, BytesMut}; +use libc::{self, cmsghdr}; +use std::convert::TryInto; +use std::os::unix::io::RawFd; +use std::{mem, slice}; + +trait AsBytes { + fn as_bytes(&self) -> &[u8]; +} + +impl<'a, T: Sized> AsBytes for &'a [T] { + fn as_bytes(&self) -> &[u8] { + // TODO: This should account for the alignment of T + let byte_count = self.len() * mem::size_of::<T>(); + unsafe { slice::from_raw_parts(self.as_ptr() as *const _, byte_count) } + } +} + +// Encode `handles` into a cmsghdr in `buf`. +pub fn encode_handles(cmsg: &mut BytesMut, handles: &[RawFd]) { + assert!(handles.len() <= HANDLE_QUEUE_LIMIT); + let msg = handles.as_bytes(); + + let cmsg_space = space(msg.len()); + assert!(cmsg.remaining_mut() >= cmsg_space); + + // Some definitions of cmsghdr contain padding. Rather + // than try to keep an up-to-date #cfg list to handle + // that, just use a pre-zeroed struct to fill out any + // fields we don't care about. + let zeroed = unsafe { mem::zeroed() }; + #[allow(clippy::needless_update)] + // `cmsg_len` is `usize` on some platforms, `u32` on others. + #[allow(clippy::useless_conversion)] + let cmsghdr = cmsghdr { + cmsg_len: len(msg.len()).try_into().unwrap(), + cmsg_level: libc::SOL_SOCKET, + cmsg_type: libc::SCM_RIGHTS, + ..zeroed + }; + + unsafe { + let cmsghdr_ptr = cmsg.chunk_mut().as_mut_ptr(); + std::ptr::copy_nonoverlapping( + &cmsghdr as *const _ as *const _, + cmsghdr_ptr, + mem::size_of::<cmsghdr>(), + ); + let cmsg_data_ptr = libc::CMSG_DATA(cmsghdr_ptr as _); + std::ptr::copy_nonoverlapping(msg.as_ptr(), cmsg_data_ptr, msg.len()); + cmsg.advance_mut(cmsg_space); + } +} + +// Decode `buf` containing a cmsghdr with one or more handle(s). +pub fn decode_handles(buf: &mut BytesMut) -> arrayvec::ArrayVec<RawFd, HANDLE_QUEUE_LIMIT> { + let mut fds = arrayvec::ArrayVec::<RawFd, HANDLE_QUEUE_LIMIT>::new(); + + let cmsghdr_len = len(0); + + if buf.len() < cmsghdr_len { + // No more entries---not enough data in `buf` for a + // complete message. + return fds; + } + + let cmsg: &cmsghdr = unsafe { &*(buf.as_ptr() as *const _) }; + #[allow(clippy::unnecessary_cast)] // `cmsg_len` type is platform-dependent. + let cmsg_len = cmsg.cmsg_len as usize; + + match (cmsg.cmsg_level, cmsg.cmsg_type) { + (libc::SOL_SOCKET, libc::SCM_RIGHTS) => { + trace!("Found SCM_RIGHTS..."); + let slice = &buf[cmsghdr_len..cmsg_len]; + let slice = unsafe { + slice::from_raw_parts( + slice.as_ptr() as *const _, + slice.len() / mem::size_of::<i32>(), + ) + }; + fds.try_extend_from_slice(slice).unwrap(); + } + (level, kind) => { + trace!("Skipping cmsg level, {}, type={}...", level, kind); + } + } + + assert!(fds.len() <= HANDLE_QUEUE_LIMIT); + fds +} + +fn len(len: usize) -> usize { + unsafe { libc::CMSG_LEN(len.try_into().unwrap()) as usize } +} + +pub fn space(len: usize) -> usize { + unsafe { libc::CMSG_SPACE(len.try_into().unwrap()) as usize } +} diff --git a/third_party/rust/audioipc2/src/sys/unix/cmsghdr.c b/third_party/rust/audioipc2/src/sys/unix/cmsghdr.c new file mode 100644 index 0000000000..82d7852867 --- /dev/null +++ b/third_party/rust/audioipc2/src/sys/unix/cmsghdr.c @@ -0,0 +1,23 @@ +#include <sys/socket.h> +#include <inttypes.h> +#include <string.h> + +const uint8_t* +cmsghdr_bytes(size_t* size) +{ + int myfd = 0; + + static union { + uint8_t buf[CMSG_SPACE(sizeof(myfd))]; + struct cmsghdr align; + } u; + + u.align.cmsg_len = CMSG_LEN(sizeof(myfd)); + u.align.cmsg_level = SOL_SOCKET; + u.align.cmsg_type = SCM_RIGHTS; + + memcpy(CMSG_DATA(&u.align), &myfd, sizeof(myfd)); + + *size = sizeof(u); + return (const uint8_t*)&u.buf; +} diff --git a/third_party/rust/audioipc2/src/sys/unix/mod.rs b/third_party/rust/audioipc2/src/sys/unix/mod.rs new file mode 100644 index 0000000000..84f3f1edf2 --- /dev/null +++ b/third_party/rust/audioipc2/src/sys/unix/mod.rs @@ -0,0 +1,126 @@ +// Copyright © 2021 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details + +use std::io::Result; +use std::os::unix::prelude::{AsRawFd, FromRawFd}; + +use bytes::{Buf, BufMut, BytesMut}; +use iovec::IoVec; +use mio::net::UnixStream; + +use crate::PlatformHandle; + +use super::{ConnectionBuffer, RecvMsg, SendMsg, HANDLE_QUEUE_LIMIT}; + +pub mod cmsg; +mod msg; + +pub struct Pipe { + pub(crate) io: UnixStream, + cmsg: BytesMut, +} + +impl Pipe { + fn new(io: UnixStream) -> Self { + Pipe { + io, + cmsg: BytesMut::with_capacity(cmsg::space( + std::mem::size_of::<i32>() * HANDLE_QUEUE_LIMIT, + )), + } + } +} + +// Create a connected "pipe" pair. The `Pipe` is the server end, +// the `PlatformHandle` is the client end to be remoted. +pub fn make_pipe_pair() -> Result<(Pipe, PlatformHandle)> { + let (server, client) = UnixStream::pair()?; + Ok((Pipe::new(server), PlatformHandle::from(client))) +} + +impl Pipe { + #[allow(clippy::missing_safety_doc)] + pub unsafe fn from_raw_handle(handle: crate::PlatformHandle) -> Pipe { + Pipe::new(UnixStream::from_raw_fd(handle.into_raw())) + } + + pub fn shutdown(&mut self) -> Result<()> { + self.io.shutdown(std::net::Shutdown::Both) + } +} + +impl RecvMsg for Pipe { + // Receive data (and fds) from the associated connection. `recv_msg` expects the capacity of + // the `ConnectionBuffer` members has been adjusted appropriate by the caller. + fn recv_msg(&mut self, buf: &mut ConnectionBuffer) -> Result<usize> { + assert!(buf.buf.remaining_mut() > 0); + // TODO: MSG_CMSG_CLOEXEC not portable. + // TODO: MSG_NOSIGNAL not portable; macOS can set socket option SO_NOSIGPIPE instead. + #[cfg(target_os = "linux")] + let flags = libc::MSG_CMSG_CLOEXEC | libc::MSG_NOSIGNAL; + #[cfg(not(target_os = "linux"))] + let flags = 0; + let r = unsafe { + let chunk = buf.buf.chunk_mut(); + let slice = std::slice::from_raw_parts_mut(chunk.as_mut_ptr(), chunk.len()); + let mut iovec = [<&mut IoVec>::from(slice)]; + msg::recv_msg_with_flags( + self.io.as_raw_fd(), + &mut iovec, + self.cmsg.chunk_mut(), + flags, + ) + }; + match r { + Ok((n, cmsg_n, msg_flags)) => unsafe { + trace!("recv_msg_with_flags flags={}", msg_flags); + buf.buf.advance_mut(n); + self.cmsg.advance_mut(cmsg_n); + let handles = cmsg::decode_handles(&mut self.cmsg); + self.cmsg.clear(); + let unused = 0; + for h in handles { + buf.push_handle(super::RemoteHandle::new(h, unused)); + } + Ok(n) + }, + Err(e) => Err(e), + } + } +} + +impl SendMsg for Pipe { + // Send data (and fds) on the associated connection. `send_msg` adjusts the length of the + // `ConnectionBuffer` members based on the size of the successful send operation. + fn send_msg(&mut self, buf: &mut ConnectionBuffer) -> Result<usize> { + assert!(!buf.buf.is_empty()); + if !buf.handles.is_empty() { + let mut handles = [-1i32; HANDLE_QUEUE_LIMIT]; + for (i, h) in buf.handles.iter().enumerate() { + handles[i] = h.handle; + } + cmsg::encode_handles(&mut self.cmsg, &handles[..buf.handles.len()]); + } + let r = { + // TODO: MSG_NOSIGNAL not portable; macOS can set socket option SO_NOSIGPIPE instead. + #[cfg(target_os = "linux")] + let flags = libc::MSG_NOSIGNAL; + #[cfg(not(target_os = "linux"))] + let flags = 0; + let iovec = [<&IoVec>::from(&buf.buf[..buf.buf.len()])]; + msg::send_msg_with_flags(self.io.as_raw_fd(), &iovec, &self.cmsg, flags) + }; + match r { + Ok(n) => { + buf.buf.advance(n); + // Discard sent handles. + while buf.handles.pop_front().is_some() {} + self.cmsg.clear(); + Ok(n) + } + Err(e) => Err(e), + } + } +} diff --git a/third_party/rust/audioipc2/src/sys/unix/msg.rs b/third_party/rust/audioipc2/src/sys/unix/msg.rs new file mode 100644 index 0000000000..c2cd353289 --- /dev/null +++ b/third_party/rust/audioipc2/src/sys/unix/msg.rs @@ -0,0 +1,82 @@ +// Copyright © 2017 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details. + +use bytes::buf::UninitSlice; +use iovec::unix; +use iovec::IoVec; +use std::os::unix::io::RawFd; +use std::{cmp, io, mem, ptr}; + +fn cvt(r: libc::ssize_t) -> io::Result<usize> { + if r == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(r as usize) + } +} + +// Convert return of -1 into error message, handling retry on EINTR +fn cvt_r<F: FnMut() -> libc::ssize_t>(mut f: F) -> io::Result<usize> { + loop { + match cvt(f()) { + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + other => return other, + } + } +} + +pub(crate) fn recv_msg_with_flags( + socket: RawFd, + bufs: &mut [&mut IoVec], + cmsg: &mut UninitSlice, + flags: libc::c_int, +) -> io::Result<(usize, usize, libc::c_int)> { + let slice = unix::as_os_slice_mut(bufs); + let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len()); + let (control, controllen) = if cmsg.len() == 0 { + (ptr::null_mut(), 0) + } else { + (cmsg.as_mut_ptr() as _, cmsg.len()) + }; + + let mut msghdr: libc::msghdr = unsafe { mem::zeroed() }; + msghdr.msg_name = ptr::null_mut(); + msghdr.msg_namelen = 0; + msghdr.msg_iov = slice.as_mut_ptr(); + msghdr.msg_iovlen = len as _; + msghdr.msg_control = control; + msghdr.msg_controllen = controllen as _; + + let n = cvt_r(|| unsafe { libc::recvmsg(socket, &mut msghdr as *mut _, flags) })?; + + #[allow(clippy::unnecessary_cast)] // `msg_controllen` type is platform-dependent. + let controllen = msghdr.msg_controllen as usize; + Ok((n, controllen, msghdr.msg_flags)) +} + +pub(crate) fn send_msg_with_flags( + socket: RawFd, + bufs: &[&IoVec], + cmsg: &[u8], + flags: libc::c_int, +) -> io::Result<usize> { + let slice = unix::as_os_slice(bufs); + let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len()); + let (control, controllen) = if cmsg.is_empty() { + (ptr::null_mut(), 0) + } else { + (cmsg.as_ptr() as *mut _, cmsg.len()) + }; + + let mut msghdr: libc::msghdr = unsafe { mem::zeroed() }; + msghdr.msg_name = ptr::null_mut(); + msghdr.msg_namelen = 0; + msghdr.msg_iov = slice.as_ptr() as *mut _; + msghdr.msg_iovlen = len as _; + msghdr.msg_control = control; + msghdr.msg_controllen = controllen as _; + + cvt_r(|| unsafe { libc::sendmsg(socket, &msghdr as *const _, flags) }) +} diff --git a/third_party/rust/audioipc2/src/sys/windows/mod.rs b/third_party/rust/audioipc2/src/sys/windows/mod.rs new file mode 100644 index 0000000000..973e9608d1 --- /dev/null +++ b/third_party/rust/audioipc2/src/sys/windows/mod.rs @@ -0,0 +1,102 @@ +// Copyright © 2021 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details + +use std::{ + fs::OpenOptions, + io::{Read, Write}, + os::windows::prelude::{FromRawHandle, OpenOptionsExt}, + sync::atomic::{AtomicUsize, Ordering}, +}; + +use std::io::Result; + +use bytes::{Buf, BufMut}; +use mio::windows::NamedPipe; +use winapi::um::winbase::FILE_FLAG_OVERLAPPED; + +use crate::PlatformHandle; + +use super::{ConnectionBuffer, RecvMsg, SendMsg}; + +pub struct Pipe { + pub(crate) io: NamedPipe, +} + +// Create a connected "pipe" pair. The `Pipe` is the server end, +// the `PlatformHandle` is the client end to be remoted. +pub fn make_pipe_pair() -> Result<(Pipe, PlatformHandle)> { + let pipe_name = get_pipe_name(); + let server = NamedPipe::new(&pipe_name)?; + + let client = { + let mut opts = OpenOptions::new(); + opts.read(true) + .write(true) + .custom_flags(FILE_FLAG_OVERLAPPED); + let file = opts.open(&pipe_name)?; + PlatformHandle::from(file) + }; + + Ok((Pipe::new(server), client)) +} + +static PIPE_ID: AtomicUsize = AtomicUsize::new(0); + +fn get_pipe_name() -> String { + let pid = std::process::id(); + let pipe_id = PIPE_ID.fetch_add(1, Ordering::Relaxed); + format!("\\\\.\\pipe\\LOCAL\\cubeb-pipe-{pid}-{pipe_id}") +} + +impl Pipe { + pub fn new(io: NamedPipe) -> Self { + Self { io } + } + + #[allow(clippy::missing_safety_doc)] + pub unsafe fn from_raw_handle(handle: crate::PlatformHandle) -> Pipe { + Pipe::new(NamedPipe::from_raw_handle(handle.into_raw())) + } + + pub fn shutdown(&mut self) -> Result<()> { + self.io.disconnect() + } +} + +impl RecvMsg for Pipe { + // Receive data from the associated connection. `recv_msg` expects the capacity of + // the `ConnectionBuffer` members has been adjusted appropriate by the caller. + fn recv_msg(&mut self, buf: &mut ConnectionBuffer) -> Result<usize> { + assert!(buf.buf.remaining_mut() > 0); + let r = unsafe { + let chunk = buf.buf.chunk_mut(); + let slice = std::slice::from_raw_parts_mut(chunk.as_mut_ptr(), chunk.len()); + self.io.read(slice) + }; + match r { + Ok(n) => unsafe { + buf.buf.advance_mut(n); + Ok(n) + }, + e => e, + } + } +} + +impl SendMsg for Pipe { + // Send data on the associated connection. `send_msg` adjusts the length of the + // `ConnectionBuffer` members based on the size of the successful send operation. + fn send_msg(&mut self, buf: &mut ConnectionBuffer) -> Result<usize> { + assert!(!buf.buf.is_empty()); + let r = self.io.write(&buf.buf[..buf.buf.len()]); + if let Ok(n) = r { + buf.buf.advance(n); + while let Some(mut handle) = buf.pop_handle() { + handle.mark_sent() + } + } + r + } +} |