diff options
Diffstat (limited to 'third_party/rust/mio/src/sys/unix')
-rw-r--r-- | third_party/rust/mio/src/sys/unix/mod.rs | 72 | ||||
-rw-r--r-- | third_party/rust/mio/src/sys/unix/net.rs | 168 | ||||
-rw-r--r-- | third_party/rust/mio/src/sys/unix/pipe.rs | 431 | ||||
-rw-r--r-- | third_party/rust/mio/src/sys/unix/selector/epoll.rs | 246 | ||||
-rw-r--r-- | third_party/rust/mio/src/sys/unix/selector/kqueue.rs | 688 | ||||
-rw-r--r-- | third_party/rust/mio/src/sys/unix/selector/mod.rs | 35 | ||||
-rw-r--r-- | third_party/rust/mio/src/sys/unix/sourcefd.rs | 116 | ||||
-rw-r--r-- | third_party/rust/mio/src/sys/unix/tcp.rs | 113 | ||||
-rw-r--r-- | third_party/rust/mio/src/sys/unix/udp.rs | 39 | ||||
-rw-r--r-- | third_party/rust/mio/src/sys/unix/uds/datagram.rs | 56 | ||||
-rw-r--r-- | third_party/rust/mio/src/sys/unix/uds/listener.rs | 94 | ||||
-rw-r--r-- | third_party/rust/mio/src/sys/unix/uds/mod.rs | 149 | ||||
-rw-r--r-- | third_party/rust/mio/src/sys/unix/uds/socketaddr.rs | 130 | ||||
-rw-r--r-- | third_party/rust/mio/src/sys/unix/uds/stream.rs | 39 | ||||
-rw-r--r-- | third_party/rust/mio/src/sys/unix/waker.rs | 178 |
15 files changed, 2554 insertions, 0 deletions
diff --git a/third_party/rust/mio/src/sys/unix/mod.rs b/third_party/rust/mio/src/sys/unix/mod.rs new file mode 100644 index 0000000000..231480a5da --- /dev/null +++ b/third_party/rust/mio/src/sys/unix/mod.rs @@ -0,0 +1,72 @@ +/// Helper macro to execute a system call that returns an `io::Result`. +// +// Macro must be defined before any modules that uses them. +#[allow(unused_macros)] +macro_rules! syscall { + ($fn: ident ( $($arg: expr),* $(,)* ) ) => {{ + let res = unsafe { libc::$fn($($arg, )*) }; + if res == -1 { + Err(std::io::Error::last_os_error()) + } else { + Ok(res) + } + }}; +} + +cfg_os_poll! { + mod selector; + pub(crate) use self::selector::{event, Event, Events, Selector}; + + mod sourcefd; + pub use self::sourcefd::SourceFd; + + mod waker; + pub(crate) use self::waker::Waker; + + cfg_net! { + mod net; + + pub(crate) mod tcp; + pub(crate) mod udp; + pub(crate) mod uds; + pub use self::uds::SocketAddr; + } + + cfg_io_source! { + use std::io; + + // Both `kqueue` and `epoll` don't need to hold any user space state. + pub(crate) struct IoSourceState; + + impl IoSourceState { + pub fn new() -> IoSourceState { + IoSourceState + } + + pub fn do_io<T, F, R>(&self, f: F, io: &T) -> io::Result<R> + where + F: FnOnce(&T) -> io::Result<R>, + { + // We don't hold state, so we can just call the function and + // return. + f(io) + } + } + } + + cfg_os_ext! { + pub(crate) mod pipe; + } +} + +cfg_not_os_poll! { + cfg_net! { + mod uds; + pub use self::uds::SocketAddr; + } + + cfg_any_os_ext! { + mod sourcefd; + pub use self::sourcefd::SourceFd; + } +} diff --git a/third_party/rust/mio/src/sys/unix/net.rs b/third_party/rust/mio/src/sys/unix/net.rs new file mode 100644 index 0000000000..78f1387b1f --- /dev/null +++ b/third_party/rust/mio/src/sys/unix/net.rs @@ -0,0 +1,168 @@ +use std::io; +use std::mem::size_of; +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; + +pub(crate) fn new_ip_socket(addr: SocketAddr, socket_type: libc::c_int) -> io::Result<libc::c_int> { + let domain = match addr { + SocketAddr::V4(..) => libc::AF_INET, + SocketAddr::V6(..) => libc::AF_INET6, + }; + + new_socket(domain, socket_type) +} + +/// Create a new non-blocking socket. +pub(crate) fn new_socket(domain: libc::c_int, socket_type: libc::c_int) -> io::Result<libc::c_int> { + #[cfg(any( + target_os = "android", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "illumos", + target_os = "linux", + target_os = "netbsd", + target_os = "openbsd" + ))] + let socket_type = socket_type | libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC; + + // Gives a warning for platforms without SOCK_NONBLOCK. + #[allow(clippy::let_and_return)] + let socket = syscall!(socket(domain, socket_type, 0)); + + // Mimick `libstd` and set `SO_NOSIGPIPE` on apple systems. + #[cfg(target_vendor = "apple")] + let socket = socket.and_then(|socket| { + syscall!(setsockopt( + socket, + libc::SOL_SOCKET, + libc::SO_NOSIGPIPE, + &1 as *const libc::c_int as *const libc::c_void, + size_of::<libc::c_int>() as libc::socklen_t + )) + .map(|_| socket) + }); + + // Darwin doesn't have SOCK_NONBLOCK or SOCK_CLOEXEC. + #[cfg(any(target_os = "ios", target_os = "macos"))] + let socket = socket.and_then(|socket| { + // For platforms that don't support flags in socket, we need to + // set the flags ourselves. + syscall!(fcntl(socket, libc::F_SETFL, libc::O_NONBLOCK)) + .and_then(|_| syscall!(fcntl(socket, libc::F_SETFD, libc::FD_CLOEXEC)).map(|_| socket)) + .map_err(|e| { + // If either of the `fcntl` calls failed, ensure the socket is + // closed and return the error. + let _ = syscall!(close(socket)); + e + }) + }); + + socket +} + +/// A type with the same memory layout as `libc::sockaddr`. Used in converting Rust level +/// SocketAddr* types into their system representation. The benefit of this specific +/// type over using `libc::sockaddr_storage` is that this type is exactly as large as it +/// needs to be and not a lot larger. And it can be initialized cleaner from Rust. +#[repr(C)] +pub(crate) union SocketAddrCRepr { + v4: libc::sockaddr_in, + v6: libc::sockaddr_in6, +} + +impl SocketAddrCRepr { + pub(crate) fn as_ptr(&self) -> *const libc::sockaddr { + self as *const _ as *const libc::sockaddr + } +} + +/// Converts a Rust `SocketAddr` into the system representation. +pub(crate) fn socket_addr(addr: &SocketAddr) -> (SocketAddrCRepr, libc::socklen_t) { + match addr { + SocketAddr::V4(ref addr) => { + // `s_addr` is stored as BE on all machine and the array is in BE order. + // So the native endian conversion method is used so that it's never swapped. + let sin_addr = libc::in_addr { + s_addr: u32::from_ne_bytes(addr.ip().octets()), + }; + + let sockaddr_in = libc::sockaddr_in { + sin_family: libc::AF_INET as libc::sa_family_t, + sin_port: addr.port().to_be(), + sin_addr, + sin_zero: [0; 8], + #[cfg(any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd" + ))] + sin_len: 0, + }; + + let sockaddr = SocketAddrCRepr { v4: sockaddr_in }; + let socklen = size_of::<libc::sockaddr_in>() as libc::socklen_t; + (sockaddr, socklen) + } + SocketAddr::V6(ref addr) => { + let sockaddr_in6 = libc::sockaddr_in6 { + sin6_family: libc::AF_INET6 as libc::sa_family_t, + sin6_port: addr.port().to_be(), + sin6_addr: libc::in6_addr { + s6_addr: addr.ip().octets(), + }, + sin6_flowinfo: addr.flowinfo(), + sin6_scope_id: addr.scope_id(), + #[cfg(any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd" + ))] + sin6_len: 0, + #[cfg(target_os = "illumos")] + __sin6_src_id: 0, + }; + + let sockaddr = SocketAddrCRepr { v6: sockaddr_in6 }; + let socklen = size_of::<libc::sockaddr_in6>() as libc::socklen_t; + (sockaddr, socklen) + } + } +} + +/// Converts a `libc::sockaddr` compatible struct into a native Rust `SocketAddr`. +/// +/// # Safety +/// +/// `storage` must have the `ss_family` field correctly initialized. +/// `storage` must be initialised to a `sockaddr_in` or `sockaddr_in6`. +pub(crate) unsafe fn to_socket_addr( + storage: *const libc::sockaddr_storage, +) -> io::Result<SocketAddr> { + match (*storage).ss_family as libc::c_int { + libc::AF_INET => { + // Safety: if the ss_family field is AF_INET then storage must be a sockaddr_in. + let addr: &libc::sockaddr_in = &*(storage as *const libc::sockaddr_in); + let ip = Ipv4Addr::from(addr.sin_addr.s_addr.to_ne_bytes()); + let port = u16::from_be(addr.sin_port); + Ok(SocketAddr::V4(SocketAddrV4::new(ip, port))) + } + libc::AF_INET6 => { + // Safety: if the ss_family field is AF_INET6 then storage must be a sockaddr_in6. + let addr: &libc::sockaddr_in6 = &*(storage as *const libc::sockaddr_in6); + let ip = Ipv6Addr::from(addr.sin6_addr.s6_addr); + let port = u16::from_be(addr.sin6_port); + Ok(SocketAddr::V6(SocketAddrV6::new( + ip, + port, + addr.sin6_flowinfo, + addr.sin6_scope_id, + ))) + } + _ => Err(io::ErrorKind::InvalidInput.into()), + } +} diff --git a/third_party/rust/mio/src/sys/unix/pipe.rs b/third_party/rust/mio/src/sys/unix/pipe.rs new file mode 100644 index 0000000000..c899dfb2da --- /dev/null +++ b/third_party/rust/mio/src/sys/unix/pipe.rs @@ -0,0 +1,431 @@ +//! Unix pipe. +//! +//! See the [`new`] function for documentation. + +use std::fs::File; +use std::io::{self, IoSlice, IoSliceMut, Read, Write}; +use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; +use std::process::{ChildStderr, ChildStdin, ChildStdout}; + +use crate::io_source::IoSource; +use crate::{event, Interest, Registry, Token}; + +/// Create a new non-blocking Unix pipe. +/// +/// This is a wrapper around Unix's [`pipe(2)`] system call and can be used as +/// inter-process or thread communication channel. +/// +/// This channel may be created before forking the process and then one end used +/// in each process, e.g. the parent process has the sending end to send command +/// to the child process. +/// +/// [`pipe(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/pipe.html +/// +/// # Events +/// +/// The [`Sender`] can be registered with [`WRITABLE`] interest to receive +/// [writable events], the [`Receiver`] with [`READABLE`] interest. Once data is +/// written to the `Sender` the `Receiver` will receive an [readable event]. +/// +/// In addition to those events, events will also be generated if the other side +/// is dropped. To check if the `Sender` is dropped you'll need to check +/// [`is_read_closed`] on events for the `Receiver`, if it returns true the +/// `Sender` is dropped. On the `Sender` end check [`is_write_closed`], if it +/// returns true the `Receiver` was dropped. Also see the second example below. +/// +/// [`WRITABLE`]: Interest::WRITABLE +/// [writable events]: event::Event::is_writable +/// [`READABLE`]: Interest::READABLE +/// [readable event]: event::Event::is_readable +/// [`is_read_closed`]: event::Event::is_read_closed +/// [`is_write_closed`]: event::Event::is_write_closed +/// +/// # Deregistering +/// +/// Both `Sender` and `Receiver` will deregister themselves when dropped, +/// **iff** the file descriptors are not duplicated (via [`dup(2)`]). +/// +/// [`dup(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/dup.html +/// +/// # Examples +/// +/// Simple example that writes data into the sending end and read it from the +/// receiving end. +/// +/// ``` +/// use std::io::{self, Read, Write}; +/// +/// use mio::{Poll, Events, Interest, Token}; +/// use mio::unix::pipe; +/// +/// // Unique tokens for the two ends of the channel. +/// const PIPE_RECV: Token = Token(0); +/// const PIPE_SEND: Token = Token(1); +/// +/// # fn main() -> io::Result<()> { +/// // Create our `Poll` instance and the `Events` container. +/// let mut poll = Poll::new()?; +/// let mut events = Events::with_capacity(8); +/// +/// // Create a new pipe. +/// let (mut sender, mut receiver) = pipe::new()?; +/// +/// // Register both ends of the channel. +/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?; +/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?; +/// +/// const MSG: &[u8; 11] = b"Hello world"; +/// +/// loop { +/// poll.poll(&mut events, None)?; +/// +/// for event in events.iter() { +/// match event.token() { +/// PIPE_SEND => sender.write(MSG) +/// .and_then(|n| if n != MSG.len() { +/// // We'll consider a short write an error in this +/// // example. NOTE: we can't use `write_all` with +/// // non-blocking I/O. +/// Err(io::ErrorKind::WriteZero.into()) +/// } else { +/// Ok(()) +/// })?, +/// PIPE_RECV => { +/// let mut buf = [0; 11]; +/// let n = receiver.read(&mut buf)?; +/// println!("received: {:?}", &buf[0..n]); +/// assert_eq!(n, MSG.len()); +/// assert_eq!(&buf, &*MSG); +/// return Ok(()); +/// }, +/// _ => unreachable!(), +/// } +/// } +/// } +/// # } +/// ``` +/// +/// Example that receives an event once the `Sender` is dropped. +/// +/// ``` +/// # use std::io; +/// # +/// # use mio::{Poll, Events, Interest, Token}; +/// # use mio::unix::pipe; +/// # +/// # const PIPE_RECV: Token = Token(0); +/// # const PIPE_SEND: Token = Token(1); +/// # +/// # fn main() -> io::Result<()> { +/// // Same setup as in the example above. +/// let mut poll = Poll::new()?; +/// let mut events = Events::with_capacity(8); +/// +/// let (mut sender, mut receiver) = pipe::new()?; +/// +/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?; +/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?; +/// +/// // Drop the sender. +/// drop(sender); +/// +/// poll.poll(&mut events, None)?; +/// +/// for event in events.iter() { +/// match event.token() { +/// PIPE_RECV if event.is_read_closed() => { +/// // Detected that the sender was dropped. +/// println!("Sender dropped!"); +/// return Ok(()); +/// }, +/// _ => unreachable!(), +/// } +/// } +/// # unreachable!(); +/// # } +/// ``` +pub fn new() -> io::Result<(Sender, Receiver)> { + let mut fds: [RawFd; 2] = [-1, -1]; + + #[cfg(any( + target_os = "android", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "linux", + target_os = "netbsd", + target_os = "openbsd", + target_os = "illumos", + ))] + unsafe { + if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 { + return Err(io::Error::last_os_error()); + } + } + + #[cfg(any(target_os = "ios", target_os = "macos"))] + unsafe { + // For platforms that don't have `pipe2(2)` we need to manually set the + // correct flags on the file descriptor. + if libc::pipe(fds.as_mut_ptr()) != 0 { + return Err(io::Error::last_os_error()); + } + + for fd in &fds { + if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0 + || libc::fcntl(*fd, libc::F_SETFD, libc::FD_CLOEXEC) != 0 + { + let err = io::Error::last_os_error(); + // Don't leak file descriptors. Can't handle error though. + let _ = libc::close(fds[0]); + let _ = libc::close(fds[1]); + return Err(err); + } + } + } + + #[cfg(not(any( + target_os = "android", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "linux", + target_os = "netbsd", + target_os = "openbsd", + target_os = "ios", + target_os = "macos", + target_os = "illumos", + )))] + compile_error!("unsupported target for `mio::unix::pipe`"); + + // Safety: we just initialised the `fds` above. + let r = unsafe { Receiver::from_raw_fd(fds[0]) }; + let w = unsafe { Sender::from_raw_fd(fds[1]) }; + Ok((w, r)) +} + +/// Sending end of an Unix pipe. +/// +/// See [`new`] for documentation, including examples. +#[derive(Debug)] +pub struct Sender { + inner: IoSource<File>, +} + +impl Sender { + /// Set the `Sender` into or out of non-blocking mode. + pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { + set_nonblocking(self.inner.as_raw_fd(), nonblocking) + } +} + +impl event::Source for Sender { + fn register( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + self.inner.register(registry, token, interests) + } + + fn reregister( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + self.inner.reregister(registry, token, interests) + } + + fn deregister(&mut self, registry: &Registry) -> io::Result<()> { + self.inner.deregister(registry) + } +} + +impl Write for Sender { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.inner.do_io(|sender| (&*sender).write(buf)) + } + + fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { + self.inner.do_io(|sender| (&*sender).write_vectored(bufs)) + } + + fn flush(&mut self) -> io::Result<()> { + self.inner.do_io(|sender| (&*sender).flush()) + } +} + +impl Write for &Sender { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.inner.do_io(|sender| (&*sender).write(buf)) + } + + fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { + self.inner.do_io(|sender| (&*sender).write_vectored(bufs)) + } + + fn flush(&mut self) -> io::Result<()> { + self.inner.do_io(|sender| (&*sender).flush()) + } +} + +/// # Notes +/// +/// The underlying pipe is **not** set to non-blocking. +impl From<ChildStdin> for Sender { + fn from(stdin: ChildStdin) -> Sender { + // Safety: `ChildStdin` is guaranteed to be a valid file descriptor. + unsafe { Sender::from_raw_fd(stdin.into_raw_fd()) } + } +} + +impl FromRawFd for Sender { + unsafe fn from_raw_fd(fd: RawFd) -> Sender { + Sender { + inner: IoSource::new(File::from_raw_fd(fd)), + } + } +} + +impl AsRawFd for Sender { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } +} + +impl IntoRawFd for Sender { + fn into_raw_fd(self) -> RawFd { + self.inner.into_inner().into_raw_fd() + } +} + +/// Receiving end of an Unix pipe. +/// +/// See [`new`] for documentation, including examples. +#[derive(Debug)] +pub struct Receiver { + inner: IoSource<File>, +} + +impl Receiver { + /// Set the `Receiver` into or out of non-blocking mode. + pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { + set_nonblocking(self.inner.as_raw_fd(), nonblocking) + } +} + +impl event::Source for Receiver { + fn register( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + self.inner.register(registry, token, interests) + } + + fn reregister( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + self.inner.reregister(registry, token, interests) + } + + fn deregister(&mut self, registry: &Registry) -> io::Result<()> { + self.inner.deregister(registry) + } +} + +impl Read for Receiver { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.inner.do_io(|sender| (&*sender).read(buf)) + } + + fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { + self.inner.do_io(|sender| (&*sender).read_vectored(bufs)) + } +} + +impl Read for &Receiver { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.inner.do_io(|sender| (&*sender).read(buf)) + } + + fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { + self.inner.do_io(|sender| (&*sender).read_vectored(bufs)) + } +} + +/// # Notes +/// +/// The underlying pipe is **not** set to non-blocking. +impl From<ChildStdout> for Receiver { + fn from(stdout: ChildStdout) -> Receiver { + // Safety: `ChildStdout` is guaranteed to be a valid file descriptor. + unsafe { Receiver::from_raw_fd(stdout.into_raw_fd()) } + } +} + +/// # Notes +/// +/// The underlying pipe is **not** set to non-blocking. +impl From<ChildStderr> for Receiver { + fn from(stderr: ChildStderr) -> Receiver { + // Safety: `ChildStderr` is guaranteed to be a valid file descriptor. + unsafe { Receiver::from_raw_fd(stderr.into_raw_fd()) } + } +} + +impl FromRawFd for Receiver { + unsafe fn from_raw_fd(fd: RawFd) -> Receiver { + Receiver { + inner: IoSource::new(File::from_raw_fd(fd)), + } + } +} + +impl AsRawFd for Receiver { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } +} + +impl IntoRawFd for Receiver { + fn into_raw_fd(self) -> RawFd { + self.inner.into_inner().into_raw_fd() + } +} + +#[cfg(not(target_os = "illumos"))] +fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> { + let value = nonblocking as libc::c_int; + if unsafe { libc::ioctl(fd, libc::FIONBIO, &value) } == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } +} + +#[cfg(target_os = "illumos")] +fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> { + let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) }; + if flags < 0 { + return Err(io::Error::last_os_error()); + } + + let nflags = if nonblocking { + flags | libc::O_NONBLOCK + } else { + flags & !libc::O_NONBLOCK + }; + + if flags != nflags { + if unsafe { libc::fcntl(fd, libc::F_SETFL, nflags) } < 0 { + return Err(io::Error::last_os_error()); + } + } + + Ok(()) +} diff --git a/third_party/rust/mio/src/sys/unix/selector/epoll.rs b/third_party/rust/mio/src/sys/unix/selector/epoll.rs new file mode 100644 index 0000000000..f4430909b0 --- /dev/null +++ b/third_party/rust/mio/src/sys/unix/selector/epoll.rs @@ -0,0 +1,246 @@ +use crate::{Interest, Token}; + +use libc::{EPOLLET, EPOLLIN, EPOLLOUT, EPOLLRDHUP}; +use log::error; +use std::os::unix::io::{AsRawFd, RawFd}; +#[cfg(debug_assertions)] +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::time::Duration; +use std::{cmp, i32, io, ptr}; + +/// Unique id for use as `SelectorId`. +#[cfg(debug_assertions)] +static NEXT_ID: AtomicUsize = AtomicUsize::new(1); + +#[derive(Debug)] +pub struct Selector { + #[cfg(debug_assertions)] + id: usize, + ep: RawFd, + #[cfg(debug_assertions)] + has_waker: AtomicBool, +} + +impl Selector { + pub fn new() -> io::Result<Selector> { + // According to libuv, `EPOLL_CLOEXEC` is not defined on Android API < + // 21. But `EPOLL_CLOEXEC` is an alias for `O_CLOEXEC` on that platform, + // so we use it instead. + #[cfg(target_os = "android")] + let flag = libc::O_CLOEXEC; + #[cfg(not(target_os = "android"))] + let flag = libc::EPOLL_CLOEXEC; + + syscall!(epoll_create1(flag)).map(|ep| Selector { + #[cfg(debug_assertions)] + id: NEXT_ID.fetch_add(1, Ordering::Relaxed), + ep, + #[cfg(debug_assertions)] + has_waker: AtomicBool::new(false), + }) + } + + pub fn try_clone(&self) -> io::Result<Selector> { + syscall!(fcntl(self.ep, libc::F_DUPFD_CLOEXEC, super::LOWEST_FD)).map(|ep| Selector { + // It's the same selector, so we use the same id. + #[cfg(debug_assertions)] + id: self.id, + ep, + #[cfg(debug_assertions)] + has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)), + }) + } + + pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> { + // A bug in kernels < 2.6.37 makes timeouts larger than LONG_MAX / CONFIG_HZ + // (approx. 30 minutes with CONFIG_HZ=1200) effectively infinite on 32 bits + // architectures. The magic number is the same constant used by libuv. + #[cfg(target_pointer_width = "32")] + const MAX_SAFE_TIMEOUT: u128 = 1789569; + #[cfg(not(target_pointer_width = "32"))] + const MAX_SAFE_TIMEOUT: u128 = libc::c_int::max_value() as u128; + + let timeout = timeout + .map(|to| cmp::min(to.as_millis(), MAX_SAFE_TIMEOUT) as libc::c_int) + .unwrap_or(-1); + + events.clear(); + syscall!(epoll_wait( + self.ep, + events.as_mut_ptr(), + events.capacity() as i32, + timeout, + )) + .map(|n_events| { + // This is safe because `epoll_wait` ensures that `n_events` are + // assigned. + unsafe { events.set_len(n_events as usize) }; + }) + } + + pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + let mut event = libc::epoll_event { + events: interests_to_epoll(interests), + u64: usize::from(token) as u64, + }; + + syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_ADD, fd, &mut event)).map(|_| ()) + } + + pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + let mut event = libc::epoll_event { + events: interests_to_epoll(interests), + u64: usize::from(token) as u64, + }; + + syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_MOD, fd, &mut event)).map(|_| ()) + } + + pub fn deregister(&self, fd: RawFd) -> io::Result<()> { + syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_DEL, fd, ptr::null_mut())).map(|_| ()) + } + + #[cfg(debug_assertions)] + pub fn register_waker(&self) -> bool { + self.has_waker.swap(true, Ordering::AcqRel) + } +} + +cfg_io_source! { + impl Selector { + #[cfg(debug_assertions)] + pub fn id(&self) -> usize { + self.id + } + } +} + +impl AsRawFd for Selector { + fn as_raw_fd(&self) -> RawFd { + self.ep + } +} + +impl Drop for Selector { + fn drop(&mut self) { + if let Err(err) = syscall!(close(self.ep)) { + error!("error closing epoll: {}", err); + } + } +} + +fn interests_to_epoll(interests: Interest) -> u32 { + let mut kind = EPOLLET; + + if interests.is_readable() { + kind = kind | EPOLLIN | EPOLLRDHUP; + } + + if interests.is_writable() { + kind |= EPOLLOUT; + } + + kind as u32 +} + +pub type Event = libc::epoll_event; +pub type Events = Vec<Event>; + +pub mod event { + use std::fmt; + + use crate::sys::Event; + use crate::Token; + + pub fn token(event: &Event) -> Token { + Token(event.u64 as usize) + } + + pub fn is_readable(event: &Event) -> bool { + (event.events as libc::c_int & libc::EPOLLIN) != 0 + || (event.events as libc::c_int & libc::EPOLLPRI) != 0 + } + + pub fn is_writable(event: &Event) -> bool { + (event.events as libc::c_int & libc::EPOLLOUT) != 0 + } + + pub fn is_error(event: &Event) -> bool { + (event.events as libc::c_int & libc::EPOLLERR) != 0 + } + + pub fn is_read_closed(event: &Event) -> bool { + // Both halves of the socket have closed + event.events as libc::c_int & libc::EPOLLHUP != 0 + // Socket has received FIN or called shutdown(SHUT_RD) + || (event.events as libc::c_int & libc::EPOLLIN != 0 + && event.events as libc::c_int & libc::EPOLLRDHUP != 0) + } + + pub fn is_write_closed(event: &Event) -> bool { + // Both halves of the socket have closed + event.events as libc::c_int & libc::EPOLLHUP != 0 + // Unix pipe write end has closed + || (event.events as libc::c_int & libc::EPOLLOUT != 0 + && event.events as libc::c_int & libc::EPOLLERR != 0) + // The other side (read end) of a Unix pipe has closed. + || event.events as libc::c_int == libc::EPOLLERR + } + + pub fn is_priority(event: &Event) -> bool { + (event.events as libc::c_int & libc::EPOLLPRI) != 0 + } + + pub fn is_aio(_: &Event) -> bool { + // Not supported in the kernel, only in libc. + false + } + + pub fn is_lio(_: &Event) -> bool { + // Not supported. + false + } + + pub fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result { + #[allow(clippy::trivially_copy_pass_by_ref)] + fn check_events(got: &u32, want: &libc::c_int) -> bool { + (*got as libc::c_int & want) != 0 + } + debug_detail!( + EventsDetails(u32), + check_events, + libc::EPOLLIN, + libc::EPOLLPRI, + libc::EPOLLOUT, + libc::EPOLLRDNORM, + libc::EPOLLRDBAND, + libc::EPOLLWRNORM, + libc::EPOLLWRBAND, + libc::EPOLLMSG, + libc::EPOLLERR, + libc::EPOLLHUP, + libc::EPOLLET, + libc::EPOLLRDHUP, + libc::EPOLLONESHOT, + #[cfg(target_os = "linux")] + libc::EPOLLEXCLUSIVE, + #[cfg(any(target_os = "android", target_os = "linux"))] + libc::EPOLLWAKEUP, + libc::EPOLL_CLOEXEC, + ); + + // Can't reference fields in packed structures. + let e_u64 = event.u64; + f.debug_struct("epoll_event") + .field("events", &EventsDetails(event.events)) + .field("u64", &e_u64) + .finish() + } +} + +#[cfg(target_os = "android")] +#[test] +fn assert_close_on_exec_flag() { + // This assertion need to be true for Selector::new. + assert_eq!(libc::O_CLOEXEC, libc::EPOLL_CLOEXEC); +} diff --git a/third_party/rust/mio/src/sys/unix/selector/kqueue.rs b/third_party/rust/mio/src/sys/unix/selector/kqueue.rs new file mode 100644 index 0000000000..b7a01a51c6 --- /dev/null +++ b/third_party/rust/mio/src/sys/unix/selector/kqueue.rs @@ -0,0 +1,688 @@ +use crate::{Interest, Token}; +use log::error; +use std::mem::MaybeUninit; +use std::ops::{Deref, DerefMut}; +use std::os::unix::io::{AsRawFd, RawFd}; +#[cfg(debug_assertions)] +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::time::Duration; +use std::{cmp, io, ptr, slice}; + +/// Unique id for use as `SelectorId`. +#[cfg(debug_assertions)] +static NEXT_ID: AtomicUsize = AtomicUsize::new(1); + +// Type of the `nchanges` and `nevents` parameters in the `kevent` function. +#[cfg(not(target_os = "netbsd"))] +type Count = libc::c_int; +#[cfg(target_os = "netbsd")] +type Count = libc::size_t; + +// Type of the `filter` field in the `kevent` structure. +#[cfg(any(target_os = "dragonfly", target_os = "freebsd", target_os = "openbsd"))] +type Filter = libc::c_short; +#[cfg(any(target_os = "macos", target_os = "ios"))] +type Filter = i16; +#[cfg(target_os = "netbsd")] +type Filter = u32; + +// Type of the `flags` field in the `kevent` structure. +#[cfg(any(target_os = "dragonfly", target_os = "freebsd", target_os = "openbsd"))] +type Flags = libc::c_ushort; +#[cfg(any(target_os = "macos", target_os = "ios"))] +type Flags = u16; +#[cfg(target_os = "netbsd")] +type Flags = u32; + +// Type of the `data` field in the `kevent` structure. +#[cfg(any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos" +))] +type Data = libc::intptr_t; +#[cfg(any(target_os = "netbsd", target_os = "openbsd"))] +type Data = i64; + +// Type of the `udata` field in the `kevent` structure. +#[cfg(not(target_os = "netbsd"))] +type UData = *mut libc::c_void; +#[cfg(target_os = "netbsd")] +type UData = libc::intptr_t; + +macro_rules! kevent { + ($id: expr, $filter: expr, $flags: expr, $data: expr) => { + libc::kevent { + ident: $id as libc::uintptr_t, + filter: $filter as Filter, + flags: $flags, + fflags: 0, + data: 0, + udata: $data as UData, + } + }; +} + +#[derive(Debug)] +pub struct Selector { + #[cfg(debug_assertions)] + id: usize, + kq: RawFd, + #[cfg(debug_assertions)] + has_waker: AtomicBool, +} + +impl Selector { + pub fn new() -> io::Result<Selector> { + syscall!(kqueue()) + .and_then(|kq| syscall!(fcntl(kq, libc::F_SETFD, libc::FD_CLOEXEC)).map(|_| kq)) + .map(|kq| Selector { + #[cfg(debug_assertions)] + id: NEXT_ID.fetch_add(1, Ordering::Relaxed), + kq, + #[cfg(debug_assertions)] + has_waker: AtomicBool::new(false), + }) + } + + pub fn try_clone(&self) -> io::Result<Selector> { + syscall!(fcntl(self.kq, libc::F_DUPFD_CLOEXEC, super::LOWEST_FD)).map(|kq| Selector { + // It's the same selector, so we use the same id. + #[cfg(debug_assertions)] + id: self.id, + kq, + #[cfg(debug_assertions)] + has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)), + }) + } + + pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> { + let timeout = timeout.map(|to| libc::timespec { + tv_sec: cmp::min(to.as_secs(), libc::time_t::max_value() as u64) as libc::time_t, + // `Duration::subsec_nanos` is guaranteed to be less than one + // billion (the number of nanoseconds in a second), making the + // cast to i32 safe. The cast itself is needed for platforms + // where C's long is only 32 bits. + tv_nsec: libc::c_long::from(to.subsec_nanos() as i32), + }); + let timeout = timeout + .as_ref() + .map(|s| s as *const _) + .unwrap_or(ptr::null_mut()); + + events.clear(); + syscall!(kevent( + self.kq, + ptr::null(), + 0, + events.as_mut_ptr(), + events.capacity() as Count, + timeout, + )) + .map(|n_events| { + // This is safe because `kevent` ensures that `n_events` are + // assigned. + unsafe { events.set_len(n_events as usize) }; + }) + } + + pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + let flags = libc::EV_CLEAR | libc::EV_RECEIPT | libc::EV_ADD; + // At most we need two changes, but maybe we only need 1. + let mut changes: [MaybeUninit<libc::kevent>; 2] = + [MaybeUninit::uninit(), MaybeUninit::uninit()]; + let mut n_changes = 0; + + if interests.is_writable() { + let kevent = kevent!(fd, libc::EVFILT_WRITE, flags, token.0); + changes[n_changes] = MaybeUninit::new(kevent); + n_changes += 1; + } + + if interests.is_readable() { + let kevent = kevent!(fd, libc::EVFILT_READ, flags, token.0); + changes[n_changes] = MaybeUninit::new(kevent); + n_changes += 1; + } + + // Older versions of macOS (OS X 10.11 and 10.10 have been witnessed) + // can return EPIPE when registering a pipe file descriptor where the + // other end has already disappeared. For example code that creates a + // pipe, closes a file descriptor, and then registers the other end will + // see an EPIPE returned from `register`. + // + // It also turns out that kevent will still report events on the file + // descriptor, telling us that it's readable/hup at least after we've + // done this registration. As a result we just ignore `EPIPE` here + // instead of propagating it. + // + // More info can be found at tokio-rs/mio#582. + let changes = unsafe { + // This is safe because we ensure that at least `n_changes` are in + // the array. + slice::from_raw_parts_mut(changes[0].as_mut_ptr(), n_changes) + }; + kevent_register(self.kq, changes, &[libc::EPIPE as Data]) + } + + pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + let flags = libc::EV_CLEAR | libc::EV_RECEIPT; + let write_flags = if interests.is_writable() { + flags | libc::EV_ADD + } else { + flags | libc::EV_DELETE + }; + let read_flags = if interests.is_readable() { + flags | libc::EV_ADD + } else { + flags | libc::EV_DELETE + }; + + let mut changes: [libc::kevent; 2] = [ + kevent!(fd, libc::EVFILT_WRITE, write_flags, token.0), + kevent!(fd, libc::EVFILT_READ, read_flags, token.0), + ]; + + // Since there is no way to check with which interests the fd was + // registered we modify both readable and write, adding it when required + // and removing it otherwise, ignoring the ENOENT error when it comes + // up. The ENOENT error informs us that a filter we're trying to remove + // wasn't there in first place, but we don't really care since our goal + // is accomplished. + // + // For the explanation of ignoring `EPIPE` see `register`. + kevent_register( + self.kq, + &mut changes, + &[libc::ENOENT as Data, libc::EPIPE as Data], + ) + } + + pub fn deregister(&self, fd: RawFd) -> io::Result<()> { + let flags = libc::EV_DELETE | libc::EV_RECEIPT; + let mut changes: [libc::kevent; 2] = [ + kevent!(fd, libc::EVFILT_WRITE, flags, 0), + kevent!(fd, libc::EVFILT_READ, flags, 0), + ]; + + // Since there is no way to check with which interests the fd was + // registered we remove both filters (readable and writeable) and ignore + // the ENOENT error when it comes up. The ENOENT error informs us that + // the filter wasn't there in first place, but we don't really care + // about that since our goal is to remove it. + kevent_register(self.kq, &mut changes, &[libc::ENOENT as Data]) + } + + #[cfg(debug_assertions)] + pub fn register_waker(&self) -> bool { + self.has_waker.swap(true, Ordering::AcqRel) + } + + // Used by `Waker`. + #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))] + pub fn setup_waker(&self, token: Token) -> io::Result<()> { + // First attempt to accept user space notifications. + let mut kevent = kevent!( + 0, + libc::EVFILT_USER, + libc::EV_ADD | libc::EV_CLEAR | libc::EV_RECEIPT, + token.0 + ); + + syscall!(kevent(self.kq, &kevent, 1, &mut kevent, 1, ptr::null())).and_then(|_| { + if (kevent.flags & libc::EV_ERROR) != 0 && kevent.data != 0 { + Err(io::Error::from_raw_os_error(kevent.data as i32)) + } else { + Ok(()) + } + }) + } + + // Used by `Waker`. + #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))] + pub fn wake(&self, token: Token) -> io::Result<()> { + let mut kevent = kevent!( + 0, + libc::EVFILT_USER, + libc::EV_ADD | libc::EV_RECEIPT, + token.0 + ); + kevent.fflags = libc::NOTE_TRIGGER; + + syscall!(kevent(self.kq, &kevent, 1, &mut kevent, 1, ptr::null())).and_then(|_| { + if (kevent.flags & libc::EV_ERROR) != 0 && kevent.data != 0 { + Err(io::Error::from_raw_os_error(kevent.data as i32)) + } else { + Ok(()) + } + }) + } +} + +/// Register `changes` with `kq`ueue. +fn kevent_register( + kq: RawFd, + changes: &mut [libc::kevent], + ignored_errors: &[Data], +) -> io::Result<()> { + syscall!(kevent( + kq, + changes.as_ptr(), + changes.len() as Count, + changes.as_mut_ptr(), + changes.len() as Count, + ptr::null(), + )) + .map(|_| ()) + .or_else(|err| { + // According to the manual page of FreeBSD: "When kevent() call fails + // with EINTR error, all changes in the changelist have been applied", + // so we can safely ignore it. + if err.raw_os_error() == Some(libc::EINTR) { + Ok(()) + } else { + Err(err) + } + }) + .and_then(|()| check_errors(changes, ignored_errors)) +} + +/// Check all events for possible errors, it returns the first error found. +fn check_errors(events: &[libc::kevent], ignored_errors: &[Data]) -> io::Result<()> { + for event in events { + // We can't use references to packed structures (in checking the ignored + // errors), so we need copy the data out before use. + let data = event.data; + // Check for the error flag, the actual error will be in the `data` + // field. + if (event.flags & libc::EV_ERROR != 0) && data != 0 && !ignored_errors.contains(&data) { + return Err(io::Error::from_raw_os_error(data as i32)); + } + } + Ok(()) +} + +cfg_io_source! { + #[cfg(debug_assertions)] + impl Selector { + pub fn id(&self) -> usize { + self.id + } + } +} + +impl AsRawFd for Selector { + fn as_raw_fd(&self) -> RawFd { + self.kq + } +} + +impl Drop for Selector { + fn drop(&mut self) { + if let Err(err) = syscall!(close(self.kq)) { + error!("error closing kqueue: {}", err); + } + } +} + +pub type Event = libc::kevent; +pub struct Events(Vec<libc::kevent>); + +impl Events { + pub fn with_capacity(capacity: usize) -> Events { + Events(Vec::with_capacity(capacity)) + } +} + +impl Deref for Events { + type Target = Vec<libc::kevent>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for Events { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +// `Events` cannot derive `Send` or `Sync` because of the +// `udata: *mut ::c_void` field in `libc::kevent`. However, `Events`'s public +// API treats the `udata` field as a `uintptr_t` which is `Send`. `Sync` is +// safe because with a `events: &Events` value, the only access to the `udata` +// field is through `fn token(event: &Event)` which cannot mutate the field. +unsafe impl Send for Events {} +unsafe impl Sync for Events {} + +pub mod event { + use std::fmt; + + use crate::sys::Event; + use crate::Token; + + use super::{Filter, Flags}; + + pub fn token(event: &Event) -> Token { + Token(event.udata as usize) + } + + pub fn is_readable(event: &Event) -> bool { + event.filter == libc::EVFILT_READ || { + #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))] + // Used by the `Awakener`. On platforms that use `eventfd` or a unix + // pipe it will emit a readable event so we'll fake that here as + // well. + { + event.filter == libc::EVFILT_USER + } + #[cfg(not(any(target_os = "freebsd", target_os = "ios", target_os = "macos")))] + { + false + } + } + } + + pub fn is_writable(event: &Event) -> bool { + event.filter == libc::EVFILT_WRITE + } + + pub fn is_error(event: &Event) -> bool { + (event.flags & libc::EV_ERROR) != 0 || + // When the read end of the socket is closed, EV_EOF is set on + // flags, and fflags contains the error if there is one. + (event.flags & libc::EV_EOF) != 0 && event.fflags != 0 + } + + pub fn is_read_closed(event: &Event) -> bool { + event.filter == libc::EVFILT_READ && event.flags & libc::EV_EOF != 0 + } + + pub fn is_write_closed(event: &Event) -> bool { + event.filter == libc::EVFILT_WRITE && event.flags & libc::EV_EOF != 0 + } + + pub fn is_priority(_: &Event) -> bool { + // kqueue doesn't have priority indicators. + false + } + + #[allow(unused_variables)] // `event` is not used on some platforms. + pub fn is_aio(event: &Event) -> bool { + #[cfg(any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos" + ))] + { + event.filter == libc::EVFILT_AIO + } + #[cfg(not(any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos" + )))] + { + false + } + } + + #[allow(unused_variables)] // `event` is only used on FreeBSD. + pub fn is_lio(event: &Event) -> bool { + #[cfg(target_os = "freebsd")] + { + event.filter == libc::EVFILT_LIO + } + #[cfg(not(target_os = "freebsd"))] + { + false + } + } + + pub fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result { + debug_detail!( + FilterDetails(Filter), + PartialEq::eq, + libc::EVFILT_READ, + libc::EVFILT_WRITE, + libc::EVFILT_AIO, + libc::EVFILT_VNODE, + libc::EVFILT_PROC, + libc::EVFILT_SIGNAL, + libc::EVFILT_TIMER, + #[cfg(target_os = "freebsd")] + libc::EVFILT_PROCDESC, + #[cfg(any( + target_os = "freebsd", + target_os = "dragonfly", + target_os = "ios", + target_os = "macos" + ))] + libc::EVFILT_FS, + #[cfg(target_os = "freebsd")] + libc::EVFILT_LIO, + #[cfg(any( + target_os = "freebsd", + target_os = "dragonfly", + target_os = "ios", + target_os = "macos" + ))] + libc::EVFILT_USER, + #[cfg(target_os = "freebsd")] + libc::EVFILT_SENDFILE, + #[cfg(target_os = "freebsd")] + libc::EVFILT_EMPTY, + #[cfg(target_os = "dragonfly")] + libc::EVFILT_EXCEPT, + #[cfg(any(target_os = "ios", target_os = "macos"))] + libc::EVFILT_MACHPORT, + #[cfg(any(target_os = "ios", target_os = "macos"))] + libc::EVFILT_VM, + ); + + #[allow(clippy::trivially_copy_pass_by_ref)] + fn check_flag(got: &Flags, want: &Flags) -> bool { + (got & want) != 0 + } + debug_detail!( + FlagsDetails(Flags), + check_flag, + libc::EV_ADD, + libc::EV_DELETE, + libc::EV_ENABLE, + libc::EV_DISABLE, + libc::EV_ONESHOT, + libc::EV_CLEAR, + libc::EV_RECEIPT, + libc::EV_DISPATCH, + #[cfg(target_os = "freebsd")] + libc::EV_DROP, + libc::EV_FLAG1, + libc::EV_ERROR, + libc::EV_EOF, + libc::EV_SYSFLAGS, + #[cfg(any(target_os = "ios", target_os = "macos"))] + libc::EV_FLAG0, + #[cfg(any(target_os = "ios", target_os = "macos"))] + libc::EV_POLL, + #[cfg(any(target_os = "ios", target_os = "macos"))] + libc::EV_OOBAND, + #[cfg(target_os = "dragonfly")] + libc::EV_NODATA, + ); + + #[allow(clippy::trivially_copy_pass_by_ref)] + fn check_fflag(got: &u32, want: &u32) -> bool { + (got & want) != 0 + } + debug_detail!( + FflagsDetails(u32), + check_fflag, + #[cfg(any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos" + ))] + libc::NOTE_TRIGGER, + #[cfg(any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos" + ))] + libc::NOTE_FFNOP, + #[cfg(any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos" + ))] + libc::NOTE_FFAND, + #[cfg(any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos" + ))] + libc::NOTE_FFOR, + #[cfg(any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos" + ))] + libc::NOTE_FFCOPY, + #[cfg(any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos" + ))] + libc::NOTE_FFCTRLMASK, + #[cfg(any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos" + ))] + libc::NOTE_FFLAGSMASK, + libc::NOTE_LOWAT, + libc::NOTE_DELETE, + libc::NOTE_WRITE, + #[cfg(target_os = "dragonfly")] + libc::NOTE_OOB, + #[cfg(target_os = "openbsd")] + libc::NOTE_EOF, + #[cfg(any(target_os = "ios", target_os = "macos"))] + libc::NOTE_EXTEND, + libc::NOTE_ATTRIB, + libc::NOTE_LINK, + libc::NOTE_RENAME, + libc::NOTE_REVOKE, + #[cfg(any(target_os = "ios", target_os = "macos"))] + libc::NOTE_NONE, + #[cfg(any(target_os = "openbsd"))] + libc::NOTE_TRUNCATE, + libc::NOTE_EXIT, + libc::NOTE_FORK, + libc::NOTE_EXEC, + #[cfg(any(target_os = "ios", target_os = "macos"))] + libc::NOTE_SIGNAL, + #[cfg(any(target_os = "ios", target_os = "macos"))] + libc::NOTE_EXITSTATUS, + #[cfg(any(target_os = "ios", target_os = "macos"))] + libc::NOTE_EXIT_DETAIL, + libc::NOTE_PDATAMASK, + libc::NOTE_PCTRLMASK, + #[cfg(any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd", + target_os = "openbsd" + ))] + libc::NOTE_TRACK, + #[cfg(any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd", + target_os = "openbsd" + ))] + libc::NOTE_TRACKERR, + #[cfg(any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd", + target_os = "openbsd" + ))] + libc::NOTE_CHILD, + #[cfg(any(target_os = "ios", target_os = "macos"))] + libc::NOTE_EXIT_DETAIL_MASK, + #[cfg(any(target_os = "ios", target_os = "macos"))] + libc::NOTE_EXIT_DECRYPTFAIL, + #[cfg(any(target_os = "ios", target_os = "macos"))] + libc::NOTE_EXIT_MEMORY, + #[cfg(any(target_os = "ios", target_os = "macos"))] + libc::NOTE_EXIT_CSERROR, + #[cfg(any(target_os = "ios", target_os = "macos"))] + libc::NOTE_VM_PRESSURE, + #[cfg(any(target_os = "ios", target_os = "macos"))] + libc::NOTE_VM_PRESSURE_TERMINATE, + #[cfg(any(target_os = "ios", target_os = "macos"))] + libc::NOTE_VM_PRESSURE_SUDDEN_TERMINATE, + #[cfg(any(target_os = "ios", target_os = "macos"))] + libc::NOTE_VM_ERROR, + #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))] + libc::NOTE_SECONDS, + #[cfg(any(target_os = "freebsd"))] + libc::NOTE_MSECONDS, + #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))] + libc::NOTE_USECONDS, + #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))] + libc::NOTE_NSECONDS, + #[cfg(any(target_os = "ios", target_os = "macos"))] + #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))] + libc::NOTE_ABSOLUTE, + #[cfg(any(target_os = "ios", target_os = "macos"))] + libc::NOTE_LEEWAY, + #[cfg(any(target_os = "ios", target_os = "macos"))] + libc::NOTE_CRITICAL, + #[cfg(any(target_os = "ios", target_os = "macos"))] + libc::NOTE_BACKGROUND, + ); + + // Can't reference fields in packed structures. + let ident = event.ident; + let data = event.data; + let udata = event.udata; + f.debug_struct("kevent") + .field("ident", &ident) + .field("filter", &FilterDetails(event.filter)) + .field("flags", &FlagsDetails(event.flags)) + .field("fflags", &FflagsDetails(event.fflags)) + .field("data", &data) + .field("udata", &udata) + .finish() + } +} + +#[test] +#[cfg(feature = "os-ext")] +fn does_not_register_rw() { + use crate::unix::SourceFd; + use crate::{Poll, Token}; + + let kq = unsafe { libc::kqueue() }; + let mut kqf = SourceFd(&kq); + let poll = Poll::new().unwrap(); + + // Registering kqueue fd will fail if write is requested (On anything but + // some versions of macOS). + poll.registry() + .register(&mut kqf, Token(1234), Interest::READABLE) + .unwrap(); +} diff --git a/third_party/rust/mio/src/sys/unix/selector/mod.rs b/third_party/rust/mio/src/sys/unix/selector/mod.rs new file mode 100644 index 0000000000..da61e14d7e --- /dev/null +++ b/third_party/rust/mio/src/sys/unix/selector/mod.rs @@ -0,0 +1,35 @@ +#[cfg(any(target_os = "android", target_os = "illumos", target_os = "linux"))] +mod epoll; + +#[cfg(any(target_os = "android", target_os = "illumos", target_os = "linux"))] +pub(crate) use self::epoll::{event, Event, Events, Selector}; + +#[cfg(any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd" +))] +mod kqueue; + +#[cfg(any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd" +))] +pub(crate) use self::kqueue::{event, Event, Events, Selector}; + +/// Lowest file descriptor used in `Selector::try_clone`. +/// +/// # Notes +/// +/// Usually fds 0, 1 and 2 are standard in, out and error. Some application +/// blindly assume this to be true, which means using any one of those a select +/// could result in some interesting and unexpected errors. Avoid that by using +/// an fd that doesn't have a pre-determined usage. +const LOWEST_FD: libc::c_int = 3; diff --git a/third_party/rust/mio/src/sys/unix/sourcefd.rs b/third_party/rust/mio/src/sys/unix/sourcefd.rs new file mode 100644 index 0000000000..84e776d21d --- /dev/null +++ b/third_party/rust/mio/src/sys/unix/sourcefd.rs @@ -0,0 +1,116 @@ +use crate::{event, Interest, Registry, Token}; + +use std::io; +use std::os::unix::io::RawFd; + +/// Adapter for [`RawFd`] providing an [`event::Source`] implementation. +/// +/// `SourceFd` enables registering any type with an FD with [`Poll`]. +/// +/// While only implementations for TCP and UDP are provided, Mio supports +/// registering any FD that can be registered with the underlying OS selector. +/// `SourceFd` provides the necessary bridge. +/// +/// Note that `SourceFd` takes a `&RawFd`. This is because `SourceFd` **does +/// not** take ownership of the FD. Specifically, it will not manage any +/// lifecycle related operations, such as closing the FD on drop. It is expected +/// that the `SourceFd` is constructed right before a call to +/// [`Registry::register`]. See the examples for more detail. +/// +/// [`event::Source`]: ../event/trait.Source.html +/// [`Poll`]: ../struct.Poll.html +/// [`Registry::register`]: ../struct.Registry.html#method.register +/// +/// # Examples +/// +/// Basic usage. +/// +#[cfg_attr( + all(feature = "os-poll", feature = "net", feature = "os-ext"), + doc = "```" +)] +#[cfg_attr( + not(all(feature = "os-poll", feature = "net", feature = "os-ext")), + doc = "```ignore" +)] +/// # use std::error::Error; +/// # fn main() -> Result<(), Box<dyn Error>> { +/// use mio::{Interest, Poll, Token}; +/// use mio::unix::SourceFd; +/// +/// use std::os::unix::io::AsRawFd; +/// use std::net::TcpListener; +/// +/// // Bind a std listener +/// let listener = TcpListener::bind("127.0.0.1:0")?; +/// +/// let poll = Poll::new()?; +/// +/// // Register the listener +/// poll.registry().register( +/// &mut SourceFd(&listener.as_raw_fd()), +/// Token(0), +/// Interest::READABLE)?; +/// # Ok(()) +/// # } +/// ``` +/// +/// Implementing [`event::Source`] for a custom type backed by a [`RawFd`]. +/// +#[cfg_attr(all(feature = "os-poll", feature = "os-ext"), doc = "```")] +#[cfg_attr(not(all(feature = "os-poll", feature = "os-ext")), doc = "```ignore")] +/// use mio::{event, Interest, Registry, Token}; +/// use mio::unix::SourceFd; +/// +/// use std::os::unix::io::RawFd; +/// use std::io; +/// +/// # #[allow(dead_code)] +/// pub struct MyIo { +/// fd: RawFd, +/// } +/// +/// impl event::Source for MyIo { +/// fn register(&mut self, registry: &Registry, token: Token, interests: Interest) +/// -> io::Result<()> +/// { +/// SourceFd(&self.fd).register(registry, token, interests) +/// } +/// +/// fn reregister(&mut self, registry: &Registry, token: Token, interests: Interest) +/// -> io::Result<()> +/// { +/// SourceFd(&self.fd).reregister(registry, token, interests) +/// } +/// +/// fn deregister(&mut self, registry: &Registry) -> io::Result<()> { +/// SourceFd(&self.fd).deregister(registry) +/// } +/// } +/// ``` +#[derive(Debug)] +pub struct SourceFd<'a>(pub &'a RawFd); + +impl<'a> event::Source for SourceFd<'a> { + fn register( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + registry.selector().register(*self.0, token, interests) + } + + fn reregister( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + registry.selector().reregister(*self.0, token, interests) + } + + fn deregister(&mut self, registry: &Registry) -> io::Result<()> { + registry.selector().deregister(*self.0) + } +} diff --git a/third_party/rust/mio/src/sys/unix/tcp.rs b/third_party/rust/mio/src/sys/unix/tcp.rs new file mode 100644 index 0000000000..5b02cfcb52 --- /dev/null +++ b/third_party/rust/mio/src/sys/unix/tcp.rs @@ -0,0 +1,113 @@ +use std::convert::TryInto; +use std::io; +use std::mem::{size_of, MaybeUninit}; +use std::net::{self, SocketAddr}; +use std::os::unix::io::{AsRawFd, FromRawFd}; + +use crate::sys::unix::net::{new_socket, socket_addr, to_socket_addr}; + +pub(crate) fn new_for_addr(address: SocketAddr) -> io::Result<libc::c_int> { + let domain = match address { + SocketAddr::V4(_) => libc::AF_INET, + SocketAddr::V6(_) => libc::AF_INET6, + }; + new_socket(domain, libc::SOCK_STREAM) +} + +pub(crate) fn bind(socket: &net::TcpListener, addr: SocketAddr) -> io::Result<()> { + let (raw_addr, raw_addr_length) = socket_addr(&addr); + syscall!(bind(socket.as_raw_fd(), raw_addr.as_ptr(), raw_addr_length))?; + Ok(()) +} + +pub(crate) fn connect(socket: &net::TcpStream, addr: SocketAddr) -> io::Result<()> { + let (raw_addr, raw_addr_length) = socket_addr(&addr); + + match syscall!(connect( + socket.as_raw_fd(), + raw_addr.as_ptr(), + raw_addr_length + )) { + Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => Err(err), + _ => Ok(()), + } +} + +pub(crate) fn listen(socket: &net::TcpListener, backlog: u32) -> io::Result<()> { + let backlog = backlog.try_into().unwrap_or(i32::max_value()); + syscall!(listen(socket.as_raw_fd(), backlog))?; + Ok(()) +} + +pub(crate) fn set_reuseaddr(socket: &net::TcpListener, reuseaddr: bool) -> io::Result<()> { + let val: libc::c_int = if reuseaddr { 1 } else { 0 }; + syscall!(setsockopt( + socket.as_raw_fd(), + libc::SOL_SOCKET, + libc::SO_REUSEADDR, + &val as *const libc::c_int as *const libc::c_void, + size_of::<libc::c_int>() as libc::socklen_t, + ))?; + Ok(()) +} + +pub(crate) fn accept(listener: &net::TcpListener) -> io::Result<(net::TcpStream, SocketAddr)> { + let mut addr: MaybeUninit<libc::sockaddr_storage> = MaybeUninit::uninit(); + let mut length = size_of::<libc::sockaddr_storage>() as libc::socklen_t; + + // On platforms that support it we can use `accept4(2)` to set `NONBLOCK` + // and `CLOEXEC` in the call to accept the connection. + #[cfg(any( + // Android x86's seccomp profile forbids calls to `accept4(2)` + // See https://github.com/tokio-rs/mio/issues/1445 for details + all( + not(target_arch="x86"), + target_os = "android" + ), + target_os = "dragonfly", + target_os = "freebsd", + target_os = "illumos", + target_os = "linux", + target_os = "netbsd", + target_os = "openbsd" + ))] + let stream = { + syscall!(accept4( + listener.as_raw_fd(), + addr.as_mut_ptr() as *mut _, + &mut length, + libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK, + )) + .map(|socket| unsafe { net::TcpStream::from_raw_fd(socket) }) + }?; + + // But not all platforms have the `accept4(2)` call. Luckily BSD (derived) + // OSes inherit the non-blocking flag from the listener, so we just have to + // set `CLOEXEC`. + #[cfg(any( + all(target_arch = "x86", target_os = "android"), + target_os = "ios", + target_os = "macos", + ))] + let stream = { + syscall!(accept( + listener.as_raw_fd(), + addr.as_mut_ptr() as *mut _, + &mut length + )) + .map(|socket| unsafe { net::TcpStream::from_raw_fd(socket) }) + .and_then(|s| { + syscall!(fcntl(s.as_raw_fd(), libc::F_SETFD, libc::FD_CLOEXEC))?; + + // See https://github.com/tokio-rs/mio/issues/1450 + #[cfg(all(target_arch = "x86", target_os = "android"))] + syscall!(fcntl(s.as_raw_fd(), libc::F_SETFL, libc::O_NONBLOCK))?; + + Ok(s) + }) + }?; + + // This is safe because `accept` calls above ensures the address + // initialised. + unsafe { to_socket_addr(addr.as_ptr()) }.map(|addr| (stream, addr)) +} diff --git a/third_party/rust/mio/src/sys/unix/udp.rs b/third_party/rust/mio/src/sys/unix/udp.rs new file mode 100644 index 0000000000..5a97cbd897 --- /dev/null +++ b/third_party/rust/mio/src/sys/unix/udp.rs @@ -0,0 +1,39 @@ +use crate::sys::unix::net::{new_ip_socket, socket_addr}; + +use std::io; +use std::mem; +use std::net::{self, SocketAddr}; +use std::os::unix::io::{AsRawFd, FromRawFd}; + +pub fn bind(addr: SocketAddr) -> io::Result<net::UdpSocket> { + // Gives a warning for non Apple platforms. + #[allow(clippy::let_and_return)] + let socket = new_ip_socket(addr, libc::SOCK_DGRAM); + + socket.and_then(|socket| { + let (raw_addr, raw_addr_length) = socket_addr(&addr); + syscall!(bind(socket, raw_addr.as_ptr(), raw_addr_length)) + .map_err(|err| { + // Close the socket if we hit an error, ignoring the error + // from closing since we can't pass back two errors. + let _ = unsafe { libc::close(socket) }; + err + }) + .map(|_| unsafe { net::UdpSocket::from_raw_fd(socket) }) + }) +} + +pub(crate) fn only_v6(socket: &net::UdpSocket) -> io::Result<bool> { + let mut optval: libc::c_int = 0; + let mut optlen = mem::size_of::<libc::c_int>() as libc::socklen_t; + + syscall!(getsockopt( + socket.as_raw_fd(), + libc::IPPROTO_IPV6, + libc::IPV6_V6ONLY, + &mut optval as *mut _ as *mut _, + &mut optlen, + ))?; + + Ok(optval != 0) +} diff --git a/third_party/rust/mio/src/sys/unix/uds/datagram.rs b/third_party/rust/mio/src/sys/unix/uds/datagram.rs new file mode 100644 index 0000000000..d3e5314fe3 --- /dev/null +++ b/third_party/rust/mio/src/sys/unix/uds/datagram.rs @@ -0,0 +1,56 @@ +use super::{socket_addr, SocketAddr}; +use crate::sys::unix::net::new_socket; + +use std::io; +use std::os::unix::io::{AsRawFd, FromRawFd}; +use std::os::unix::net; +use std::path::Path; + +pub(crate) fn bind(path: &Path) -> io::Result<net::UnixDatagram> { + let fd = new_socket(libc::AF_UNIX, libc::SOCK_DGRAM)?; + // Ensure the fd is closed. + let socket = unsafe { net::UnixDatagram::from_raw_fd(fd) }; + let (sockaddr, socklen) = socket_addr(path)?; + let sockaddr = &sockaddr as *const libc::sockaddr_un as *const _; + syscall!(bind(fd, sockaddr, socklen))?; + Ok(socket) +} + +pub(crate) fn unbound() -> io::Result<net::UnixDatagram> { + new_socket(libc::AF_UNIX, libc::SOCK_DGRAM) + .map(|socket| unsafe { net::UnixDatagram::from_raw_fd(socket) }) +} + +pub(crate) fn pair() -> io::Result<(net::UnixDatagram, net::UnixDatagram)> { + super::pair(libc::SOCK_DGRAM) +} + +pub(crate) fn local_addr(socket: &net::UnixDatagram) -> io::Result<SocketAddr> { + super::local_addr(socket.as_raw_fd()) +} + +pub(crate) fn peer_addr(socket: &net::UnixDatagram) -> io::Result<SocketAddr> { + super::peer_addr(socket.as_raw_fd()) +} + +pub(crate) fn recv_from( + socket: &net::UnixDatagram, + dst: &mut [u8], +) -> io::Result<(usize, SocketAddr)> { + let mut count = 0; + let socketaddr = SocketAddr::new(|sockaddr, socklen| { + syscall!(recvfrom( + socket.as_raw_fd(), + dst.as_mut_ptr() as *mut _, + dst.len(), + 0, + sockaddr, + socklen, + )) + .map(|c| { + count = c; + c as libc::c_int + }) + })?; + Ok((count as usize, socketaddr)) +} diff --git a/third_party/rust/mio/src/sys/unix/uds/listener.rs b/third_party/rust/mio/src/sys/unix/uds/listener.rs new file mode 100644 index 0000000000..b6218427f2 --- /dev/null +++ b/third_party/rust/mio/src/sys/unix/uds/listener.rs @@ -0,0 +1,94 @@ +use super::socket_addr; +use crate::net::{SocketAddr, UnixStream}; +use crate::sys::unix::net::new_socket; +use std::os::unix::io::{AsRawFd, FromRawFd}; +use std::os::unix::net; +use std::path::Path; +use std::{io, mem}; + +pub(crate) fn bind(path: &Path) -> io::Result<net::UnixListener> { + let socket = new_socket(libc::AF_UNIX, libc::SOCK_STREAM)?; + let (sockaddr, socklen) = socket_addr(path)?; + let sockaddr = &sockaddr as *const libc::sockaddr_un as *const libc::sockaddr; + + syscall!(bind(socket, sockaddr, socklen)) + .and_then(|_| syscall!(listen(socket, 1024))) + .map_err(|err| { + // Close the socket if we hit an error, ignoring the error from + // closing since we can't pass back two errors. + let _ = unsafe { libc::close(socket) }; + err + }) + .map(|_| unsafe { net::UnixListener::from_raw_fd(socket) }) +} + +pub(crate) fn accept(listener: &net::UnixListener) -> io::Result<(UnixStream, SocketAddr)> { + let sockaddr = mem::MaybeUninit::<libc::sockaddr_un>::zeroed(); + + // This is safe to assume because a `libc::sockaddr_un` filled with `0` + // bytes is properly initialized. + // + // `0` is a valid value for `sockaddr_un::sun_family`; it is + // `libc::AF_UNSPEC`. + // + // `[0; 108]` is a valid value for `sockaddr_un::sun_path`; it begins an + // abstract path. + let mut sockaddr = unsafe { sockaddr.assume_init() }; + + sockaddr.sun_family = libc::AF_UNIX as libc::sa_family_t; + let mut socklen = mem::size_of_val(&sockaddr) as libc::socklen_t; + + #[cfg(not(any( + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + // Android x86's seccomp profile forbids calls to `accept4(2)` + // See https://github.com/tokio-rs/mio/issues/1445 for details + all( + target_arch = "x86", + target_os = "android" + ) + )))] + let socket = { + let flags = libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC; + syscall!(accept4( + listener.as_raw_fd(), + &mut sockaddr as *mut libc::sockaddr_un as *mut libc::sockaddr, + &mut socklen, + flags + )) + .map(|socket| unsafe { net::UnixStream::from_raw_fd(socket) }) + }; + + #[cfg(any( + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + all(target_arch = "x86", target_os = "android") + ))] + let socket = syscall!(accept( + listener.as_raw_fd(), + &mut sockaddr as *mut libc::sockaddr_un as *mut libc::sockaddr, + &mut socklen, + )) + .and_then(|socket| { + // Ensure the socket is closed if either of the `fcntl` calls + // error below. + let s = unsafe { net::UnixStream::from_raw_fd(socket) }; + syscall!(fcntl(socket, libc::F_SETFD, libc::FD_CLOEXEC))?; + + // See https://github.com/tokio-rs/mio/issues/1450 + #[cfg(all(target_arch = "x86", target_os = "android"))] + syscall!(fcntl(socket, libc::F_SETFL, libc::O_NONBLOCK))?; + + Ok(s) + }); + + socket + .map(UnixStream::from_std) + .map(|stream| (stream, SocketAddr::from_parts(sockaddr, socklen))) +} + +pub(crate) fn local_addr(listener: &net::UnixListener) -> io::Result<SocketAddr> { + super::local_addr(listener.as_raw_fd()) +} diff --git a/third_party/rust/mio/src/sys/unix/uds/mod.rs b/third_party/rust/mio/src/sys/unix/uds/mod.rs new file mode 100644 index 0000000000..8e28a9573a --- /dev/null +++ b/third_party/rust/mio/src/sys/unix/uds/mod.rs @@ -0,0 +1,149 @@ +mod socketaddr; +pub use self::socketaddr::SocketAddr; + +/// Get the `sun_path` field offset of `sockaddr_un` for the target OS. +/// +/// On Linux, this funtion equates to the same value as +/// `size_of::<sa_family_t>()`, but some other implementations include +/// other fields before `sun_path`, so the expression more portably +/// describes the size of the address structure. +pub(in crate::sys) fn path_offset(sockaddr: &libc::sockaddr_un) -> usize { + let base = sockaddr as *const _ as usize; + let path = &sockaddr.sun_path as *const _ as usize; + path - base +} + +cfg_os_poll! { + use std::cmp::Ordering; + use std::os::unix::ffi::OsStrExt; + use std::os::unix::io::{RawFd, FromRawFd}; + use std::path::Path; + use std::{io, mem}; + + pub(crate) mod datagram; + pub(crate) mod listener; + pub(crate) mod stream; + + pub(in crate::sys) fn socket_addr(path: &Path) -> io::Result<(libc::sockaddr_un, libc::socklen_t)> { + let sockaddr = mem::MaybeUninit::<libc::sockaddr_un>::zeroed(); + + // This is safe to assume because a `libc::sockaddr_un` filled with `0` + // bytes is properly initialized. + // + // `0` is a valid value for `sockaddr_un::sun_family`; it is + // `libc::AF_UNSPEC`. + // + // `[0; 108]` is a valid value for `sockaddr_un::sun_path`; it begins an + // abstract path. + let mut sockaddr = unsafe { sockaddr.assume_init() }; + + sockaddr.sun_family = libc::AF_UNIX as libc::sa_family_t; + + let bytes = path.as_os_str().as_bytes(); + match (bytes.get(0), bytes.len().cmp(&sockaddr.sun_path.len())) { + // Abstract paths don't need a null terminator + (Some(&0), Ordering::Greater) => { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "path must be no longer than libc::sockaddr_un.sun_path", + )); + } + (_, Ordering::Greater) | (_, Ordering::Equal) => { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "path must be shorter than libc::sockaddr_un.sun_path", + )); + } + _ => {} + } + + for (dst, src) in sockaddr.sun_path.iter_mut().zip(bytes.iter()) { + *dst = *src as libc::c_char; + } + + let offset = path_offset(&sockaddr); + let mut socklen = offset + bytes.len(); + + match bytes.get(0) { + // The struct has already been zeroes so the null byte for pathname + // addresses is already there. + Some(&0) | None => {} + Some(_) => socklen += 1, + } + + Ok((sockaddr, socklen as libc::socklen_t)) + } + + fn pair<T>(flags: libc::c_int) -> io::Result<(T, T)> + where T: FromRawFd, + { + #[cfg(not(any(target_os = "ios", target_os = "macos")))] + let flags = flags | libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC; + + let mut fds = [-1; 2]; + syscall!(socketpair(libc::AF_UNIX, flags, 0, fds.as_mut_ptr()))?; + let pair = unsafe { (T::from_raw_fd(fds[0]), T::from_raw_fd(fds[1])) }; + + // Darwin doesn't have SOCK_NONBLOCK or SOCK_CLOEXEC. + // + // In order to set those flags, additional `fcntl` sys calls must be + // performed. If a `fnctl` fails after the sockets have been created, + // the file descriptors will leak. Creating `pair` above ensures that if + // there is an error, the file descriptors are closed. + #[cfg(any(target_os = "ios", target_os = "macos"))] + { + syscall!(fcntl(fds[0], libc::F_SETFL, libc::O_NONBLOCK))?; + syscall!(fcntl(fds[0], libc::F_SETFD, libc::FD_CLOEXEC))?; + syscall!(fcntl(fds[1], libc::F_SETFL, libc::O_NONBLOCK))?; + syscall!(fcntl(fds[1], libc::F_SETFD, libc::FD_CLOEXEC))?; + } + Ok(pair) + } + + // The following functions can't simply be replaced with a call to + // `net::UnixDatagram` because of our `SocketAddr` type. + + fn local_addr(socket: RawFd) -> io::Result<SocketAddr> { + SocketAddr::new(|sockaddr, socklen| syscall!(getsockname(socket, sockaddr, socklen))) + } + + fn peer_addr(socket: RawFd) -> io::Result<SocketAddr> { + SocketAddr::new(|sockaddr, socklen| syscall!(getpeername(socket, sockaddr, socklen))) + } + + #[cfg(test)] + mod tests { + use super::{path_offset, socket_addr}; + use std::path::Path; + use std::str; + + #[test] + fn pathname_address() { + const PATH: &str = "./foo/bar.txt"; + const PATH_LEN: usize = 13; + + // Pathname addresses do have a null terminator, so `socklen` is + // expected to be `PATH_LEN` + `offset` + 1. + let path = Path::new(PATH); + let (sockaddr, actual) = socket_addr(path).unwrap(); + let offset = path_offset(&sockaddr); + let expected = PATH_LEN + offset + 1; + assert_eq!(expected as libc::socklen_t, actual) + } + + #[test] + fn abstract_address() { + const PATH: &[u8] = &[0, 116, 111, 107, 105, 111]; + const PATH_LEN: usize = 6; + + // Abstract addresses do not have a null terminator, so `socklen` is + // expected to be `PATH_LEN` + `offset`. + let abstract_path = str::from_utf8(PATH).unwrap(); + let path = Path::new(abstract_path); + let (sockaddr, actual) = socket_addr(path).unwrap(); + let offset = path_offset(&sockaddr); + let expected = PATH_LEN + offset; + assert_eq!(expected as libc::socklen_t, actual) + } + } +} diff --git a/third_party/rust/mio/src/sys/unix/uds/socketaddr.rs b/third_party/rust/mio/src/sys/unix/uds/socketaddr.rs new file mode 100644 index 0000000000..4c7c411618 --- /dev/null +++ b/third_party/rust/mio/src/sys/unix/uds/socketaddr.rs @@ -0,0 +1,130 @@ +use super::path_offset; +use std::ffi::OsStr; +use std::os::unix::ffi::OsStrExt; +use std::path::Path; +use std::{ascii, fmt}; + +/// An address associated with a `mio` specific Unix socket. +/// +/// This is implemented instead of imported from [`net::SocketAddr`] because +/// there is no way to create a [`net::SocketAddr`]. One must be returned by +/// [`accept`], so this is returned instead. +/// +/// [`net::SocketAddr`]: std::os::unix::net::SocketAddr +/// [`accept`]: #method.accept +pub struct SocketAddr { + sockaddr: libc::sockaddr_un, + socklen: libc::socklen_t, +} + +struct AsciiEscaped<'a>(&'a [u8]); + +enum AddressKind<'a> { + Unnamed, + Pathname(&'a Path), + Abstract(&'a [u8]), +} + +impl SocketAddr { + fn address(&self) -> AddressKind<'_> { + let offset = path_offset(&self.sockaddr); + // Don't underflow in `len` below. + if (self.socklen as usize) < offset { + return AddressKind::Unnamed; + } + let len = self.socklen as usize - offset; + let path = unsafe { &*(&self.sockaddr.sun_path as *const [libc::c_char] as *const [u8]) }; + + // macOS seems to return a len of 16 and a zeroed sun_path for unnamed addresses + if len == 0 + || (cfg!(not(any(target_os = "linux", target_os = "android"))) + && self.sockaddr.sun_path[0] == 0) + { + AddressKind::Unnamed + } else if self.sockaddr.sun_path[0] == 0 { + AddressKind::Abstract(&path[1..len]) + } else { + AddressKind::Pathname(OsStr::from_bytes(&path[..len - 1]).as_ref()) + } + } +} + +cfg_os_poll! { + use std::{io, mem}; + + impl SocketAddr { + pub(crate) fn new<F>(f: F) -> io::Result<SocketAddr> + where + F: FnOnce(*mut libc::sockaddr, &mut libc::socklen_t) -> io::Result<libc::c_int>, + { + let mut sockaddr = { + let sockaddr = mem::MaybeUninit::<libc::sockaddr_un>::zeroed(); + unsafe { sockaddr.assume_init() } + }; + + let raw_sockaddr = &mut sockaddr as *mut libc::sockaddr_un as *mut libc::sockaddr; + let mut socklen = mem::size_of_val(&sockaddr) as libc::socklen_t; + + f(raw_sockaddr, &mut socklen)?; + Ok(SocketAddr::from_parts(sockaddr, socklen)) + } + + pub(crate) fn from_parts(sockaddr: libc::sockaddr_un, socklen: libc::socklen_t) -> SocketAddr { + SocketAddr { sockaddr, socklen } + } + + /// Returns `true` if the address is unnamed. + /// + /// Documentation reflected in [`SocketAddr`] + /// + /// [`SocketAddr`]: std::os::unix::net::SocketAddr + pub fn is_unnamed(&self) -> bool { + matches!(self.address(), AddressKind::Unnamed) + } + + /// Returns the contents of this address if it is a `pathname` address. + /// + /// Documentation reflected in [`SocketAddr`] + /// + /// [`SocketAddr`]: std::os::unix::net::SocketAddr + pub fn as_pathname(&self) -> Option<&Path> { + if let AddressKind::Pathname(path) = self.address() { + Some(path) + } else { + None + } + } + + /// Returns the contents of this address if it is an abstract namespace + /// without the leading null byte. + // Link to std::os::unix::net::SocketAddr pending + // https://github.com/rust-lang/rust/issues/85410. + pub fn as_abstract_namespace(&self) -> Option<&[u8]> { + if let AddressKind::Abstract(path) = self.address() { + Some(path) + } else { + None + } + } + } +} + +impl fmt::Debug for SocketAddr { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.address() { + AddressKind::Unnamed => write!(fmt, "(unnamed)"), + AddressKind::Abstract(name) => write!(fmt, "{} (abstract)", AsciiEscaped(name)), + AddressKind::Pathname(path) => write!(fmt, "{:?} (pathname)", path), + } + } +} + +impl<'a> fmt::Display for AsciiEscaped<'a> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "\"")?; + for byte in self.0.iter().cloned().flat_map(ascii::escape_default) { + write!(fmt, "{}", byte as char)?; + } + write!(fmt, "\"") + } +} diff --git a/third_party/rust/mio/src/sys/unix/uds/stream.rs b/third_party/rust/mio/src/sys/unix/uds/stream.rs new file mode 100644 index 0000000000..149dd14e1d --- /dev/null +++ b/third_party/rust/mio/src/sys/unix/uds/stream.rs @@ -0,0 +1,39 @@ +use super::{socket_addr, SocketAddr}; +use crate::sys::unix::net::new_socket; + +use std::io; +use std::os::unix::io::{AsRawFd, FromRawFd}; +use std::os::unix::net; +use std::path::Path; + +pub(crate) fn connect(path: &Path) -> io::Result<net::UnixStream> { + let socket = new_socket(libc::AF_UNIX, libc::SOCK_STREAM)?; + let (sockaddr, socklen) = socket_addr(path)?; + let sockaddr = &sockaddr as *const libc::sockaddr_un as *const libc::sockaddr; + + match syscall!(connect(socket, sockaddr, socklen)) { + Ok(_) => {} + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {} + Err(e) => { + // Close the socket if we hit an error, ignoring the error + // from closing since we can't pass back two errors. + let _ = unsafe { libc::close(socket) }; + + return Err(e); + } + } + + Ok(unsafe { net::UnixStream::from_raw_fd(socket) }) +} + +pub(crate) fn pair() -> io::Result<(net::UnixStream, net::UnixStream)> { + super::pair(libc::SOCK_STREAM) +} + +pub(crate) fn local_addr(socket: &net::UnixStream) -> io::Result<SocketAddr> { + super::local_addr(socket.as_raw_fd()) +} + +pub(crate) fn peer_addr(socket: &net::UnixStream) -> io::Result<SocketAddr> { + super::peer_addr(socket.as_raw_fd()) +} diff --git a/third_party/rust/mio/src/sys/unix/waker.rs b/third_party/rust/mio/src/sys/unix/waker.rs new file mode 100644 index 0000000000..684fee981e --- /dev/null +++ b/third_party/rust/mio/src/sys/unix/waker.rs @@ -0,0 +1,178 @@ +#[cfg(any(target_os = "linux", target_os = "android"))] +mod eventfd { + use crate::sys::Selector; + use crate::{Interest, Token}; + + use std::fs::File; + use std::io::{self, Read, Write}; + use std::os::unix::io::FromRawFd; + + /// Waker backed by `eventfd`. + /// + /// `eventfd` is effectively an 64 bit counter. All writes must be of 8 + /// bytes (64 bits) and are converted (native endian) into an 64 bit + /// unsigned integer and added to the count. Reads must also be 8 bytes and + /// reset the count to 0, returning the count. + #[derive(Debug)] + pub struct Waker { + fd: File, + } + + impl Waker { + pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> { + syscall!(eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK)).and_then(|fd| { + // Turn the file descriptor into a file first so we're ensured + // it's closed when dropped, e.g. when register below fails. + let file = unsafe { File::from_raw_fd(fd) }; + selector + .register(fd, token, Interest::READABLE) + .map(|()| Waker { fd: file }) + }) + } + + pub fn wake(&self) -> io::Result<()> { + let buf: [u8; 8] = 1u64.to_ne_bytes(); + match (&self.fd).write(&buf) { + Ok(_) => Ok(()), + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { + // Writing only blocks if the counter is going to overflow. + // So we'll reset the counter to 0 and wake it again. + self.reset()?; + self.wake() + } + Err(err) => Err(err), + } + } + + /// Reset the eventfd object, only need to call this if `wake` fails. + fn reset(&self) -> io::Result<()> { + let mut buf: [u8; 8] = 0u64.to_ne_bytes(); + match (&self.fd).read(&mut buf) { + Ok(_) => Ok(()), + // If the `Waker` hasn't been awoken yet this will return a + // `WouldBlock` error which we can safely ignore. + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => Ok(()), + Err(err) => Err(err), + } + } + } +} + +#[cfg(any(target_os = "linux", target_os = "android"))] +pub use self::eventfd::Waker; + +#[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))] +mod kqueue { + use crate::sys::Selector; + use crate::Token; + + use std::io; + + /// Waker backed by kqueue user space notifications (`EVFILT_USER`). + /// + /// The implementation is fairly simple, first the kqueue must be setup to + /// receive waker events this done by calling `Selector.setup_waker`. Next + /// we need access to kqueue, thus we need to duplicate the file descriptor. + /// Now waking is as simple as adding an event to the kqueue. + #[derive(Debug)] + pub struct Waker { + selector: Selector, + token: Token, + } + + impl Waker { + pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> { + selector.try_clone().and_then(|selector| { + selector + .setup_waker(token) + .map(|()| Waker { selector, token }) + }) + } + + pub fn wake(&self) -> io::Result<()> { + self.selector.wake(self.token) + } + } +} + +#[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))] +pub use self::kqueue::Waker; + +#[cfg(any( + target_os = "dragonfly", + target_os = "illumos", + target_os = "netbsd", + target_os = "openbsd", +))] +mod pipe { + use crate::sys::unix::Selector; + use crate::{Interest, Token}; + + use std::fs::File; + use std::io::{self, Read, Write}; + use std::os::unix::io::FromRawFd; + + /// Waker backed by a unix pipe. + /// + /// Waker controls both the sending and receiving ends and empties the pipe + /// if writing to it (waking) fails. + #[derive(Debug)] + pub struct Waker { + sender: File, + receiver: File, + } + + impl Waker { + pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> { + let mut fds = [-1; 2]; + syscall!(pipe2(fds.as_mut_ptr(), libc::O_NONBLOCK | libc::O_CLOEXEC))?; + // Turn the file descriptors into files first so we're ensured + // they're closed when dropped, e.g. when register below fails. + let sender = unsafe { File::from_raw_fd(fds[1]) }; + let receiver = unsafe { File::from_raw_fd(fds[0]) }; + selector + .register(fds[0], token, Interest::READABLE) + .map(|()| Waker { sender, receiver }) + } + + pub fn wake(&self) -> io::Result<()> { + // The epoll emulation on some illumos systems currently requires + // the pipe buffer to be completely empty for an edge-triggered + // wakeup on the pipe read side. + #[cfg(target_os = "illumos")] + self.empty(); + + match (&self.sender).write(&[1]) { + Ok(_) => Ok(()), + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { + // The reading end is full so we'll empty the buffer and try + // again. + self.empty(); + self.wake() + } + Err(ref err) if err.kind() == io::ErrorKind::Interrupted => self.wake(), + Err(err) => Err(err), + } + } + + /// Empty the pipe's buffer, only need to call this if `wake` fails. + /// This ignores any errors. + fn empty(&self) { + let mut buf = [0; 4096]; + loop { + match (&self.receiver).read(&mut buf) { + Ok(n) if n > 0 => continue, + _ => return, + } + } + } + } +} + +#[cfg(any( + target_os = "dragonfly", + target_os = "illumos", + target_os = "netbsd", + target_os = "openbsd", +))] +pub use self::pipe::Waker; |