summaryrefslogtreecommitdiffstats
path: root/third_party/rust/audioipc2/src/sys
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/audioipc2/src/sys')
-rw-r--r--third_party/rust/audioipc2/src/sys/mod.rs77
-rw-r--r--third_party/rust/audioipc2/src/sys/unix/cmsg.rs103
-rw-r--r--third_party/rust/audioipc2/src/sys/unix/cmsghdr.c23
-rw-r--r--third_party/rust/audioipc2/src/sys/unix/mod.rs126
-rw-r--r--third_party/rust/audioipc2/src/sys/unix/msg.rs81
-rw-r--r--third_party/rust/audioipc2/src/sys/windows/mod.rs102
6 files changed, 512 insertions, 0 deletions
diff --git a/third_party/rust/audioipc2/src/sys/mod.rs b/third_party/rust/audioipc2/src/sys/mod.rs
new file mode 100644
index 0000000000..0bcfdaa15e
--- /dev/null
+++ b/third_party/rust/audioipc2/src/sys/mod.rs
@@ -0,0 +1,77 @@
+// Copyright © 2021 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details
+
+use std::{collections::VecDeque, io::Result};
+
+use bytes::BytesMut;
+use mio::{event::Source, Interest, Registry, Token};
+
+#[cfg(unix)]
+mod unix;
+use crate::messages::RemoteHandle;
+
+#[cfg(unix)]
+pub use self::unix::*;
+
+#[cfg(windows)]
+mod windows;
+#[cfg(windows)]
+pub use self::windows::*;
+
+impl Source for Pipe {
+ fn register(&mut self, registry: &Registry, token: Token, interests: Interest) -> Result<()> {
+ self.io.register(registry, token, interests)
+ }
+
+ fn reregister(&mut self, registry: &Registry, token: Token, interests: Interest) -> Result<()> {
+ self.io.reregister(registry, token, interests)
+ }
+
+ fn deregister(&mut self, registry: &Registry) -> Result<()> {
+ self.io.deregister(registry)
+ }
+}
+
+const HANDLE_QUEUE_LIMIT: usize = 16;
+
+#[derive(Debug)]
+pub struct ConnectionBuffer {
+ pub buf: BytesMut,
+ handles: VecDeque<RemoteHandle>,
+}
+
+impl ConnectionBuffer {
+ pub fn with_capacity(cap: usize) -> Self {
+ ConnectionBuffer {
+ buf: BytesMut::with_capacity(cap),
+ handles: VecDeque::with_capacity(HANDLE_QUEUE_LIMIT),
+ }
+ }
+
+ pub fn is_empty(&self) -> bool {
+ self.buf.is_empty()
+ }
+
+ pub fn push_handle(&mut self, handle: RemoteHandle) {
+ assert!(self.handles.len() < self.handles.capacity());
+ self.handles.push_back(handle)
+ }
+
+ pub fn pop_handle(&mut self) -> Option<RemoteHandle> {
+ self.handles.pop_front()
+ }
+}
+
+pub trait RecvMsg {
+ // Receive data from the associated connection. `recv_msg` expects the capacity of
+ // the `ConnectionBuffer` members have been adjusted appropriately by the caller.
+ fn recv_msg(&mut self, buf: &mut ConnectionBuffer) -> Result<usize>;
+}
+
+pub trait SendMsg {
+ // Send data on the associated connection. `send_msg` consumes and adjusts the length of the
+ // `ConnectionBuffer` members based on the size of the successful send operation.
+ fn send_msg(&mut self, buf: &mut ConnectionBuffer) -> Result<usize>;
+}
diff --git a/third_party/rust/audioipc2/src/sys/unix/cmsg.rs b/third_party/rust/audioipc2/src/sys/unix/cmsg.rs
new file mode 100644
index 0000000000..0451f11926
--- /dev/null
+++ b/third_party/rust/audioipc2/src/sys/unix/cmsg.rs
@@ -0,0 +1,103 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details
+
+use crate::sys::HANDLE_QUEUE_LIMIT;
+use bytes::{BufMut, BytesMut};
+use libc::{self, cmsghdr};
+use std::convert::TryInto;
+use std::os::unix::io::RawFd;
+use std::{mem, slice};
+
+trait AsBytes {
+ fn as_bytes(&self) -> &[u8];
+}
+
+impl<'a, T: Sized> AsBytes for &'a [T] {
+ fn as_bytes(&self) -> &[u8] {
+ // TODO: This should account for the alignment of T
+ let byte_count = self.len() * mem::size_of::<T>();
+ unsafe { slice::from_raw_parts(self.as_ptr() as *const _, byte_count) }
+ }
+}
+
+// Encode `handles` into a cmsghdr in `buf`.
+pub fn encode_handles(cmsg: &mut BytesMut, handles: &[RawFd]) {
+ assert!(handles.len() <= HANDLE_QUEUE_LIMIT);
+ let msg = handles.as_bytes();
+
+ let cmsg_space = space(msg.len());
+ assert!(cmsg.remaining_mut() >= cmsg_space);
+
+ // Some definitions of cmsghdr contain padding. Rather
+ // than try to keep an up-to-date #cfg list to handle
+ // that, just use a pre-zeroed struct to fill out any
+ // fields we don't care about.
+ let zeroed = unsafe { mem::zeroed() };
+ #[allow(clippy::needless_update)]
+ // `cmsg_len` is `usize` on some platforms, `u32` on others.
+ #[allow(clippy::useless_conversion)]
+ let cmsghdr = cmsghdr {
+ cmsg_len: len(msg.len()).try_into().unwrap(),
+ cmsg_level: libc::SOL_SOCKET,
+ cmsg_type: libc::SCM_RIGHTS,
+ ..zeroed
+ };
+
+ unsafe {
+ let cmsghdr_ptr = cmsg.chunk_mut().as_mut_ptr();
+ std::ptr::copy_nonoverlapping(
+ &cmsghdr as *const _ as *const _,
+ cmsghdr_ptr,
+ mem::size_of::<cmsghdr>(),
+ );
+ let cmsg_data_ptr = libc::CMSG_DATA(cmsghdr_ptr as _);
+ std::ptr::copy_nonoverlapping(msg.as_ptr(), cmsg_data_ptr, msg.len());
+ cmsg.advance_mut(cmsg_space);
+ }
+}
+
+// Decode `buf` containing a cmsghdr with one or more handle(s).
+pub fn decode_handles(buf: &mut BytesMut) -> arrayvec::ArrayVec<RawFd, HANDLE_QUEUE_LIMIT> {
+ let mut fds = arrayvec::ArrayVec::<RawFd, HANDLE_QUEUE_LIMIT>::new();
+
+ let cmsghdr_len = len(0);
+
+ if buf.len() < cmsghdr_len {
+ // No more entries---not enough data in `buf` for a
+ // complete message.
+ return fds;
+ }
+
+ let cmsg: &cmsghdr = unsafe { &*(buf.as_ptr() as *const _) };
+ let cmsg_len = cmsg.cmsg_len as usize;
+
+ match (cmsg.cmsg_level, cmsg.cmsg_type) {
+ (libc::SOL_SOCKET, libc::SCM_RIGHTS) => {
+ trace!("Found SCM_RIGHTS...");
+ let slice = &buf[cmsghdr_len..cmsg_len];
+ let slice = unsafe {
+ slice::from_raw_parts(
+ slice.as_ptr() as *const _,
+ slice.len() / mem::size_of::<i32>(),
+ )
+ };
+ fds.try_extend_from_slice(slice).unwrap();
+ }
+ (level, kind) => {
+ trace!("Skipping cmsg level, {}, type={}...", level, kind);
+ }
+ }
+
+ assert!(fds.len() <= HANDLE_QUEUE_LIMIT);
+ fds
+}
+
+fn len(len: usize) -> usize {
+ unsafe { libc::CMSG_LEN(len.try_into().unwrap()) as usize }
+}
+
+pub fn space(len: usize) -> usize {
+ unsafe { libc::CMSG_SPACE(len.try_into().unwrap()) as usize }
+}
diff --git a/third_party/rust/audioipc2/src/sys/unix/cmsghdr.c b/third_party/rust/audioipc2/src/sys/unix/cmsghdr.c
new file mode 100644
index 0000000000..82d7852867
--- /dev/null
+++ b/third_party/rust/audioipc2/src/sys/unix/cmsghdr.c
@@ -0,0 +1,23 @@
+#include <sys/socket.h>
+#include <inttypes.h>
+#include <string.h>
+
+const uint8_t*
+cmsghdr_bytes(size_t* size)
+{
+ int myfd = 0;
+
+ static union {
+ uint8_t buf[CMSG_SPACE(sizeof(myfd))];
+ struct cmsghdr align;
+ } u;
+
+ u.align.cmsg_len = CMSG_LEN(sizeof(myfd));
+ u.align.cmsg_level = SOL_SOCKET;
+ u.align.cmsg_type = SCM_RIGHTS;
+
+ memcpy(CMSG_DATA(&u.align), &myfd, sizeof(myfd));
+
+ *size = sizeof(u);
+ return (const uint8_t*)&u.buf;
+}
diff --git a/third_party/rust/audioipc2/src/sys/unix/mod.rs b/third_party/rust/audioipc2/src/sys/unix/mod.rs
new file mode 100644
index 0000000000..84f3f1edf2
--- /dev/null
+++ b/third_party/rust/audioipc2/src/sys/unix/mod.rs
@@ -0,0 +1,126 @@
+// Copyright © 2021 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details
+
+use std::io::Result;
+use std::os::unix::prelude::{AsRawFd, FromRawFd};
+
+use bytes::{Buf, BufMut, BytesMut};
+use iovec::IoVec;
+use mio::net::UnixStream;
+
+use crate::PlatformHandle;
+
+use super::{ConnectionBuffer, RecvMsg, SendMsg, HANDLE_QUEUE_LIMIT};
+
+pub mod cmsg;
+mod msg;
+
+pub struct Pipe {
+ pub(crate) io: UnixStream,
+ cmsg: BytesMut,
+}
+
+impl Pipe {
+ fn new(io: UnixStream) -> Self {
+ Pipe {
+ io,
+ cmsg: BytesMut::with_capacity(cmsg::space(
+ std::mem::size_of::<i32>() * HANDLE_QUEUE_LIMIT,
+ )),
+ }
+ }
+}
+
+// Create a connected "pipe" pair. The `Pipe` is the server end,
+// the `PlatformHandle` is the client end to be remoted.
+pub fn make_pipe_pair() -> Result<(Pipe, PlatformHandle)> {
+ let (server, client) = UnixStream::pair()?;
+ Ok((Pipe::new(server), PlatformHandle::from(client)))
+}
+
+impl Pipe {
+ #[allow(clippy::missing_safety_doc)]
+ pub unsafe fn from_raw_handle(handle: crate::PlatformHandle) -> Pipe {
+ Pipe::new(UnixStream::from_raw_fd(handle.into_raw()))
+ }
+
+ pub fn shutdown(&mut self) -> Result<()> {
+ self.io.shutdown(std::net::Shutdown::Both)
+ }
+}
+
+impl RecvMsg for Pipe {
+ // Receive data (and fds) from the associated connection. `recv_msg` expects the capacity of
+ // the `ConnectionBuffer` members has been adjusted appropriate by the caller.
+ fn recv_msg(&mut self, buf: &mut ConnectionBuffer) -> Result<usize> {
+ assert!(buf.buf.remaining_mut() > 0);
+ // TODO: MSG_CMSG_CLOEXEC not portable.
+ // TODO: MSG_NOSIGNAL not portable; macOS can set socket option SO_NOSIGPIPE instead.
+ #[cfg(target_os = "linux")]
+ let flags = libc::MSG_CMSG_CLOEXEC | libc::MSG_NOSIGNAL;
+ #[cfg(not(target_os = "linux"))]
+ let flags = 0;
+ let r = unsafe {
+ let chunk = buf.buf.chunk_mut();
+ let slice = std::slice::from_raw_parts_mut(chunk.as_mut_ptr(), chunk.len());
+ let mut iovec = [<&mut IoVec>::from(slice)];
+ msg::recv_msg_with_flags(
+ self.io.as_raw_fd(),
+ &mut iovec,
+ self.cmsg.chunk_mut(),
+ flags,
+ )
+ };
+ match r {
+ Ok((n, cmsg_n, msg_flags)) => unsafe {
+ trace!("recv_msg_with_flags flags={}", msg_flags);
+ buf.buf.advance_mut(n);
+ self.cmsg.advance_mut(cmsg_n);
+ let handles = cmsg::decode_handles(&mut self.cmsg);
+ self.cmsg.clear();
+ let unused = 0;
+ for h in handles {
+ buf.push_handle(super::RemoteHandle::new(h, unused));
+ }
+ Ok(n)
+ },
+ Err(e) => Err(e),
+ }
+ }
+}
+
+impl SendMsg for Pipe {
+ // Send data (and fds) on the associated connection. `send_msg` adjusts the length of the
+ // `ConnectionBuffer` members based on the size of the successful send operation.
+ fn send_msg(&mut self, buf: &mut ConnectionBuffer) -> Result<usize> {
+ assert!(!buf.buf.is_empty());
+ if !buf.handles.is_empty() {
+ let mut handles = [-1i32; HANDLE_QUEUE_LIMIT];
+ for (i, h) in buf.handles.iter().enumerate() {
+ handles[i] = h.handle;
+ }
+ cmsg::encode_handles(&mut self.cmsg, &handles[..buf.handles.len()]);
+ }
+ let r = {
+ // TODO: MSG_NOSIGNAL not portable; macOS can set socket option SO_NOSIGPIPE instead.
+ #[cfg(target_os = "linux")]
+ let flags = libc::MSG_NOSIGNAL;
+ #[cfg(not(target_os = "linux"))]
+ let flags = 0;
+ let iovec = [<&IoVec>::from(&buf.buf[..buf.buf.len()])];
+ msg::send_msg_with_flags(self.io.as_raw_fd(), &iovec, &self.cmsg, flags)
+ };
+ match r {
+ Ok(n) => {
+ buf.buf.advance(n);
+ // Discard sent handles.
+ while buf.handles.pop_front().is_some() {}
+ self.cmsg.clear();
+ Ok(n)
+ }
+ Err(e) => Err(e),
+ }
+ }
+}
diff --git a/third_party/rust/audioipc2/src/sys/unix/msg.rs b/third_party/rust/audioipc2/src/sys/unix/msg.rs
new file mode 100644
index 0000000000..a21a893b93
--- /dev/null
+++ b/third_party/rust/audioipc2/src/sys/unix/msg.rs
@@ -0,0 +1,81 @@
+// Copyright © 2017 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details.
+
+use bytes::buf::UninitSlice;
+use iovec::unix;
+use iovec::IoVec;
+use std::os::unix::io::RawFd;
+use std::{cmp, io, mem, ptr};
+
+fn cvt(r: libc::ssize_t) -> io::Result<usize> {
+ if r == -1 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(r as usize)
+ }
+}
+
+// Convert return of -1 into error message, handling retry on EINTR
+fn cvt_r<F: FnMut() -> libc::ssize_t>(mut f: F) -> io::Result<usize> {
+ loop {
+ match cvt(f()) {
+ Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
+ other => return other,
+ }
+ }
+}
+
+pub(crate) fn recv_msg_with_flags(
+ socket: RawFd,
+ bufs: &mut [&mut IoVec],
+ cmsg: &mut UninitSlice,
+ flags: libc::c_int,
+) -> io::Result<(usize, usize, libc::c_int)> {
+ let slice = unix::as_os_slice_mut(bufs);
+ let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len());
+ let (control, controllen) = if cmsg.len() == 0 {
+ (ptr::null_mut(), 0)
+ } else {
+ (cmsg.as_mut_ptr() as _, cmsg.len())
+ };
+
+ let mut msghdr: libc::msghdr = unsafe { mem::zeroed() };
+ msghdr.msg_name = ptr::null_mut();
+ msghdr.msg_namelen = 0;
+ msghdr.msg_iov = slice.as_mut_ptr();
+ msghdr.msg_iovlen = len as _;
+ msghdr.msg_control = control;
+ msghdr.msg_controllen = controllen as _;
+
+ let n = cvt_r(|| unsafe { libc::recvmsg(socket, &mut msghdr as *mut _, flags) })?;
+
+ let controllen = msghdr.msg_controllen as usize;
+ Ok((n, controllen, msghdr.msg_flags))
+}
+
+pub(crate) fn send_msg_with_flags(
+ socket: RawFd,
+ bufs: &[&IoVec],
+ cmsg: &[u8],
+ flags: libc::c_int,
+) -> io::Result<usize> {
+ let slice = unix::as_os_slice(bufs);
+ let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len());
+ let (control, controllen) = if cmsg.is_empty() {
+ (ptr::null_mut(), 0)
+ } else {
+ (cmsg.as_ptr() as *mut _, cmsg.len())
+ };
+
+ let mut msghdr: libc::msghdr = unsafe { mem::zeroed() };
+ msghdr.msg_name = ptr::null_mut();
+ msghdr.msg_namelen = 0;
+ msghdr.msg_iov = slice.as_ptr() as *mut _;
+ msghdr.msg_iovlen = len as _;
+ msghdr.msg_control = control;
+ msghdr.msg_controllen = controllen as _;
+
+ cvt_r(|| unsafe { libc::sendmsg(socket, &msghdr as *const _, flags) })
+}
diff --git a/third_party/rust/audioipc2/src/sys/windows/mod.rs b/third_party/rust/audioipc2/src/sys/windows/mod.rs
new file mode 100644
index 0000000000..6352bbe5a7
--- /dev/null
+++ b/third_party/rust/audioipc2/src/sys/windows/mod.rs
@@ -0,0 +1,102 @@
+// Copyright © 2021 Mozilla Foundation
+//
+// This program is made available under an ISC-style license. See the
+// accompanying file LICENSE for details
+
+use std::{
+ fs::OpenOptions,
+ io::{Read, Write},
+ os::windows::prelude::{FromRawHandle, OpenOptionsExt},
+ sync::atomic::{AtomicUsize, Ordering},
+};
+
+use std::io::Result;
+
+use bytes::{Buf, BufMut};
+use mio::windows::NamedPipe;
+use winapi::um::winbase::FILE_FLAG_OVERLAPPED;
+
+use crate::PlatformHandle;
+
+use super::{ConnectionBuffer, RecvMsg, SendMsg};
+
+pub struct Pipe {
+ pub(crate) io: NamedPipe,
+}
+
+// Create a connected "pipe" pair. The `Pipe` is the server end,
+// the `PlatformHandle` is the client end to be remoted.
+pub fn make_pipe_pair() -> Result<(Pipe, PlatformHandle)> {
+ let pipe_name = get_pipe_name();
+ let server = NamedPipe::new(&pipe_name)?;
+
+ let client = {
+ let mut opts = OpenOptions::new();
+ opts.read(true)
+ .write(true)
+ .custom_flags(FILE_FLAG_OVERLAPPED);
+ let file = opts.open(&pipe_name)?;
+ PlatformHandle::from(file)
+ };
+
+ Ok((Pipe::new(server), client))
+}
+
+static PIPE_ID: AtomicUsize = AtomicUsize::new(0);
+
+fn get_pipe_name() -> String {
+ let pid = std::process::id();
+ let pipe_id = PIPE_ID.fetch_add(1, Ordering::Relaxed);
+ format!("\\\\.\\pipe\\LOCAL\\cubeb-pipe-{}-{}", pid, pipe_id)
+}
+
+impl Pipe {
+ pub fn new(io: NamedPipe) -> Self {
+ Self { io }
+ }
+
+ #[allow(clippy::missing_safety_doc)]
+ pub unsafe fn from_raw_handle(handle: crate::PlatformHandle) -> Pipe {
+ Pipe::new(NamedPipe::from_raw_handle(handle.into_raw()))
+ }
+
+ pub fn shutdown(&mut self) -> Result<()> {
+ self.io.disconnect()
+ }
+}
+
+impl RecvMsg for Pipe {
+ // Receive data from the associated connection. `recv_msg` expects the capacity of
+ // the `ConnectionBuffer` members has been adjusted appropriate by the caller.
+ fn recv_msg(&mut self, buf: &mut ConnectionBuffer) -> Result<usize> {
+ assert!(buf.buf.remaining_mut() > 0);
+ let r = unsafe {
+ let chunk = buf.buf.chunk_mut();
+ let slice = std::slice::from_raw_parts_mut(chunk.as_mut_ptr(), chunk.len());
+ self.io.read(slice)
+ };
+ match r {
+ Ok(n) => unsafe {
+ buf.buf.advance_mut(n);
+ Ok(n)
+ },
+ e => e,
+ }
+ }
+}
+
+impl SendMsg for Pipe {
+ // Send data on the associated connection. `send_msg` adjusts the length of the
+ // `ConnectionBuffer` members based on the size of the successful send operation.
+ fn send_msg(&mut self, buf: &mut ConnectionBuffer) -> Result<usize> {
+ assert!(!buf.buf.is_empty());
+ let r = self.io.write(&buf.buf[..buf.buf.len()]);
+ if let Ok(n) = r {
+ buf.buf.advance(n);
+ while let Some(mut handle) = buf.pop_handle() {
+ handle.mark_sent()
+ }
+ }
+ r
+ }
+}