diff options
Diffstat (limited to 'third_party/rust/audioipc2/src/sys')
-rw-r--r-- | third_party/rust/audioipc2/src/sys/mod.rs | 77 | ||||
-rw-r--r-- | third_party/rust/audioipc2/src/sys/unix/cmsg.rs | 104 | ||||
-rw-r--r-- | third_party/rust/audioipc2/src/sys/unix/cmsghdr.c | 23 | ||||
-rw-r--r-- | third_party/rust/audioipc2/src/sys/unix/mod.rs | 126 | ||||
-rw-r--r-- | third_party/rust/audioipc2/src/sys/unix/msg.rs | 82 | ||||
-rw-r--r-- | third_party/rust/audioipc2/src/sys/windows/mod.rs | 102 |
6 files changed, 514 insertions, 0 deletions
diff --git a/third_party/rust/audioipc2/src/sys/mod.rs b/third_party/rust/audioipc2/src/sys/mod.rs new file mode 100644 index 0000000000..0bcfdaa15e --- /dev/null +++ b/third_party/rust/audioipc2/src/sys/mod.rs @@ -0,0 +1,77 @@ +// Copyright © 2021 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details + +use std::{collections::VecDeque, io::Result}; + +use bytes::BytesMut; +use mio::{event::Source, Interest, Registry, Token}; + +#[cfg(unix)] +mod unix; +use crate::messages::RemoteHandle; + +#[cfg(unix)] +pub use self::unix::*; + +#[cfg(windows)] +mod windows; +#[cfg(windows)] +pub use self::windows::*; + +impl Source for Pipe { + fn register(&mut self, registry: &Registry, token: Token, interests: Interest) -> Result<()> { + self.io.register(registry, token, interests) + } + + fn reregister(&mut self, registry: &Registry, token: Token, interests: Interest) -> Result<()> { + self.io.reregister(registry, token, interests) + } + + fn deregister(&mut self, registry: &Registry) -> Result<()> { + self.io.deregister(registry) + } +} + +const HANDLE_QUEUE_LIMIT: usize = 16; + +#[derive(Debug)] +pub struct ConnectionBuffer { + pub buf: BytesMut, + handles: VecDeque<RemoteHandle>, +} + +impl ConnectionBuffer { + pub fn with_capacity(cap: usize) -> Self { + ConnectionBuffer { + buf: BytesMut::with_capacity(cap), + handles: VecDeque::with_capacity(HANDLE_QUEUE_LIMIT), + } + } + + pub fn is_empty(&self) -> bool { + self.buf.is_empty() + } + + pub fn push_handle(&mut self, handle: RemoteHandle) { + assert!(self.handles.len() < self.handles.capacity()); + self.handles.push_back(handle) + } + + pub fn pop_handle(&mut self) -> Option<RemoteHandle> { + self.handles.pop_front() + } +} + +pub trait RecvMsg { + // Receive data from the associated connection. `recv_msg` expects the capacity of + // the `ConnectionBuffer` members have been adjusted appropriately by the caller. + fn recv_msg(&mut self, buf: &mut ConnectionBuffer) -> Result<usize>; +} + +pub trait SendMsg { + // Send data on the associated connection. `send_msg` consumes and adjusts the length of the + // `ConnectionBuffer` members based on the size of the successful send operation. + fn send_msg(&mut self, buf: &mut ConnectionBuffer) -> Result<usize>; +} diff --git a/third_party/rust/audioipc2/src/sys/unix/cmsg.rs b/third_party/rust/audioipc2/src/sys/unix/cmsg.rs new file mode 100644 index 0000000000..581df9a847 --- /dev/null +++ b/third_party/rust/audioipc2/src/sys/unix/cmsg.rs @@ -0,0 +1,104 @@ +// Copyright © 2017 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details + +use crate::sys::HANDLE_QUEUE_LIMIT; +use bytes::{BufMut, BytesMut}; +use libc::{self, cmsghdr}; +use std::convert::TryInto; +use std::os::unix::io::RawFd; +use std::{mem, slice}; + +trait AsBytes { + fn as_bytes(&self) -> &[u8]; +} + +impl<'a, T: Sized> AsBytes for &'a [T] { + fn as_bytes(&self) -> &[u8] { + // TODO: This should account for the alignment of T + let byte_count = self.len() * mem::size_of::<T>(); + unsafe { slice::from_raw_parts(self.as_ptr() as *const _, byte_count) } + } +} + +// Encode `handles` into a cmsghdr in `buf`. +pub fn encode_handles(cmsg: &mut BytesMut, handles: &[RawFd]) { + assert!(handles.len() <= HANDLE_QUEUE_LIMIT); + let msg = handles.as_bytes(); + + let cmsg_space = space(msg.len()); + assert!(cmsg.remaining_mut() >= cmsg_space); + + // Some definitions of cmsghdr contain padding. Rather + // than try to keep an up-to-date #cfg list to handle + // that, just use a pre-zeroed struct to fill out any + // fields we don't care about. + let zeroed = unsafe { mem::zeroed() }; + #[allow(clippy::needless_update)] + // `cmsg_len` is `usize` on some platforms, `u32` on others. + #[allow(clippy::useless_conversion)] + let cmsghdr = cmsghdr { + cmsg_len: len(msg.len()).try_into().unwrap(), + cmsg_level: libc::SOL_SOCKET, + cmsg_type: libc::SCM_RIGHTS, + ..zeroed + }; + + unsafe { + let cmsghdr_ptr = cmsg.chunk_mut().as_mut_ptr(); + std::ptr::copy_nonoverlapping( + &cmsghdr as *const _ as *const _, + cmsghdr_ptr, + mem::size_of::<cmsghdr>(), + ); + let cmsg_data_ptr = libc::CMSG_DATA(cmsghdr_ptr as _); + std::ptr::copy_nonoverlapping(msg.as_ptr(), cmsg_data_ptr, msg.len()); + cmsg.advance_mut(cmsg_space); + } +} + +// Decode `buf` containing a cmsghdr with one or more handle(s). +pub fn decode_handles(buf: &mut BytesMut) -> arrayvec::ArrayVec<RawFd, HANDLE_QUEUE_LIMIT> { + let mut fds = arrayvec::ArrayVec::<RawFd, HANDLE_QUEUE_LIMIT>::new(); + + let cmsghdr_len = len(0); + + if buf.len() < cmsghdr_len { + // No more entries---not enough data in `buf` for a + // complete message. + return fds; + } + + let cmsg: &cmsghdr = unsafe { &*(buf.as_ptr() as *const _) }; + #[allow(clippy::unnecessary_cast)] // `cmsg_len` type is platform-dependent. + let cmsg_len = cmsg.cmsg_len as usize; + + match (cmsg.cmsg_level, cmsg.cmsg_type) { + (libc::SOL_SOCKET, libc::SCM_RIGHTS) => { + trace!("Found SCM_RIGHTS..."); + let slice = &buf[cmsghdr_len..cmsg_len]; + let slice = unsafe { + slice::from_raw_parts( + slice.as_ptr() as *const _, + slice.len() / mem::size_of::<i32>(), + ) + }; + fds.try_extend_from_slice(slice).unwrap(); + } + (level, kind) => { + trace!("Skipping cmsg level, {}, type={}...", level, kind); + } + } + + assert!(fds.len() <= HANDLE_QUEUE_LIMIT); + fds +} + +fn len(len: usize) -> usize { + unsafe { libc::CMSG_LEN(len.try_into().unwrap()) as usize } +} + +pub fn space(len: usize) -> usize { + unsafe { libc::CMSG_SPACE(len.try_into().unwrap()) as usize } +} diff --git a/third_party/rust/audioipc2/src/sys/unix/cmsghdr.c b/third_party/rust/audioipc2/src/sys/unix/cmsghdr.c new file mode 100644 index 0000000000..82d7852867 --- /dev/null +++ b/third_party/rust/audioipc2/src/sys/unix/cmsghdr.c @@ -0,0 +1,23 @@ +#include <sys/socket.h> +#include <inttypes.h> +#include <string.h> + +const uint8_t* +cmsghdr_bytes(size_t* size) +{ + int myfd = 0; + + static union { + uint8_t buf[CMSG_SPACE(sizeof(myfd))]; + struct cmsghdr align; + } u; + + u.align.cmsg_len = CMSG_LEN(sizeof(myfd)); + u.align.cmsg_level = SOL_SOCKET; + u.align.cmsg_type = SCM_RIGHTS; + + memcpy(CMSG_DATA(&u.align), &myfd, sizeof(myfd)); + + *size = sizeof(u); + return (const uint8_t*)&u.buf; +} diff --git a/third_party/rust/audioipc2/src/sys/unix/mod.rs b/third_party/rust/audioipc2/src/sys/unix/mod.rs new file mode 100644 index 0000000000..84f3f1edf2 --- /dev/null +++ b/third_party/rust/audioipc2/src/sys/unix/mod.rs @@ -0,0 +1,126 @@ +// Copyright © 2021 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details + +use std::io::Result; +use std::os::unix::prelude::{AsRawFd, FromRawFd}; + +use bytes::{Buf, BufMut, BytesMut}; +use iovec::IoVec; +use mio::net::UnixStream; + +use crate::PlatformHandle; + +use super::{ConnectionBuffer, RecvMsg, SendMsg, HANDLE_QUEUE_LIMIT}; + +pub mod cmsg; +mod msg; + +pub struct Pipe { + pub(crate) io: UnixStream, + cmsg: BytesMut, +} + +impl Pipe { + fn new(io: UnixStream) -> Self { + Pipe { + io, + cmsg: BytesMut::with_capacity(cmsg::space( + std::mem::size_of::<i32>() * HANDLE_QUEUE_LIMIT, + )), + } + } +} + +// Create a connected "pipe" pair. The `Pipe` is the server end, +// the `PlatformHandle` is the client end to be remoted. +pub fn make_pipe_pair() -> Result<(Pipe, PlatformHandle)> { + let (server, client) = UnixStream::pair()?; + Ok((Pipe::new(server), PlatformHandle::from(client))) +} + +impl Pipe { + #[allow(clippy::missing_safety_doc)] + pub unsafe fn from_raw_handle(handle: crate::PlatformHandle) -> Pipe { + Pipe::new(UnixStream::from_raw_fd(handle.into_raw())) + } + + pub fn shutdown(&mut self) -> Result<()> { + self.io.shutdown(std::net::Shutdown::Both) + } +} + +impl RecvMsg for Pipe { + // Receive data (and fds) from the associated connection. `recv_msg` expects the capacity of + // the `ConnectionBuffer` members has been adjusted appropriate by the caller. + fn recv_msg(&mut self, buf: &mut ConnectionBuffer) -> Result<usize> { + assert!(buf.buf.remaining_mut() > 0); + // TODO: MSG_CMSG_CLOEXEC not portable. + // TODO: MSG_NOSIGNAL not portable; macOS can set socket option SO_NOSIGPIPE instead. + #[cfg(target_os = "linux")] + let flags = libc::MSG_CMSG_CLOEXEC | libc::MSG_NOSIGNAL; + #[cfg(not(target_os = "linux"))] + let flags = 0; + let r = unsafe { + let chunk = buf.buf.chunk_mut(); + let slice = std::slice::from_raw_parts_mut(chunk.as_mut_ptr(), chunk.len()); + let mut iovec = [<&mut IoVec>::from(slice)]; + msg::recv_msg_with_flags( + self.io.as_raw_fd(), + &mut iovec, + self.cmsg.chunk_mut(), + flags, + ) + }; + match r { + Ok((n, cmsg_n, msg_flags)) => unsafe { + trace!("recv_msg_with_flags flags={}", msg_flags); + buf.buf.advance_mut(n); + self.cmsg.advance_mut(cmsg_n); + let handles = cmsg::decode_handles(&mut self.cmsg); + self.cmsg.clear(); + let unused = 0; + for h in handles { + buf.push_handle(super::RemoteHandle::new(h, unused)); + } + Ok(n) + }, + Err(e) => Err(e), + } + } +} + +impl SendMsg for Pipe { + // Send data (and fds) on the associated connection. `send_msg` adjusts the length of the + // `ConnectionBuffer` members based on the size of the successful send operation. + fn send_msg(&mut self, buf: &mut ConnectionBuffer) -> Result<usize> { + assert!(!buf.buf.is_empty()); + if !buf.handles.is_empty() { + let mut handles = [-1i32; HANDLE_QUEUE_LIMIT]; + for (i, h) in buf.handles.iter().enumerate() { + handles[i] = h.handle; + } + cmsg::encode_handles(&mut self.cmsg, &handles[..buf.handles.len()]); + } + let r = { + // TODO: MSG_NOSIGNAL not portable; macOS can set socket option SO_NOSIGPIPE instead. + #[cfg(target_os = "linux")] + let flags = libc::MSG_NOSIGNAL; + #[cfg(not(target_os = "linux"))] + let flags = 0; + let iovec = [<&IoVec>::from(&buf.buf[..buf.buf.len()])]; + msg::send_msg_with_flags(self.io.as_raw_fd(), &iovec, &self.cmsg, flags) + }; + match r { + Ok(n) => { + buf.buf.advance(n); + // Discard sent handles. + while buf.handles.pop_front().is_some() {} + self.cmsg.clear(); + Ok(n) + } + Err(e) => Err(e), + } + } +} diff --git a/third_party/rust/audioipc2/src/sys/unix/msg.rs b/third_party/rust/audioipc2/src/sys/unix/msg.rs new file mode 100644 index 0000000000..c2cd353289 --- /dev/null +++ b/third_party/rust/audioipc2/src/sys/unix/msg.rs @@ -0,0 +1,82 @@ +// Copyright © 2017 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details. + +use bytes::buf::UninitSlice; +use iovec::unix; +use iovec::IoVec; +use std::os::unix::io::RawFd; +use std::{cmp, io, mem, ptr}; + +fn cvt(r: libc::ssize_t) -> io::Result<usize> { + if r == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(r as usize) + } +} + +// Convert return of -1 into error message, handling retry on EINTR +fn cvt_r<F: FnMut() -> libc::ssize_t>(mut f: F) -> io::Result<usize> { + loop { + match cvt(f()) { + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + other => return other, + } + } +} + +pub(crate) fn recv_msg_with_flags( + socket: RawFd, + bufs: &mut [&mut IoVec], + cmsg: &mut UninitSlice, + flags: libc::c_int, +) -> io::Result<(usize, usize, libc::c_int)> { + let slice = unix::as_os_slice_mut(bufs); + let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len()); + let (control, controllen) = if cmsg.len() == 0 { + (ptr::null_mut(), 0) + } else { + (cmsg.as_mut_ptr() as _, cmsg.len()) + }; + + let mut msghdr: libc::msghdr = unsafe { mem::zeroed() }; + msghdr.msg_name = ptr::null_mut(); + msghdr.msg_namelen = 0; + msghdr.msg_iov = slice.as_mut_ptr(); + msghdr.msg_iovlen = len as _; + msghdr.msg_control = control; + msghdr.msg_controllen = controllen as _; + + let n = cvt_r(|| unsafe { libc::recvmsg(socket, &mut msghdr as *mut _, flags) })?; + + #[allow(clippy::unnecessary_cast)] // `msg_controllen` type is platform-dependent. + let controllen = msghdr.msg_controllen as usize; + Ok((n, controllen, msghdr.msg_flags)) +} + +pub(crate) fn send_msg_with_flags( + socket: RawFd, + bufs: &[&IoVec], + cmsg: &[u8], + flags: libc::c_int, +) -> io::Result<usize> { + let slice = unix::as_os_slice(bufs); + let len = cmp::min(<libc::c_int>::max_value() as usize, slice.len()); + let (control, controllen) = if cmsg.is_empty() { + (ptr::null_mut(), 0) + } else { + (cmsg.as_ptr() as *mut _, cmsg.len()) + }; + + let mut msghdr: libc::msghdr = unsafe { mem::zeroed() }; + msghdr.msg_name = ptr::null_mut(); + msghdr.msg_namelen = 0; + msghdr.msg_iov = slice.as_ptr() as *mut _; + msghdr.msg_iovlen = len as _; + msghdr.msg_control = control; + msghdr.msg_controllen = controllen as _; + + cvt_r(|| unsafe { libc::sendmsg(socket, &msghdr as *const _, flags) }) +} diff --git a/third_party/rust/audioipc2/src/sys/windows/mod.rs b/third_party/rust/audioipc2/src/sys/windows/mod.rs new file mode 100644 index 0000000000..973e9608d1 --- /dev/null +++ b/third_party/rust/audioipc2/src/sys/windows/mod.rs @@ -0,0 +1,102 @@ +// Copyright © 2021 Mozilla Foundation +// +// This program is made available under an ISC-style license. See the +// accompanying file LICENSE for details + +use std::{ + fs::OpenOptions, + io::{Read, Write}, + os::windows::prelude::{FromRawHandle, OpenOptionsExt}, + sync::atomic::{AtomicUsize, Ordering}, +}; + +use std::io::Result; + +use bytes::{Buf, BufMut}; +use mio::windows::NamedPipe; +use winapi::um::winbase::FILE_FLAG_OVERLAPPED; + +use crate::PlatformHandle; + +use super::{ConnectionBuffer, RecvMsg, SendMsg}; + +pub struct Pipe { + pub(crate) io: NamedPipe, +} + +// Create a connected "pipe" pair. The `Pipe` is the server end, +// the `PlatformHandle` is the client end to be remoted. +pub fn make_pipe_pair() -> Result<(Pipe, PlatformHandle)> { + let pipe_name = get_pipe_name(); + let server = NamedPipe::new(&pipe_name)?; + + let client = { + let mut opts = OpenOptions::new(); + opts.read(true) + .write(true) + .custom_flags(FILE_FLAG_OVERLAPPED); + let file = opts.open(&pipe_name)?; + PlatformHandle::from(file) + }; + + Ok((Pipe::new(server), client)) +} + +static PIPE_ID: AtomicUsize = AtomicUsize::new(0); + +fn get_pipe_name() -> String { + let pid = std::process::id(); + let pipe_id = PIPE_ID.fetch_add(1, Ordering::Relaxed); + format!("\\\\.\\pipe\\LOCAL\\cubeb-pipe-{pid}-{pipe_id}") +} + +impl Pipe { + pub fn new(io: NamedPipe) -> Self { + Self { io } + } + + #[allow(clippy::missing_safety_doc)] + pub unsafe fn from_raw_handle(handle: crate::PlatformHandle) -> Pipe { + Pipe::new(NamedPipe::from_raw_handle(handle.into_raw())) + } + + pub fn shutdown(&mut self) -> Result<()> { + self.io.disconnect() + } +} + +impl RecvMsg for Pipe { + // Receive data from the associated connection. `recv_msg` expects the capacity of + // the `ConnectionBuffer` members has been adjusted appropriate by the caller. + fn recv_msg(&mut self, buf: &mut ConnectionBuffer) -> Result<usize> { + assert!(buf.buf.remaining_mut() > 0); + let r = unsafe { + let chunk = buf.buf.chunk_mut(); + let slice = std::slice::from_raw_parts_mut(chunk.as_mut_ptr(), chunk.len()); + self.io.read(slice) + }; + match r { + Ok(n) => unsafe { + buf.buf.advance_mut(n); + Ok(n) + }, + e => e, + } + } +} + +impl SendMsg for Pipe { + // Send data on the associated connection. `send_msg` adjusts the length of the + // `ConnectionBuffer` members based on the size of the successful send operation. + fn send_msg(&mut self, buf: &mut ConnectionBuffer) -> Result<usize> { + assert!(!buf.buf.is_empty()); + let r = self.io.write(&buf.buf[..buf.buf.len()]); + if let Ok(n) = r { + buf.buf.advance(n); + while let Some(mut handle) = buf.pop_handle() { + handle.mark_sent() + } + } + r + } +} |