diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-30 03:57:31 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-30 03:57:31 +0000 |
commit | dc0db358abe19481e475e10c32149b53370f1a1c (patch) | |
tree | ab8ce99c4b255ce46f99ef402c27916055b899ee /vendor/tokio/src/net/unix | |
parent | Releasing progress-linux version 1.71.1+dfsg1-2~progress7.99u1. (diff) | |
download | rustc-dc0db358abe19481e475e10c32149b53370f1a1c.tar.xz rustc-dc0db358abe19481e475e10c32149b53370f1a1c.zip |
Merging upstream version 1.72.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/tokio/src/net/unix')
-rw-r--r-- | vendor/tokio/src/net/unix/datagram/socket.rs | 321 | ||||
-rw-r--r-- | vendor/tokio/src/net/unix/listener.rs | 67 | ||||
-rw-r--r-- | vendor/tokio/src/net/unix/mod.rs | 17 | ||||
-rw-r--r-- | vendor/tokio/src/net/unix/pipe.rs | 1222 | ||||
-rw-r--r-- | vendor/tokio/src/net/unix/split.rs | 233 | ||||
-rw-r--r-- | vendor/tokio/src/net/unix/split_owned.rs | 236 | ||||
-rw-r--r-- | vendor/tokio/src/net/unix/stream.rs | 188 | ||||
-rw-r--r-- | vendor/tokio/src/net/unix/ucred.rs | 157 |
8 files changed, 2332 insertions, 109 deletions
diff --git a/vendor/tokio/src/net/unix/datagram/socket.rs b/vendor/tokio/src/net/unix/datagram/socket.rs index 2d2177803..c97d101aa 100644 --- a/vendor/tokio/src/net/unix/datagram/socket.rs +++ b/vendor/tokio/src/net/unix/datagram/socket.rs @@ -1,10 +1,11 @@ use crate::io::{Interest, PollEvented, ReadBuf, Ready}; use crate::net::unix::SocketAddr; -use std::convert::TryFrom; use std::fmt; use std::io; use std::net::Shutdown; +#[cfg(not(tokio_no_as_fd))] +use std::os::unix::io::{AsFd, BorrowedFd}; use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use std::os::unix::net; use std::path::Path; @@ -90,13 +91,14 @@ cfg_net_unix! { /// # Ok(()) /// # } /// ``` + #[cfg_attr(docsrs, doc(alias = "uds"))] pub struct UnixDatagram { io: PollEvented<mio::net::UnixDatagram>, } } impl UnixDatagram { - /// Wait for any of the requested ready states. + /// Waits for any of the requested ready states. /// /// This function is usually paired with `try_recv()` or `try_send()`. It /// can be used to concurrently recv / send to the same socket on a single @@ -104,7 +106,9 @@ impl UnixDatagram { /// /// The function may complete without the socket being ready. This is a /// false-positive and attempting an operation will return with - /// `io::ErrorKind::WouldBlock`. + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. /// /// # Cancel safety /// @@ -169,7 +173,7 @@ impl UnixDatagram { Ok(event.ready) } - /// Wait for the socket to become writable. + /// Waits for the socket to become writable. /// /// This function is equivalent to `ready(Interest::WRITABLE)` and is /// usually paired with `try_send()` or `try_send_to()`. @@ -226,7 +230,40 @@ impl UnixDatagram { Ok(()) } - /// Wait for the socket to become readable. + /// Polls for write/send readiness. + /// + /// If the socket is not currently ready for sending, this method will + /// store a clone of the `Waker` from the provided `Context`. When the socket + /// becomes ready for sending, `Waker::wake` will be called on the + /// waker. + /// + /// Note that on multiple calls to `poll_send_ready` or `poll_send`, only + /// the `Waker` from the `Context` passed to the most recent call is + /// scheduled to receive a wakeup. (However, `poll_recv_ready` retains a + /// second, independent waker.) + /// + /// This function is intended for cases where creating and pinning a future + /// via [`writable`] is not feasible. Where possible, using [`writable`] is + /// preferred, as this supports polling from multiple tasks at once. + /// + /// # Return value + /// + /// The function returns: + /// + /// * `Poll::Pending` if the socket is not ready for writing. + /// * `Poll::Ready(Ok(()))` if the socket is ready for writing. + /// * `Poll::Ready(Err(e))` if an error is encountered. + /// + /// # Errors + /// + /// This function may encounter any standard I/O error except `WouldBlock`. + /// + /// [`writable`]: method@Self::writable + pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + self.io.registration().poll_write_ready(cx).map_ok(|_| ()) + } + + /// Waits for the socket to become readable. /// /// This function is equivalent to `ready(Interest::READABLE)` and is usually /// paired with `try_recv()`. @@ -289,6 +326,39 @@ impl UnixDatagram { Ok(()) } + /// Polls for read/receive readiness. + /// + /// If the socket is not currently ready for receiving, this method will + /// store a clone of the `Waker` from the provided `Context`. When the + /// socket becomes ready for reading, `Waker::wake` will be called on the + /// waker. + /// + /// Note that on multiple calls to `poll_recv_ready`, `poll_recv` or + /// `poll_peek`, only the `Waker` from the `Context` passed to the most + /// recent call is scheduled to receive a wakeup. (However, + /// `poll_send_ready` retains a second, independent waker.) + /// + /// This function is intended for cases where creating and pinning a future + /// via [`readable`] is not feasible. Where possible, using [`readable`] is + /// preferred, as this supports polling from multiple tasks at once. + /// + /// # Return value + /// + /// The function returns: + /// + /// * `Poll::Pending` if the socket is not ready for reading. + /// * `Poll::Ready(Ok(()))` if the socket is ready for reading. + /// * `Poll::Ready(Err(e))` if an error is encountered. + /// + /// # Errors + /// + /// This function may encounter any standard I/O error except `WouldBlock`. + /// + /// [`readable`]: method@Self::readable + pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + self.io.registration().poll_read_ready(cx).map_ok(|_| ()) + } + /// Creates a new `UnixDatagram` bound to the specified path. /// /// # Examples @@ -358,13 +428,21 @@ impl UnixDatagram { /// Creates new `UnixDatagram` from a `std::os::unix::net::UnixDatagram`. /// /// This function is intended to be used to wrap a UnixDatagram from the - /// standard library in the Tokio equivalent. The conversion assumes - /// nothing about the underlying datagram; it is left up to the user to set - /// it in non-blocking mode. + /// standard library in the Tokio equivalent. + /// + /// # Notes + /// + /// The caller is responsible for ensuring that the socker is in + /// non-blocking mode. Otherwise all I/O operations on the socket + /// will block the thread, which will cause unexpected behavior. + /// Non-blocking mode can be set using [`set_nonblocking`]. + /// + /// [`set_nonblocking`]: std::os::unix::net::UnixDatagram::set_nonblocking /// /// # Panics /// - /// This function panics if thread-local runtime is not set. + /// This function panics if it is not called from within a runtime with + /// IO enabled. /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a Tokio runtime, otherwise runtime can be set @@ -391,30 +469,29 @@ impl UnixDatagram { /// # Ok(()) /// # } /// ``` + #[track_caller] pub fn from_std(datagram: net::UnixDatagram) -> io::Result<UnixDatagram> { let socket = mio::net::UnixDatagram::from_std(datagram); let io = PollEvented::new(socket)?; Ok(UnixDatagram { io }) } - /// Turn a [`tokio::net::UnixDatagram`] into a [`std::os::unix::net::UnixDatagram`]. + /// Turns a [`tokio::net::UnixDatagram`] into a [`std::os::unix::net::UnixDatagram`]. /// /// The returned [`std::os::unix::net::UnixDatagram`] will have nonblocking - /// mode set as `true`. Use [`set_nonblocking`] to change the blocking mode + /// mode set as `true`. Use [`set_nonblocking`] to change the blocking mode /// if needed. /// /// # Examples /// /// ```rust,no_run - /// use std::error::Error; - /// - /// #[tokio::main] - /// async fn main() -> Result<(), Box<dyn Error>> { - /// let tokio_socket = tokio::net::UnixDatagram::bind("127.0.0.1:0")?; - /// let std_socket = tokio_socket.into_std()?; - /// std_socket.set_nonblocking(false)?; - /// Ok(()) - /// } + /// # use std::error::Error; + /// # async fn dox() -> Result<(), Box<dyn Error>> { + /// let tokio_socket = tokio::net::UnixDatagram::bind("/path/to/the/socket")?; + /// let std_socket = tokio_socket.into_std()?; + /// std_socket.set_nonblocking(false)?; + /// # Ok(()) + /// # } /// ``` /// /// [`tokio::net::UnixDatagram`]: UnixDatagram @@ -548,7 +625,7 @@ impl UnixDatagram { .await } - /// Try to send a datagram to the peer without waiting. + /// Tries to send a datagram to the peer without waiting. /// /// # Examples /// @@ -592,7 +669,7 @@ impl UnixDatagram { .try_io(Interest::WRITABLE, || self.io.send(buf)) } - /// Try to send a datagram to the peer without waiting. + /// Tries to send a datagram to the peer without waiting. /// /// # Examples /// @@ -678,7 +755,7 @@ impl UnixDatagram { .await } - /// Try to receive a datagram from the peer without waiting. + /// Tries to receive a datagram from the peer without waiting. /// /// # Examples /// @@ -729,7 +806,9 @@ impl UnixDatagram { } cfg_io_util! { - /// Try to receive data from the socket without waiting. + /// Tries to receive data from the socket without waiting. + /// + /// This method can be used even if `buf` is uninitialized. /// /// # Examples /// @@ -778,7 +857,7 @@ impl UnixDatagram { // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the // buffer. - let (n, addr) = (&*self.io).recv_from(dst)?; + let (n, addr) = (*self.io).recv_from(dst)?; unsafe { buf.advance_mut(n); @@ -790,9 +869,64 @@ impl UnixDatagram { Ok((n, SocketAddr(addr))) } - /// Try to read data from the stream into the provided buffer, advancing the + /// Receives from the socket, advances the + /// buffer's internal cursor and returns how many bytes were read and the origin. + /// + /// This method can be used even if `buf` is uninitialized. + /// + /// # Examples + /// ``` + /// # use std::error::Error; + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box<dyn Error>> { + /// use tokio::net::UnixDatagram; + /// use tempfile::tempdir; + /// + /// // We use a temporary directory so that the socket + /// // files left by the bound sockets will get cleaned up. + /// let tmp = tempdir()?; + /// + /// // Bind each socket to a filesystem path + /// let tx_path = tmp.path().join("tx"); + /// let tx = UnixDatagram::bind(&tx_path)?; + /// let rx_path = tmp.path().join("rx"); + /// let rx = UnixDatagram::bind(&rx_path)?; + /// + /// let bytes = b"hello world"; + /// tx.send_to(bytes, &rx_path).await?; + /// + /// let mut buf = Vec::with_capacity(24); + /// let (size, addr) = rx.recv_buf_from(&mut buf).await?; + /// + /// let dgram = &buf[..size]; + /// assert_eq!(dgram, bytes); + /// assert_eq!(addr.as_pathname().unwrap(), &tx_path); + /// + /// # Ok(()) + /// # } + /// ``` + pub async fn recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> { + self.io.registration().async_io(Interest::READABLE, || { + let dst = buf.chunk_mut(); + let dst = + unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; + + // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the + // buffer. + let (n, addr) = (*self.io).recv_from(dst)?; + + unsafe { + buf.advance_mut(n); + } + Ok((n,SocketAddr(addr))) + }).await + } + + /// Tries to read data from the stream into the provided buffer, advancing the /// buffer's internal cursor, returning how many bytes were read. /// + /// This method can be used even if `buf` is uninitialized. + /// /// # Examples /// /// ```no_run @@ -841,7 +975,7 @@ impl UnixDatagram { // Safety: We trust `UnixDatagram::recv` to have filled up `n` bytes in the // buffer. - let n = (&*self.io).recv(dst)?; + let n = (*self.io).recv(dst)?; unsafe { buf.advance_mut(n); @@ -850,6 +984,52 @@ impl UnixDatagram { Ok(n) }) } + + /// Receives data from the socket from the address to which it is connected, + /// advancing the buffer's internal cursor, returning how many bytes were read. + /// + /// This method can be used even if `buf` is uninitialized. + /// + /// # Examples + /// ``` + /// # use std::error::Error; + /// # #[tokio::main] + /// # async fn main() -> Result<(), Box<dyn Error>> { + /// use tokio::net::UnixDatagram; + /// + /// // Create the pair of sockets + /// let (sock1, sock2) = UnixDatagram::pair()?; + /// + /// // Since the sockets are paired, the paired send/recv + /// // functions can be used + /// let bytes = b"hello world"; + /// sock1.send(bytes).await?; + /// + /// let mut buff = Vec::with_capacity(24); + /// let size = sock2.recv_buf(&mut buff).await?; + /// + /// let dgram = &buff[..size]; + /// assert_eq!(dgram, bytes); + /// + /// # Ok(()) + /// # } + /// ``` + pub async fn recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> { + self.io.registration().async_io(Interest::READABLE, || { + let dst = buf.chunk_mut(); + let dst = + unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; + + // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the + // buffer. + let n = (*self.io).recv(dst)?; + + unsafe { + buf.advance_mut(n); + } + Ok(n) + }).await + } } /// Sends data on the socket to the specified address. @@ -1091,7 +1271,7 @@ impl UnixDatagram { Poll::Ready(Ok(())) } - /// Try to receive data from the socket without waiting. + /// Tries to receive data from the socket without waiting. /// /// # Examples /// @@ -1143,6 +1323,84 @@ impl UnixDatagram { Ok((n, SocketAddr(addr))) } + /// Tries to read or write from the socket using a user-provided IO operation. + /// + /// If the socket is ready, the provided closure is called. The closure + /// should attempt to perform IO operation on the socket by manually + /// calling the appropriate syscall. If the operation fails because the + /// socket is not actually ready, then the closure should return a + /// `WouldBlock` error and the readiness flag is cleared. The return value + /// of the closure is then returned by `try_io`. + /// + /// If the socket is not ready, then the closure is not called + /// and a `WouldBlock` error is returned. + /// + /// The closure should only return a `WouldBlock` error if it has performed + /// an IO operation on the socket that failed due to the socket not being + /// ready. Returning a `WouldBlock` error in any other situation will + /// incorrectly clear the readiness flag, which can cause the socket to + /// behave incorrectly. + /// + /// The closure should not perform the IO operation using any of the methods + /// defined on the Tokio `UnixDatagram` type, as this will mess with the + /// readiness flag and can cause the socket to behave incorrectly. + /// + /// This method is not intended to be used with combined interests. + /// The closure should perform only one type of IO operation, so it should not + /// require more than one ready state. This method may panic or sleep forever + /// if it is called with a combined interest. + /// + /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: UnixDatagram::readable() + /// [`writable()`]: UnixDatagram::writable() + /// [`ready()`]: UnixDatagram::ready() + pub fn try_io<R>( + &self, + interest: Interest, + f: impl FnOnce() -> io::Result<R>, + ) -> io::Result<R> { + self.io + .registration() + .try_io(interest, || self.io.try_io(f)) + } + + /// Reads or writes from the socket using a user-provided IO operation. + /// + /// The readiness of the socket is awaited and when the socket is ready, + /// the provided closure is called. The closure should attempt to perform + /// IO operation on the socket by manually calling the appropriate syscall. + /// If the operation fails because the socket is not actually ready, + /// then the closure should return a `WouldBlock` error. In such case the + /// readiness flag is cleared and the socket readiness is awaited again. + /// This loop is repeated until the closure returns an `Ok` or an error + /// other than `WouldBlock`. + /// + /// The closure should only return a `WouldBlock` error if it has performed + /// an IO operation on the socket that failed due to the socket not being + /// ready. Returning a `WouldBlock` error in any other situation will + /// incorrectly clear the readiness flag, which can cause the socket to + /// behave incorrectly. + /// + /// The closure should not perform the IO operation using any of the methods + /// defined on the Tokio `UnixDatagram` type, as this will mess with the + /// readiness flag and can cause the socket to behave incorrectly. + /// + /// This method is not intended to be used with combined interests. + /// The closure should perform only one type of IO operation, so it should not + /// require more than one ready state. This method may panic or sleep forever + /// if it is called with a combined interest. + pub async fn async_io<R>( + &self, + interest: Interest, + mut f: impl FnMut() -> io::Result<R>, + ) -> io::Result<R> { + self.io + .registration() + .async_io(interest, || self.io.try_io(&mut f)) + .await + } + /// Returns the local address that this socket is bound to. /// /// # Examples @@ -1319,3 +1577,10 @@ impl AsRawFd for UnixDatagram { self.io.as_raw_fd() } } + +#[cfg(not(tokio_no_as_fd))] +impl AsFd for UnixDatagram { + fn as_fd(&self) -> BorrowedFd<'_> { + unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) } + } +} diff --git a/vendor/tokio/src/net/unix/listener.rs b/vendor/tokio/src/net/unix/listener.rs index efb9503d4..37a5a4ae4 100644 --- a/vendor/tokio/src/net/unix/listener.rs +++ b/vendor/tokio/src/net/unix/listener.rs @@ -1,9 +1,10 @@ use crate::io::{Interest, PollEvented}; use crate::net::unix::{SocketAddr, UnixStream}; -use std::convert::TryFrom; use std::fmt; use std::io; +#[cfg(not(tokio_no_as_fd))] +use std::os::unix::io::{AsFd, BorrowedFd}; use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use std::os::unix::net; use std::path::Path; @@ -44,6 +45,7 @@ cfg_net_unix! { /// } /// } /// ``` + #[cfg_attr(docsrs, doc(alias = "uds"))] pub struct UnixListener { io: PollEvented<mio::net::UnixListener>, } @@ -54,11 +56,13 @@ impl UnixListener { /// /// # Panics /// - /// This function panics if thread-local runtime is not set. + /// This function panics if it is not called from within a runtime with + /// IO enabled. /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + #[track_caller] pub fn bind<P>(path: P) -> io::Result<UnixListener> where P: AsRef<Path>, @@ -71,40 +75,62 @@ impl UnixListener { /// Creates new `UnixListener` from a `std::os::unix::net::UnixListener `. /// /// This function is intended to be used to wrap a UnixListener from the - /// standard library in the Tokio equivalent. The conversion assumes - /// nothing about the underlying listener; it is left up to the user to set - /// it in non-blocking mode. + /// standard library in the Tokio equivalent. + /// + /// # Notes + /// + /// The caller is responsible for ensuring that the listener is in + /// non-blocking mode. Otherwise all I/O operations on the listener + /// will block the thread, which will cause unexpected behavior. + /// Non-blocking mode can be set using [`set_nonblocking`]. + /// + /// [`set_nonblocking`]: std::os::unix::net::UnixListener::set_nonblocking + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UnixListener; + /// use std::os::unix::net::UnixListener as StdUnixListener; + /// # use std::error::Error; + /// + /// # async fn dox() -> Result<(), Box<dyn Error>> { + /// let std_listener = StdUnixListener::bind("/path/to/the/socket")?; + /// std_listener.set_nonblocking(true)?; + /// let listener = UnixListener::from_std(std_listener)?; + /// # Ok(()) + /// # } + /// ``` /// /// # Panics /// - /// This function panics if thread-local runtime is not set. + /// This function panics if it is not called from within a runtime with + /// IO enabled. /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + #[track_caller] pub fn from_std(listener: net::UnixListener) -> io::Result<UnixListener> { let listener = mio::net::UnixListener::from_std(listener); let io = PollEvented::new(listener)?; Ok(UnixListener { io }) } - /// Turn a [`tokio::net::UnixListener`] into a [`std::os::unix::net::UnixListener`]. + /// Turns a [`tokio::net::UnixListener`] into a [`std::os::unix::net::UnixListener`]. /// /// The returned [`std::os::unix::net::UnixListener`] will have nonblocking mode - /// set as `true`. Use [`set_nonblocking`] to change the blocking mode if needed. + /// set as `true`. Use [`set_nonblocking`] to change the blocking mode if needed. /// /// # Examples /// /// ```rust,no_run - /// use std::error::Error; - /// - /// #[tokio::main] - /// async fn main() -> Result<(), Box<dyn Error>> { - /// let tokio_listener = tokio::net::UnixListener::bind("127.0.0.1:0")?; - /// let std_listener = tokio_listener.into_std()?; - /// std_listener.set_nonblocking(false)?; - /// Ok(()) - /// } + /// # use std::error::Error; + /// # async fn dox() -> Result<(), Box<dyn Error>> { + /// let tokio_listener = tokio::net::UnixListener::bind("/path/to/the/socket")?; + /// let std_listener = tokio_listener.into_std()?; + /// std_listener.set_nonblocking(false)?; + /// # Ok(()) + /// # } /// ``` /// /// [`tokio::net::UnixListener`]: UnixListener @@ -184,3 +210,10 @@ impl AsRawFd for UnixListener { self.io.as_raw_fd() } } + +#[cfg(not(tokio_no_as_fd))] +impl AsFd for UnixListener { + fn as_fd(&self) -> BorrowedFd<'_> { + unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) } + } +} diff --git a/vendor/tokio/src/net/unix/mod.rs b/vendor/tokio/src/net/unix/mod.rs index c3046f17e..a49b70af3 100644 --- a/vendor/tokio/src/net/unix/mod.rs +++ b/vendor/tokio/src/net/unix/mod.rs @@ -1,5 +1,4 @@ -//! Unix domain socket utility types - +//! Unix specific network types. // This module does not currently provide any public API, but it was // unintentionally defined as a public module. Hide it from the documentation // instead of changing it to a private module to avoid breakage. @@ -22,3 +21,17 @@ pub(crate) use stream::UnixStream; mod ucred; pub use ucred::UCred; + +pub mod pipe; + +/// A type representing process and process group IDs. +#[allow(non_camel_case_types)] +pub type uid_t = u32; + +/// A type representing user ID. +#[allow(non_camel_case_types)] +pub type gid_t = u32; + +/// A type representing group ID. +#[allow(non_camel_case_types)] +pub type pid_t = i32; diff --git a/vendor/tokio/src/net/unix/pipe.rs b/vendor/tokio/src/net/unix/pipe.rs new file mode 100644 index 000000000..188ac68b1 --- /dev/null +++ b/vendor/tokio/src/net/unix/pipe.rs @@ -0,0 +1,1222 @@ +//! Unix pipe types. + +use crate::io::interest::Interest; +use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf, Ready}; + +use mio::unix::pipe as mio_pipe; +use std::fs::File; +use std::io::{self, Read, Write}; +use std::os::unix::fs::{FileTypeExt, OpenOptionsExt}; +#[cfg(not(tokio_no_as_fd))] +use std::os::unix::io::{AsFd, BorrowedFd}; +use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; +use std::path::Path; +use std::pin::Pin; +use std::task::{Context, Poll}; + +cfg_io_util! { + use bytes::BufMut; +} + +/// Options and flags which can be used to configure how a FIFO file is opened. +/// +/// This builder allows configuring how to create a pipe end from a FIFO file. +/// Generally speaking, when using `OpenOptions`, you'll first call [`new`], +/// then chain calls to methods to set each option, then call either +/// [`open_receiver`] or [`open_sender`], passing the path of the FIFO file you +/// are trying to open. This will give you a [`io::Result`][result] with a pipe +/// end inside that you can further operate on. +/// +/// [`new`]: OpenOptions::new +/// [`open_receiver`]: OpenOptions::open_receiver +/// [`open_sender`]: OpenOptions::open_sender +/// [result]: std::io::Result +/// +/// # Examples +/// +/// Opening a pair of pipe ends from a FIFO file: +/// +/// ```no_run +/// use tokio::net::unix::pipe; +/// # use std::error::Error; +/// +/// const FIFO_NAME: &str = "path/to/a/fifo"; +/// +/// # async fn dox() -> Result<(), Box<dyn Error>> { +/// let rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?; +/// let tx = pipe::OpenOptions::new().open_sender(FIFO_NAME)?; +/// # Ok(()) +/// # } +/// ``` +/// +/// Opening a [`Sender`] on Linux when you are sure the file is a FIFO: +/// +/// ```ignore +/// use tokio::net::unix::pipe; +/// use nix::{unistd::mkfifo, sys::stat::Mode}; +/// # use std::error::Error; +/// +/// // Our program has exclusive access to this path. +/// const FIFO_NAME: &str = "path/to/a/new/fifo"; +/// +/// # async fn dox() -> Result<(), Box<dyn Error>> { +/// mkfifo(FIFO_NAME, Mode::S_IRWXU)?; +/// let tx = pipe::OpenOptions::new() +/// .read_write(true) +/// .unchecked(true) +/// .open_sender(FIFO_NAME)?; +/// # Ok(()) +/// # } +/// ``` +#[derive(Clone, Debug)] +pub struct OpenOptions { + #[cfg(target_os = "linux")] + read_write: bool, + unchecked: bool, +} + +impl OpenOptions { + /// Creates a blank new set of options ready for configuration. + /// + /// All options are initially set to `false`. + pub fn new() -> OpenOptions { + OpenOptions { + #[cfg(target_os = "linux")] + read_write: false, + unchecked: false, + } + } + + /// Sets the option for read-write access. + /// + /// This option, when true, will indicate that a FIFO file will be opened + /// in read-write access mode. This operation is not defined by the POSIX + /// standard and is only guaranteed to work on Linux. + /// + /// # Examples + /// + /// Opening a [`Sender`] even if there are no open reading ends: + /// + /// ```ignore + /// use tokio::net::unix::pipe; + /// + /// let tx = pipe::OpenOptions::new() + /// .read_write(true) + /// .open_sender("path/to/a/fifo"); + /// ``` + /// + /// Opening a resilient [`Receiver`] i.e. a reading pipe end which will not + /// fail with [`UnexpectedEof`] during reading if all writing ends of the + /// pipe close the FIFO file. + /// + /// [`UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof + /// + /// ```ignore + /// use tokio::net::unix::pipe; + /// + /// let tx = pipe::OpenOptions::new() + /// .read_write(true) + /// .open_receiver("path/to/a/fifo"); + /// ``` + #[cfg(target_os = "linux")] + #[cfg_attr(docsrs, doc(cfg(target_os = "linux")))] + pub fn read_write(&mut self, value: bool) -> &mut Self { + self.read_write = value; + self + } + + /// Sets the option to skip the check for FIFO file type. + /// + /// By default, [`open_receiver`] and [`open_sender`] functions will check + /// if the opened file is a FIFO file. Set this option to `true` if you are + /// sure the file is a FIFO file. + /// + /// [`open_receiver`]: OpenOptions::open_receiver + /// [`open_sender`]: OpenOptions::open_sender + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::unix::pipe; + /// use nix::{unistd::mkfifo, sys::stat::Mode}; + /// # use std::error::Error; + /// + /// // Our program has exclusive access to this path. + /// const FIFO_NAME: &str = "path/to/a/new/fifo"; + /// + /// # async fn dox() -> Result<(), Box<dyn Error>> { + /// mkfifo(FIFO_NAME, Mode::S_IRWXU)?; + /// let rx = pipe::OpenOptions::new() + /// .unchecked(true) + /// .open_receiver(FIFO_NAME)?; + /// # Ok(()) + /// # } + /// ``` + pub fn unchecked(&mut self, value: bool) -> &mut Self { + self.unchecked = value; + self + } + + /// Creates a [`Receiver`] from a FIFO file with the options specified by `self`. + /// + /// This function will open the FIFO file at the specified path, possibly + /// check if it is a pipe, and associate the pipe with the default event + /// loop for reading. + /// + /// # Errors + /// + /// If the file type check fails, this function will fail with `io::ErrorKind::InvalidInput`. + /// This function may also fail with other standard OS errors. + /// + /// # Panics + /// + /// This function panics if it is not called from within a runtime with + /// IO enabled. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + pub fn open_receiver<P: AsRef<Path>>(&self, path: P) -> io::Result<Receiver> { + let file = self.open(path.as_ref(), PipeEnd::Receiver)?; + Receiver::from_file_unchecked(file) + } + + /// Creates a [`Sender`] from a FIFO file with the options specified by `self`. + /// + /// This function will open the FIFO file at the specified path, possibly + /// check if it is a pipe, and associate the pipe with the default event + /// loop for writing. + /// + /// # Errors + /// + /// If the file type check fails, this function will fail with `io::ErrorKind::InvalidInput`. + /// If the file is not opened in read-write access mode and the file is not + /// currently open for reading, this function will fail with `ENXIO`. + /// This function may also fail with other standard OS errors. + /// + /// # Panics + /// + /// This function panics if it is not called from within a runtime with + /// IO enabled. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + pub fn open_sender<P: AsRef<Path>>(&self, path: P) -> io::Result<Sender> { + let file = self.open(path.as_ref(), PipeEnd::Sender)?; + Sender::from_file_unchecked(file) + } + + fn open(&self, path: &Path, pipe_end: PipeEnd) -> io::Result<File> { + let mut options = std::fs::OpenOptions::new(); + options + .read(pipe_end == PipeEnd::Receiver) + .write(pipe_end == PipeEnd::Sender) + .custom_flags(libc::O_NONBLOCK); + + #[cfg(target_os = "linux")] + if self.read_write { + options.read(true).write(true); + } + + let file = options.open(path)?; + + if !self.unchecked && !is_fifo(&file)? { + return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe")); + } + + Ok(file) + } +} + +impl Default for OpenOptions { + fn default() -> OpenOptions { + OpenOptions::new() + } +} + +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +enum PipeEnd { + Sender, + Receiver, +} + +/// Writing end of a Unix pipe. +/// +/// It can be constructed from a FIFO file with [`OpenOptions::open_sender`]. +/// +/// Opening a named pipe for writing involves a few steps. +/// Call to [`OpenOptions::open_sender`] might fail with an error indicating +/// different things: +/// +/// * [`io::ErrorKind::NotFound`] - There is no file at the specified path. +/// * [`io::ErrorKind::InvalidInput`] - The file exists, but it is not a FIFO. +/// * [`ENXIO`] - The file is a FIFO, but no process has it open for reading. +/// Sleep for a while and try again. +/// * Other OS errors not specific to opening FIFO files. +/// +/// Opening a `Sender` from a FIFO file should look like this: +/// +/// ```no_run +/// use tokio::net::unix::pipe; +/// use tokio::time::{self, Duration}; +/// +/// const FIFO_NAME: &str = "path/to/a/fifo"; +/// +/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { +/// // Wait for a reader to open the file. +/// let tx = loop { +/// match pipe::OpenOptions::new().open_sender(FIFO_NAME) { +/// Ok(tx) => break tx, +/// Err(e) if e.raw_os_error() == Some(libc::ENXIO) => {}, +/// Err(e) => return Err(e.into()), +/// } +/// +/// time::sleep(Duration::from_millis(50)).await; +/// }; +/// # Ok(()) +/// # } +/// ``` +/// +/// On Linux, it is possible to create a `Sender` without waiting in a sleeping +/// loop. This is done by opening a named pipe in read-write access mode with +/// `OpenOptions::read_write`. This way, a `Sender` can at the same time hold +/// both a writing end and a reading end, and the latter allows to open a FIFO +/// without [`ENXIO`] error since the pipe is open for reading as well. +/// +/// `Sender` cannot be used to read from a pipe, so in practice the read access +/// is only used when a FIFO is opened. However, using a `Sender` in read-write +/// mode **may lead to lost data**, because written data will be dropped by the +/// system as soon as all pipe ends are closed. To avoid lost data you have to +/// make sure that a reading end has been opened before dropping a `Sender`. +/// +/// Note that using read-write access mode with FIFO files is not defined by +/// the POSIX standard and it is only guaranteed to work on Linux. +/// +/// ```ignore +/// use tokio::io::AsyncWriteExt; +/// use tokio::net::unix::pipe; +/// +/// const FIFO_NAME: &str = "path/to/a/fifo"; +/// +/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { +/// let mut tx = pipe::OpenOptions::new() +/// .read_write(true) +/// .open_sender(FIFO_NAME)?; +/// +/// // Asynchronously write to the pipe before a reader. +/// tx.write_all(b"hello world").await?; +/// # Ok(()) +/// # } +/// ``` +/// +/// [`ENXIO`]: https://docs.rs/libc/latest/libc/constant.ENXIO.html +#[derive(Debug)] +pub struct Sender { + io: PollEvented<mio_pipe::Sender>, +} + +impl Sender { + fn from_mio(mio_tx: mio_pipe::Sender) -> io::Result<Sender> { + let io = PollEvented::new_with_interest(mio_tx, Interest::WRITABLE)?; + Ok(Sender { io }) + } + + /// Creates a new `Sender` from a [`File`]. + /// + /// This function is intended to construct a pipe from a [`File`] representing + /// a special FIFO file. It will check if the file is a pipe and has write access, + /// set it in non-blocking mode and perform the conversion. + /// + /// # Errors + /// + /// Fails with `io::ErrorKind::InvalidInput` if the file is not a pipe or it + /// does not have write access. Also fails with any standard OS error if it occurs. + /// + /// # Panics + /// + /// This function panics if it is not called from within a runtime with + /// IO enabled. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + pub fn from_file(mut file: File) -> io::Result<Sender> { + if !is_fifo(&file)? { + return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe")); + } + + let flags = get_file_flags(&file)?; + if has_write_access(flags) { + set_nonblocking(&mut file, flags)?; + Sender::from_file_unchecked(file) + } else { + Err(io::Error::new( + io::ErrorKind::InvalidInput, + "not in O_WRONLY or O_RDWR access mode", + )) + } + } + + /// Creates a new `Sender` from a [`File`] without checking pipe properties. + /// + /// This function is intended to construct a pipe from a File representing + /// a special FIFO file. The conversion assumes nothing about the underlying + /// file; it is left up to the user to make sure it is opened with write access, + /// represents a pipe and is set in non-blocking mode. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::unix::pipe; + /// use std::fs::OpenOptions; + /// use std::os::unix::fs::{FileTypeExt, OpenOptionsExt}; + /// # use std::error::Error; + /// + /// const FIFO_NAME: &str = "path/to/a/fifo"; + /// + /// # async fn dox() -> Result<(), Box<dyn Error>> { + /// let file = OpenOptions::new() + /// .write(true) + /// .custom_flags(libc::O_NONBLOCK) + /// .open(FIFO_NAME)?; + /// if file.metadata()?.file_type().is_fifo() { + /// let tx = pipe::Sender::from_file_unchecked(file)?; + /// /* use the Sender */ + /// } + /// # Ok(()) + /// # } + /// ``` + /// + /// # Panics + /// + /// This function panics if it is not called from within a runtime with + /// IO enabled. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + pub fn from_file_unchecked(file: File) -> io::Result<Sender> { + let raw_fd = file.into_raw_fd(); + let mio_tx = unsafe { mio_pipe::Sender::from_raw_fd(raw_fd) }; + Sender::from_mio(mio_tx) + } + + /// Waits for any of the requested ready states. + /// + /// This function can be used instead of [`writable()`] to check the returned + /// ready set for [`Ready::WRITABLE`] and [`Ready::WRITE_CLOSED`] events. + /// + /// The function may complete without the pipe being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// + /// [`writable()`]: Self::writable + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to write that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { + let event = self.io.registration().readiness(interest).await?; + Ok(event.ready) + } + + /// Waits for the pipe to become writable. + /// + /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually + /// paired with [`try_write()`]. + /// + /// [`try_write()`]: Self::try_write + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::unix::pipe; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Open a writing end of a fifo + /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?; + /// + /// loop { + /// // Wait for the pipe to be writable + /// tx.writable().await?; + /// + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match tx.try_write(b"hello world") { + /// Ok(n) => { + /// break; + /// } + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub async fn writable(&self) -> io::Result<()> { + self.ready(Interest::WRITABLE).await?; + Ok(()) + } + + /// Polls for write readiness. + /// + /// If the pipe is not currently ready for writing, this method will + /// store a clone of the `Waker` from the provided `Context`. When the pipe + /// becomes ready for writing, `Waker::wake` will be called on the waker. + /// + /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only + /// the `Waker` from the `Context` passed to the most recent call is + /// scheduled to receive a wakeup. + /// + /// This function is intended for cases where creating and pinning a future + /// via [`writable`] is not feasible. Where possible, using [`writable`] is + /// preferred, as this supports polling from multiple tasks at once. + /// + /// [`writable`]: Self::writable + /// + /// # Return value + /// + /// The function returns: + /// + /// * `Poll::Pending` if the pipe is not ready for writing. + /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing. + /// * `Poll::Ready(Err(e))` if an error is encountered. + /// + /// # Errors + /// + /// This function may encounter any standard I/O error except `WouldBlock`. + pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + self.io.registration().poll_write_ready(cx).map_ok(|_| ()) + } + + /// Tries to write a buffer to the pipe, returning how many bytes were + /// written. + /// + /// The function will attempt to write the entire contents of `buf`, but + /// only part of the buffer may be written. If the length of `buf` is not + /// greater than `PIPE_BUF` (an OS constant, 4096 under Linux), then the + /// write is guaranteed to be atomic, i.e. either the entire content of + /// `buf` will be written or this method will fail with `WouldBlock`. There + /// is no such guarantee if `buf` is larger than `PIPE_BUF`. + /// + /// This function is usually paired with [`writable`]. + /// + /// [`writable`]: Self::writable + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the pipe is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::unix::pipe; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Open a writing end of a fifo + /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?; + /// + /// loop { + /// // Wait for the pipe to be writable + /// tx.writable().await?; + /// + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match tx.try_write(b"hello world") { + /// Ok(n) => { + /// break; + /// } + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> { + self.io + .registration() + .try_io(Interest::WRITABLE, || (&*self.io).write(buf)) + } + + /// Tries to write several buffers to the pipe, returning how many bytes + /// were written. + /// + /// Data is written from each buffer in order, with the final buffer read + /// from possible being only partially consumed. This method behaves + /// equivalently to a single call to [`try_write()`] with concatenated + /// buffers. + /// + /// If the total length of buffers is not greater than `PIPE_BUF` (an OS + /// constant, 4096 under Linux), then the write is guaranteed to be atomic, + /// i.e. either the entire contents of buffers will be written or this + /// method will fail with `WouldBlock`. There is no such guarantee if the + /// total length of buffers is greater than `PIPE_BUF`. + /// + /// This function is usually paired with [`writable`]. + /// + /// [`try_write()`]: Self::try_write() + /// [`writable`]: Self::writable + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the pipe is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::unix::pipe; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Open a writing end of a fifo + /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?; + /// + /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")]; + /// + /// loop { + /// // Wait for the pipe to be writable + /// tx.writable().await?; + /// + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match tx.try_write_vectored(&bufs) { + /// Ok(n) => { + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> { + self.io + .registration() + .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf)) + } +} + +impl AsyncWrite for Sender { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + self.io.poll_write(cx, buf) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[io::IoSlice<'_>], + ) -> Poll<io::Result<usize>> { + self.io.poll_write_vectored(cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + true + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { + Poll::Ready(Ok(())) + } +} + +impl AsRawFd for Sender { + fn as_raw_fd(&self) -> RawFd { + self.io.as_raw_fd() + } +} + +#[cfg(not(tokio_no_as_fd))] +impl AsFd for Sender { + fn as_fd(&self) -> BorrowedFd<'_> { + unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) } + } +} + +/// Reading end of a Unix pipe. +/// +/// It can be constructed from a FIFO file with [`OpenOptions::open_receiver`]. +/// +/// # Examples +/// +/// Receiving messages from a named pipe in a loop: +/// +/// ```no_run +/// use tokio::net::unix::pipe; +/// use tokio::io::{self, AsyncReadExt}; +/// +/// const FIFO_NAME: &str = "path/to/a/fifo"; +/// +/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { +/// let mut rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?; +/// loop { +/// let mut msg = vec![0; 256]; +/// match rx.read_exact(&mut msg).await { +/// Ok(_) => { +/// /* handle the message */ +/// } +/// Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => { +/// // Writing end has been closed, we should reopen the pipe. +/// rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?; +/// } +/// Err(e) => return Err(e.into()), +/// } +/// } +/// # } +/// ``` +/// +/// On Linux, you can use a `Receiver` in read-write access mode to implement +/// resilient reading from a named pipe. Unlike `Receiver` opened in read-only +/// mode, read from a pipe in read-write mode will not fail with `UnexpectedEof` +/// when the writing end is closed. This way, a `Receiver` can asynchronously +/// wait for the next writer to open the pipe. +/// +/// You should not use functions waiting for EOF such as [`read_to_end`] with +/// a `Receiver` in read-write access mode, since it **may wait forever**. +/// `Receiver` in this mode also holds an open writing end, which prevents +/// receiving EOF. +/// +/// To set the read-write access mode you can use `OpenOptions::read_write`. +/// Note that using read-write access mode with FIFO files is not defined by +/// the POSIX standard and it is only guaranteed to work on Linux. +/// +/// ```ignore +/// use tokio::net::unix::pipe; +/// use tokio::io::AsyncReadExt; +/// # use std::error::Error; +/// +/// const FIFO_NAME: &str = "path/to/a/fifo"; +/// +/// # async fn dox() -> Result<(), Box<dyn Error>> { +/// let mut rx = pipe::OpenOptions::new() +/// .read_write(true) +/// .open_receiver(FIFO_NAME)?; +/// loop { +/// let mut msg = vec![0; 256]; +/// rx.read_exact(&mut msg).await?; +/// /* handle the message */ +/// } +/// # } +/// ``` +/// +/// [`read_to_end`]: crate::io::AsyncReadExt::read_to_end +#[derive(Debug)] +pub struct Receiver { + io: PollEvented<mio_pipe::Receiver>, +} + +impl Receiver { + fn from_mio(mio_rx: mio_pipe::Receiver) -> io::Result<Receiver> { + let io = PollEvented::new_with_interest(mio_rx, Interest::READABLE)?; + Ok(Receiver { io }) + } + + /// Creates a new `Receiver` from a [`File`]. + /// + /// This function is intended to construct a pipe from a [`File`] representing + /// a special FIFO file. It will check if the file is a pipe and has read access, + /// set it in non-blocking mode and perform the conversion. + /// + /// # Errors + /// + /// Fails with `io::ErrorKind::InvalidInput` if the file is not a pipe or it + /// does not have read access. Also fails with any standard OS error if it occurs. + /// + /// # Panics + /// + /// This function panics if it is not called from within a runtime with + /// IO enabled. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + pub fn from_file(mut file: File) -> io::Result<Receiver> { + if !is_fifo(&file)? { + return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe")); + } + + let flags = get_file_flags(&file)?; + if has_read_access(flags) { + set_nonblocking(&mut file, flags)?; + Receiver::from_file_unchecked(file) + } else { + Err(io::Error::new( + io::ErrorKind::InvalidInput, + "not in O_RDONLY or O_RDWR access mode", + )) + } + } + + /// Creates a new `Receiver` from a [`File`] without checking pipe properties. + /// + /// This function is intended to construct a pipe from a File representing + /// a special FIFO file. The conversion assumes nothing about the underlying + /// file; it is left up to the user to make sure it is opened with read access, + /// represents a pipe and is set in non-blocking mode. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::unix::pipe; + /// use std::fs::OpenOptions; + /// use std::os::unix::fs::{FileTypeExt, OpenOptionsExt}; + /// # use std::error::Error; + /// + /// const FIFO_NAME: &str = "path/to/a/fifo"; + /// + /// # async fn dox() -> Result<(), Box<dyn Error>> { + /// let file = OpenOptions::new() + /// .read(true) + /// .custom_flags(libc::O_NONBLOCK) + /// .open(FIFO_NAME)?; + /// if file.metadata()?.file_type().is_fifo() { + /// let rx = pipe::Receiver::from_file_unchecked(file)?; + /// /* use the Receiver */ + /// } + /// # Ok(()) + /// # } + /// ``` + /// + /// # Panics + /// + /// This function panics if it is not called from within a runtime with + /// IO enabled. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + pub fn from_file_unchecked(file: File) -> io::Result<Receiver> { + let raw_fd = file.into_raw_fd(); + let mio_rx = unsafe { mio_pipe::Receiver::from_raw_fd(raw_fd) }; + Receiver::from_mio(mio_rx) + } + + /// Waits for any of the requested ready states. + /// + /// This function can be used instead of [`readable()`] to check the returned + /// ready set for [`Ready::READABLE`] and [`Ready::READ_CLOSED`] events. + /// + /// The function may complete without the pipe being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// + /// [`readable()`]: Self::readable + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to read that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { + let event = self.io.registration().readiness(interest).await?; + Ok(event.ready) + } + + /// Waits for the pipe to become readable. + /// + /// This function is equivalent to `ready(Interest::READABLE)` and is usually + /// paired with [`try_read()`]. + /// + /// [`try_read()`]: Self::try_read() + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::unix::pipe; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Open a reading end of a fifo + /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?; + /// + /// let mut msg = vec![0; 1024]; + /// + /// loop { + /// // Wait for the pipe to be readable + /// rx.readable().await?; + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match rx.try_read(&mut msg) { + /// Ok(n) => { + /// msg.truncate(n); + /// break; + /// } + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// println!("GOT = {:?}", msg); + /// Ok(()) + /// } + /// ``` + pub async fn readable(&self) -> io::Result<()> { + self.ready(Interest::READABLE).await?; + Ok(()) + } + + /// Polls for read readiness. + /// + /// If the pipe is not currently ready for reading, this method will + /// store a clone of the `Waker` from the provided `Context`. When the pipe + /// becomes ready for reading, `Waker::wake` will be called on the waker. + /// + /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only + /// the `Waker` from the `Context` passed to the most recent call is + /// scheduled to receive a wakeup. + /// + /// This function is intended for cases where creating and pinning a future + /// via [`readable`] is not feasible. Where possible, using [`readable`] is + /// preferred, as this supports polling from multiple tasks at once. + /// + /// [`readable`]: Self::readable + /// + /// # Return value + /// + /// The function returns: + /// + /// * `Poll::Pending` if the pipe is not ready for reading. + /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading. + /// * `Poll::Ready(Err(e))` if an error is encountered. + /// + /// # Errors + /// + /// This function may encounter any standard I/O error except `WouldBlock`. + pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { + self.io.registration().poll_read_ready(cx).map_ok(|_| ()) + } + + /// Tries to read data from the pipe into the provided buffer, returning how + /// many bytes were read. + /// + /// Reads any pending data from the pipe but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read()` is non-blocking, the buffer does not have to be stored by + /// the async task and can exist entirely on the stack. + /// + /// Usually [`readable()`] is used with this function. + /// + /// [`readable()`]: Self::readable() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios: + /// + /// 1. The pipe's writing end is closed and will no longer write data. + /// 2. The specified buffer was 0 bytes in length. + /// + /// If the pipe is not ready to read data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::unix::pipe; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Open a reading end of a fifo + /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?; + /// + /// let mut msg = vec![0; 1024]; + /// + /// loop { + /// // Wait for the pipe to be readable + /// rx.readable().await?; + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match rx.try_read(&mut msg) { + /// Ok(n) => { + /// msg.truncate(n); + /// break; + /// } + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// println!("GOT = {:?}", msg); + /// Ok(()) + /// } + /// ``` + pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> { + self.io + .registration() + .try_io(Interest::READABLE, || (&*self.io).read(buf)) + } + + /// Tries to read data from the pipe into the provided buffers, returning + /// how many bytes were read. + /// + /// Data is copied to fill each buffer in order, with the final buffer + /// written to possibly being only partially filled. This method behaves + /// equivalently to a single call to [`try_read()`] with concatenated + /// buffers. + /// + /// Reads any pending data from the pipe but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read_vectored()` is non-blocking, the buffer does not have to be + /// stored by the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] is used with this function. + /// + /// [`try_read()`]: Self::try_read() + /// [`readable()`]: Self::readable() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the pipe's writing end is + /// closed and will no longer write data. If the pipe is not ready to read + /// data `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::unix::pipe; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Open a reading end of a fifo + /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?; + /// + /// loop { + /// // Wait for the pipe to be readable + /// rx.readable().await?; + /// + /// // Creating the buffer **after** the `await` prevents it from + /// // being stored in the async task. + /// let mut buf_a = [0; 512]; + /// let mut buf_b = [0; 1024]; + /// let mut bufs = [ + /// io::IoSliceMut::new(&mut buf_a), + /// io::IoSliceMut::new(&mut buf_b), + /// ]; + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match rx.try_read_vectored(&mut bufs) { + /// Ok(0) => break, + /// Ok(n) => { + /// println!("read {} bytes", n); + /// } + /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> { + self.io + .registration() + .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs)) + } + + cfg_io_util! { + /// Tries to read data from the pipe into the provided buffer, advancing the + /// buffer's internal cursor, returning how many bytes were read. + /// + /// Reads any pending data from the pipe but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by + /// the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: Self::readable + /// [`ready()`]: Self::ready + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the pipe's writing end is + /// closed and will no longer write data. If the pipe is not ready to read + /// data `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::unix::pipe; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// // Open a reading end of a fifo + /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?; + /// + /// loop { + /// // Wait for the pipe to be readable + /// rx.readable().await?; + /// + /// let mut buf = Vec::with_capacity(4096); + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match rx.try_read_buf(&mut buf) { + /// Ok(0) => break, + /// Ok(n) => { + /// println!("read {} bytes", n); + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> { + self.io.registration().try_io(Interest::READABLE, || { + use std::io::Read; + + let dst = buf.chunk_mut(); + let dst = + unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; + + // Safety: `mio_pipe::Receiver` uses a `std::fs::File` underneath, + // which correctly handles reads into uninitialized memory. + let n = (&*self.io).read(dst)?; + + unsafe { + buf.advance_mut(n); + } + + Ok(n) + }) + } + } +} + +impl AsyncRead for Receiver { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + // Safety: `mio_pipe::Receiver` uses a `std::fs::File` underneath, + // which correctly handles reads into uninitialized memory. + unsafe { self.io.poll_read(cx, buf) } + } +} + +impl AsRawFd for Receiver { + fn as_raw_fd(&self) -> RawFd { + self.io.as_raw_fd() + } +} + +#[cfg(not(tokio_no_as_fd))] +impl AsFd for Receiver { + fn as_fd(&self) -> BorrowedFd<'_> { + unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) } + } +} + +/// Checks if file is a FIFO +fn is_fifo(file: &File) -> io::Result<bool> { + Ok(file.metadata()?.file_type().is_fifo()) +} + +/// Gets file descriptor's flags by fcntl. +fn get_file_flags(file: &File) -> io::Result<libc::c_int> { + let fd = file.as_raw_fd(); + let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) }; + if flags < 0 { + Err(io::Error::last_os_error()) + } else { + Ok(flags) + } +} + +/// Checks for O_RDONLY or O_RDWR access mode. +fn has_read_access(flags: libc::c_int) -> bool { + let mode = flags & libc::O_ACCMODE; + mode == libc::O_RDONLY || mode == libc::O_RDWR +} + +/// Checks for O_WRONLY or O_RDWR access mode. +fn has_write_access(flags: libc::c_int) -> bool { + let mode = flags & libc::O_ACCMODE; + mode == libc::O_WRONLY || mode == libc::O_RDWR +} + +/// Sets file's flags with O_NONBLOCK by fcntl. +fn set_nonblocking(file: &mut File, current_flags: libc::c_int) -> io::Result<()> { + let fd = file.as_raw_fd(); + + let flags = current_flags | libc::O_NONBLOCK; + + if flags != current_flags { + let ret = unsafe { libc::fcntl(fd, libc::F_SETFL, flags) }; + if ret < 0 { + return Err(io::Error::last_os_error()); + } + } + + Ok(()) +} diff --git a/vendor/tokio/src/net/unix/split.rs b/vendor/tokio/src/net/unix/split.rs index 97214f7a7..6fc7067a7 100644 --- a/vendor/tokio/src/net/unix/split.rs +++ b/vendor/tokio/src/net/unix/split.rs @@ -8,14 +8,19 @@ //! split has no associated overhead and enforces all invariants at the type //! level. -use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; +use crate::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready}; use crate::net::UnixStream; +use crate::net::unix::SocketAddr; use std::io; use std::net::Shutdown; use std::pin::Pin; use std::task::{Context, Poll}; +cfg_io_util! { + use bytes::BufMut; +} + /// Borrowed read half of a [`UnixStream`], created by [`split`]. /// /// Reading from a `ReadHalf` is usually done using the convenience methods found on the @@ -47,6 +52,232 @@ pub(crate) fn split(stream: &mut UnixStream) -> (ReadHalf<'_>, WriteHalf<'_>) { (ReadHalf(stream), WriteHalf(stream)) } +impl ReadHalf<'_> { + /// Wait for any of the requested ready states. + /// + /// This function is usually paired with [`try_read()`]. It can be used instead + /// of [`readable()`] to check the returned ready set for [`Ready::READABLE`] + /// and [`Ready::READ_CLOSED`] events. + /// + /// The function may complete without the socket being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// + /// This function is equivalent to [`UnixStream::ready`]. + /// + /// [`try_read()`]: Self::try_read + /// [`readable()`]: Self::readable + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to read or write that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { + self.0.ready(interest).await + } + + /// Waits for the socket to become readable. + /// + /// This function is equivalent to `ready(Interest::READABLE)` and is usually + /// paired with `try_read()`. + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to read that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn readable(&self) -> io::Result<()> { + self.0.readable().await + } + + /// Tries to read data from the stream into the provided buffer, returning how + /// many bytes were read. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read()` is non-blocking, the buffer does not have to be stored by + /// the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: Self::readable() + /// [`ready()`]: Self::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios: + /// + /// 1. The stream's read half is closed and will no longer yield data. + /// 2. The specified buffer was 0 bytes in length. + /// + /// If the stream is not ready to read data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> { + self.0.try_read(buf) + } + + cfg_io_util! { + /// Tries to read data from the stream into the provided buffer, advancing the + /// buffer's internal cursor, returning how many bytes were read. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by + /// the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: Self::readable() + /// [`ready()`]: Self::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> { + self.0.try_read_buf(buf) + } + } + + /// Tries to read data from the stream into the provided buffers, returning + /// how many bytes were read. + /// + /// Data is copied to fill each buffer in order, with the final buffer + /// written to possibly being only partially filled. This method behaves + /// equivalently to a single call to [`try_read()`] with concatenated + /// buffers. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read_vectored()` is non-blocking, the buffer does not have to be + /// stored by the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`try_read()`]: Self::try_read() + /// [`readable()`]: Self::readable() + /// [`ready()`]: Self::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> { + self.0.try_read_vectored(bufs) + } + + /// Returns the socket address of the remote half of this connection. + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + self.0.peer_addr() + } + + /// Returns the socket address of the local half of this connection. + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.0.local_addr() + } +} + +impl WriteHalf<'_> { + /// Waits for any of the requested ready states. + /// + /// This function is usually paired with [`try_write()`]. It can be used instead + /// of [`writable()`] to check the returned ready set for [`Ready::WRITABLE`] + /// and [`Ready::WRITE_CLOSED`] events. + /// + /// The function may complete without the socket being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// + /// This function is equivalent to [`UnixStream::ready`]. + /// + /// [`try_write()`]: Self::try_write + /// [`writable()`]: Self::writable + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to read or write that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { + self.0.ready(interest).await + } + + /// Waits for the socket to become writable. + /// + /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually + /// paired with `try_write()`. + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to write that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn writable(&self) -> io::Result<()> { + self.0.writable().await + } + + /// Tries to write a buffer to the stream, returning how many bytes were + /// written. + /// + /// The function will attempt to write the entire contents of `buf`, but + /// only part of the buffer may be written. + /// + /// This function is usually paired with `writable()`. + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the stream is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> { + self.0.try_write(buf) + } + + /// Tries to write several buffers to the stream, returning how many bytes + /// were written. + /// + /// Data is written from each buffer in order, with the final buffer read + /// from possible being only partially consumed. This method behaves + /// equivalently to a single call to [`try_write()`] with concatenated + /// buffers. + /// + /// This function is usually paired with `writable()`. + /// + /// [`try_write()`]: Self::try_write() + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the stream is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> { + self.0.try_write_vectored(buf) + } + + /// Returns the socket address of the remote half of this connection. + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + self.0.peer_addr() + } + + /// Returns the socket address of the local half of this connection. + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.0.local_addr() + } +} + impl AsyncRead for ReadHalf<'_> { fn poll_read( self: Pin<&mut Self>, diff --git a/vendor/tokio/src/net/unix/split_owned.rs b/vendor/tokio/src/net/unix/split_owned.rs index 3d6ac6a7e..d1c6f93de 100644 --- a/vendor/tokio/src/net/unix/split_owned.rs +++ b/vendor/tokio/src/net/unix/split_owned.rs @@ -8,9 +8,10 @@ //! split has no associated overhead and enforces all invariants at the type //! level. -use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; +use crate::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready}; use crate::net::UnixStream; +use crate::net::unix::SocketAddr; use std::error::Error; use std::net::Shutdown; use std::pin::Pin; @@ -18,6 +19,10 @@ use std::sync::Arc; use std::task::{Context, Poll}; use std::{fmt, io}; +cfg_io_util! { + use bytes::BufMut; +} + /// Owned read half of a [`UnixStream`], created by [`into_split`]. /// /// Reading from an `OwnedReadHalf` is usually done using the convenience methods found @@ -102,6 +107,139 @@ impl OwnedReadHalf { pub fn reunite(self, other: OwnedWriteHalf) -> Result<UnixStream, ReuniteError> { reunite(self, other) } + + /// Waits for any of the requested ready states. + /// + /// This function is usually paired with [`try_read()`]. It can be used instead + /// of [`readable()`] to check the returned ready set for [`Ready::READABLE`] + /// and [`Ready::READ_CLOSED`] events. + /// + /// The function may complete without the socket being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// + /// This function is equivalent to [`UnixStream::ready`]. + /// + /// [`try_read()`]: Self::try_read + /// [`readable()`]: Self::readable + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to read or write that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { + self.inner.ready(interest).await + } + + /// Waits for the socket to become readable. + /// + /// This function is equivalent to `ready(Interest::READABLE)` and is usually + /// paired with `try_read()`. + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to read that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn readable(&self) -> io::Result<()> { + self.inner.readable().await + } + + /// Tries to read data from the stream into the provided buffer, returning how + /// many bytes were read. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read()` is non-blocking, the buffer does not have to be stored by + /// the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: Self::readable() + /// [`ready()`]: Self::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios: + /// + /// 1. The stream's read half is closed and will no longer yield data. + /// 2. The specified buffer was 0 bytes in length. + /// + /// If the stream is not ready to read data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> { + self.inner.try_read(buf) + } + + cfg_io_util! { + /// Tries to read data from the stream into the provided buffer, advancing the + /// buffer's internal cursor, returning how many bytes were read. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by + /// the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: Self::readable() + /// [`ready()`]: Self::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> { + self.inner.try_read_buf(buf) + } + } + + /// Tries to read data from the stream into the provided buffers, returning + /// how many bytes were read. + /// + /// Data is copied to fill each buffer in order, with the final buffer + /// written to possibly being only partially filled. This method behaves + /// equivalently to a single call to [`try_read()`] with concatenated + /// buffers. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read_vectored()` is non-blocking, the buffer does not have to be + /// stored by the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`try_read()`]: Self::try_read() + /// [`readable()`]: Self::readable() + /// [`ready()`]: Self::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> { + self.inner.try_read_vectored(bufs) + } + + /// Returns the socket address of the remote half of this connection. + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + self.inner.peer_addr() + } + + /// Returns the socket address of the local half of this connection. + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.inner.local_addr() + } } impl AsyncRead for OwnedReadHalf { @@ -124,13 +262,103 @@ impl OwnedWriteHalf { reunite(other, self) } - /// Destroy the write half, but don't close the write half of the stream + /// Destroys the write half, but don't close the write half of the stream /// until the read half is dropped. If the read half has already been /// dropped, this closes the stream. pub fn forget(mut self) { self.shutdown_on_drop = false; drop(self); } + + /// Waits for any of the requested ready states. + /// + /// This function is usually paired with [`try_write()`]. It can be used instead + /// of [`writable()`] to check the returned ready set for [`Ready::WRITABLE`] + /// and [`Ready::WRITE_CLOSED`] events. + /// + /// The function may complete without the socket being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// + /// This function is equivalent to [`UnixStream::ready`]. + /// + /// [`try_write()`]: Self::try_write + /// [`writable()`]: Self::writable + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to read or write that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { + self.inner.ready(interest).await + } + + /// Waits for the socket to become writable. + /// + /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually + /// paired with `try_write()`. + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to write that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn writable(&self) -> io::Result<()> { + self.inner.writable().await + } + + /// Tries to write a buffer to the stream, returning how many bytes were + /// written. + /// + /// The function will attempt to write the entire contents of `buf`, but + /// only part of the buffer may be written. + /// + /// This function is usually paired with `writable()`. + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the stream is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> { + self.inner.try_write(buf) + } + + /// Tries to write several buffers to the stream, returning how many bytes + /// were written. + /// + /// Data is written from each buffer in order, with the final buffer read + /// from possible being only partially consumed. This method behaves + /// equivalently to a single call to [`try_write()`] with concatenated + /// buffers. + /// + /// This function is usually paired with `writable()`. + /// + /// [`try_write()`]: Self::try_write() + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the stream is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> { + self.inner.try_write_vectored(buf) + } + + /// Returns the socket address of the remote half of this connection. + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + self.inner.peer_addr() + } + + /// Returns the socket address of the local half of this connection. + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.inner.local_addr() + } } impl Drop for OwnedWriteHalf { @@ -180,12 +408,12 @@ impl AsyncWrite for OwnedWriteHalf { impl AsRef<UnixStream> for OwnedReadHalf { fn as_ref(&self) -> &UnixStream { - &*self.inner + &self.inner } } impl AsRef<UnixStream> for OwnedWriteHalf { fn as_ref(&self) -> &UnixStream { - &*self.inner + &self.inner } } diff --git a/vendor/tokio/src/net/unix/stream.rs b/vendor/tokio/src/net/unix/stream.rs index 4baac6062..201645ba1 100644 --- a/vendor/tokio/src/net/unix/stream.rs +++ b/vendor/tokio/src/net/unix/stream.rs @@ -5,10 +5,11 @@ use crate::net::unix::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf}; use crate::net::unix::ucred::{self, UCred}; use crate::net::unix::SocketAddr; -use std::convert::TryFrom; use std::fmt; use std::io::{self, Read, Write}; use std::net::Shutdown; +#[cfg(not(tokio_no_as_fd))] +use std::os::unix::io::{AsFd, BorrowedFd}; use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use std::os::unix::net; use std::path::Path; @@ -22,8 +23,8 @@ cfg_io_util! { cfg_net_unix! { /// A structure representing a connected Unix socket. /// - /// This socket can be connected directly with `UnixStream::connect` or accepted - /// from a listener with `UnixListener::incoming`. Additionally, a pair of + /// This socket can be connected directly with [`UnixStream::connect`] or accepted + /// from a listener with [`UnixListener::accept`]. Additionally, a pair of /// anonymous Unix sockets can be created with `UnixStream::pair`. /// /// To shut down the stream in the write direction, you can call the @@ -32,6 +33,8 @@ cfg_net_unix! { /// the stream in one direction. /// /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown + /// [`UnixListener::accept`]: crate::net::UnixListener::accept + #[cfg_attr(docsrs, doc(alias = "uds"))] pub struct UnixStream { io: PollEvented<mio::net::UnixStream>, } @@ -59,12 +62,18 @@ impl UnixStream { Ok(stream) } - /// Wait for any of the requested ready states. + /// Waits for any of the requested ready states. /// /// This function is usually paired with `try_read()` or `try_write()`. It /// can be used to concurrently read / write to the same socket on a single /// task without splitting the socket. /// + /// The function may complete without the socket being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// /// # Cancel safety /// /// This method is cancel safe. Once a readiness event occurs, the method @@ -133,7 +142,7 @@ impl UnixStream { Ok(event.ready) } - /// Wait for the socket to become readable. + /// Waits for the socket to become readable. /// /// This function is equivalent to `ready(Interest::READABLE)` and is usually /// paired with `try_read()`. @@ -239,8 +248,12 @@ impl UnixStream { /// # Return /// /// If data is successfully read, `Ok(n)` is returned, where `n` is the - /// number of bytes read. `Ok(0)` indicates the stream's read half is closed - /// and will no longer yield data. If the stream is not ready to read data + /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios: + /// + /// 1. The stream's read half is closed and will no longer yield data. + /// 2. The specified buffer was 0 bytes in length. + /// + /// If the stream is not ready to read data, /// `Err(io::ErrorKind::WouldBlock)` is returned. /// /// # Examples @@ -290,7 +303,7 @@ impl UnixStream { .try_io(Interest::READABLE, || (&*self.io).read(buf)) } - /// Try to read data from the stream into the provided buffers, returning + /// Tries to read data from the stream into the provided buffers, returning /// how many bytes were read. /// /// Data is copied to fill each buffer in order, with the final buffer @@ -369,7 +382,7 @@ impl UnixStream { } cfg_io_util! { - /// Try to read data from the stream into the provided buffer, advancing the + /// Tries to read data from the stream into the provided buffer, advancing the /// buffer's internal cursor, returning how many bytes were read. /// /// Receives any pending data from the socket but does not wait for new data @@ -449,7 +462,7 @@ impl UnixStream { } } - /// Wait for the socket to become writable. + /// Waits for the socket to become writable. /// /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually /// paired with `try_write()`. @@ -535,7 +548,7 @@ impl UnixStream { self.io.registration().poll_write_ready(cx).map_ok(|_| ()) } - /// Try to write a buffer to the stream, returning how many bytes were + /// Tries to write a buffer to the stream, returning how many bytes were /// written. /// /// The function will attempt to write the entire contents of `buf`, but @@ -591,7 +604,7 @@ impl UnixStream { .try_io(Interest::WRITABLE, || (&*self.io).write(buf)) } - /// Try to write several buffers to the stream, returning how many bytes + /// Tries to write several buffers to the stream, returning how many bytes /// were written. /// /// Data is written from each buffer in order, with the final buffer read @@ -653,20 +666,122 @@ impl UnixStream { .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf)) } + /// Tries to read or write from the socket using a user-provided IO operation. + /// + /// If the socket is ready, the provided closure is called. The closure + /// should attempt to perform IO operation on the socket by manually + /// calling the appropriate syscall. If the operation fails because the + /// socket is not actually ready, then the closure should return a + /// `WouldBlock` error and the readiness flag is cleared. The return value + /// of the closure is then returned by `try_io`. + /// + /// If the socket is not ready, then the closure is not called + /// and a `WouldBlock` error is returned. + /// + /// The closure should only return a `WouldBlock` error if it has performed + /// an IO operation on the socket that failed due to the socket not being + /// ready. Returning a `WouldBlock` error in any other situation will + /// incorrectly clear the readiness flag, which can cause the socket to + /// behave incorrectly. + /// + /// The closure should not perform the IO operation using any of the methods + /// defined on the Tokio `UnixStream` type, as this will mess with the + /// readiness flag and can cause the socket to behave incorrectly. + /// + /// This method is not intended to be used with combined interests. + /// The closure should perform only one type of IO operation, so it should not + /// require more than one ready state. This method may panic or sleep forever + /// if it is called with a combined interest. + /// + /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: UnixStream::readable() + /// [`writable()`]: UnixStream::writable() + /// [`ready()`]: UnixStream::ready() + pub fn try_io<R>( + &self, + interest: Interest, + f: impl FnOnce() -> io::Result<R>, + ) -> io::Result<R> { + self.io + .registration() + .try_io(interest, || self.io.try_io(f)) + } + + /// Reads or writes from the socket using a user-provided IO operation. + /// + /// The readiness of the socket is awaited and when the socket is ready, + /// the provided closure is called. The closure should attempt to perform + /// IO operation on the socket by manually calling the appropriate syscall. + /// If the operation fails because the socket is not actually ready, + /// then the closure should return a `WouldBlock` error. In such case the + /// readiness flag is cleared and the socket readiness is awaited again. + /// This loop is repeated until the closure returns an `Ok` or an error + /// other than `WouldBlock`. + /// + /// The closure should only return a `WouldBlock` error if it has performed + /// an IO operation on the socket that failed due to the socket not being + /// ready. Returning a `WouldBlock` error in any other situation will + /// incorrectly clear the readiness flag, which can cause the socket to + /// behave incorrectly. + /// + /// The closure should not perform the IO operation using any of the methods + /// defined on the Tokio `UnixStream` type, as this will mess with the + /// readiness flag and can cause the socket to behave incorrectly. + /// + /// This method is not intended to be used with combined interests. + /// The closure should perform only one type of IO operation, so it should not + /// require more than one ready state. This method may panic or sleep forever + /// if it is called with a combined interest. + pub async fn async_io<R>( + &self, + interest: Interest, + mut f: impl FnMut() -> io::Result<R>, + ) -> io::Result<R> { + self.io + .registration() + .async_io(interest, || self.io.try_io(&mut f)) + .await + } + /// Creates new `UnixStream` from a `std::os::unix::net::UnixStream`. /// /// This function is intended to be used to wrap a UnixStream from the - /// standard library in the Tokio equivalent. The conversion assumes - /// nothing about the underlying stream; it is left up to the user to set - /// it in non-blocking mode. + /// standard library in the Tokio equivalent. + /// + /// # Notes + /// + /// The caller is responsible for ensuring that the stream is in + /// non-blocking mode. Otherwise all I/O operations on the stream + /// will block the thread, which will cause unexpected behavior. + /// Non-blocking mode can be set using [`set_nonblocking`]. + /// + /// [`set_nonblocking`]: std::os::unix::net::UnixStream::set_nonblocking + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UnixStream; + /// use std::os::unix::net::UnixStream as StdUnixStream; + /// # use std::error::Error; + /// + /// # async fn dox() -> Result<(), Box<dyn Error>> { + /// let std_stream = StdUnixStream::connect("/path/to/the/socket")?; + /// std_stream.set_nonblocking(true)?; + /// let stream = UnixStream::from_std(std_stream)?; + /// # Ok(()) + /// # } + /// ``` /// /// # Panics /// - /// This function panics if thread-local runtime is not set. + /// This function panics if it is not called from within a runtime with + /// IO enabled. /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. + #[track_caller] pub fn from_std(stream: net::UnixStream) -> io::Result<UnixStream> { let stream = mio::net::UnixStream::from_std(stream); let io = PollEvented::new(stream)?; @@ -674,7 +789,7 @@ impl UnixStream { Ok(UnixStream { io }) } - /// Turn a [`tokio::net::UnixStream`] into a [`std::os::unix::net::UnixStream`]. + /// Turns a [`tokio::net::UnixStream`] into a [`std::os::unix::net::UnixStream`]. /// /// The returned [`std::os::unix::net::UnixStream`] will have nonblocking /// mode set as `true`. Use [`set_nonblocking`] to change the blocking @@ -738,11 +853,41 @@ impl UnixStream { } /// Returns the socket address of the local half of this connection. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UnixStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let dir = tempfile::tempdir().unwrap(); + /// let bind_path = dir.path().join("bind_path"); + /// let stream = UnixStream::connect(bind_path).await?; + /// + /// println!("{:?}", stream.local_addr()?); + /// # Ok(()) + /// # } + /// ``` pub fn local_addr(&self) -> io::Result<SocketAddr> { self.io.local_addr().map(SocketAddr) } /// Returns the socket address of the remote half of this connection. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::UnixStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let dir = tempfile::tempdir().unwrap(); + /// let bind_path = dir.path().join("bind_path"); + /// let stream = UnixStream::connect(bind_path).await?; + /// + /// println!("{:?}", stream.peer_addr()?); + /// # Ok(()) + /// # } + /// ``` pub fn peer_addr(&self) -> io::Result<SocketAddr> { self.io.peer_addr().map(SocketAddr) } @@ -769,7 +914,7 @@ impl UnixStream { // These lifetime markers also appear in the generated documentation, and make // it more clear that this is a *borrowed* split. #[allow(clippy::needless_lifetimes)] - /// Split a `UnixStream` into a read half and a write half, which can be used + /// Splits a `UnixStream` into a read half and a write half, which can be used /// to read and write the stream concurrently. /// /// This method is more efficient than [`into_split`], but the halves cannot be @@ -893,3 +1038,10 @@ impl AsRawFd for UnixStream { self.io.as_raw_fd() } } + +#[cfg(not(tokio_no_as_fd))] +impl AsFd for UnixStream { + fn as_fd(&self) -> BorrowedFd<'_> { + unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) } + } +} diff --git a/vendor/tokio/src/net/unix/ucred.rs b/vendor/tokio/src/net/unix/ucred.rs index 6310183d7..edfab08ab 100644 --- a/vendor/tokio/src/net/unix/ucred.rs +++ b/vendor/tokio/src/net/unix/ucred.rs @@ -1,24 +1,24 @@ -use libc::{gid_t, pid_t, uid_t}; +use crate::net::unix; -/// Credentials of a process +/// Credentials of a process. #[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)] pub struct UCred { - /// PID (process ID) of the process - pid: Option<pid_t>, - /// UID (user ID) of the process - uid: uid_t, - /// GID (group ID) of the process - gid: gid_t, + /// PID (process ID) of the process. + pid: Option<unix::pid_t>, + /// UID (user ID) of the process. + uid: unix::uid_t, + /// GID (group ID) of the process. + gid: unix::gid_t, } impl UCred { /// Gets UID (user ID) of the process. - pub fn uid(&self) -> uid_t { + pub fn uid(&self) -> unix::uid_t { self.uid } /// Gets GID (group ID) of the process. - pub fn gid(&self) -> gid_t { + pub fn gid(&self) -> unix::gid_t { self.gid } @@ -26,20 +26,23 @@ impl UCred { /// /// This is only implemented under Linux, Android, iOS, macOS, Solaris and /// Illumos. On other platforms this will always return `None`. - pub fn pid(&self) -> Option<pid_t> { + pub fn pid(&self) -> Option<unix::pid_t> { self.pid } } -#[cfg(any(target_os = "linux", target_os = "android"))] -pub(crate) use self::impl_linux::get_peer_cred; - #[cfg(any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "netbsd", + target_os = "linux", + target_os = "redox", + target_os = "android", target_os = "openbsd" ))] +pub(crate) use self::impl_linux::get_peer_cred; + +#[cfg(any(target_os = "netbsd"))] +pub(crate) use self::impl_netbsd::get_peer_cred; + +#[cfg(any(target_os = "dragonfly", target_os = "freebsd"))] pub(crate) use self::impl_bsd::get_peer_cred; #[cfg(any(target_os = "macos", target_os = "ios"))] @@ -48,13 +51,24 @@ pub(crate) use self::impl_macos::get_peer_cred; #[cfg(any(target_os = "solaris", target_os = "illumos"))] pub(crate) use self::impl_solaris::get_peer_cred; -#[cfg(any(target_os = "linux", target_os = "android"))] +#[cfg(target_os = "aix")] +pub(crate) use self::impl_aix::get_peer_cred; + +#[cfg(any( + target_os = "linux", + target_os = "redox", + target_os = "android", + target_os = "openbsd" +))] pub(crate) mod impl_linux { - use crate::net::unix::UnixStream; + use crate::net::unix::{self, UnixStream}; use libc::{c_void, getsockopt, socklen_t, SOL_SOCKET, SO_PEERCRED}; use std::{io, mem}; + #[cfg(target_os = "openbsd")] + use libc::sockpeercred as ucred; + #[cfg(any(target_os = "linux", target_os = "redox", target_os = "android"))] use libc::ucred; pub(crate) fn get_peer_cred(sock: &UnixStream) -> io::Result<super::UCred> { @@ -86,9 +100,9 @@ pub(crate) mod impl_linux { ); if ret == 0 && ucred_size as usize == mem::size_of::<ucred>() { Ok(super::UCred { - uid: ucred.uid, - gid: ucred.gid, - pid: Some(ucred.pid), + uid: ucred.uid as unix::uid_t, + gid: ucred.gid as unix::gid_t, + pid: Some(ucred.pid as unix::pid_t), }) } else { Err(io::Error::last_os_error()) @@ -97,14 +111,51 @@ pub(crate) mod impl_linux { } } -#[cfg(any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "netbsd", - target_os = "openbsd" -))] +#[cfg(any(target_os = "netbsd"))] +pub(crate) mod impl_netbsd { + use crate::net::unix::{self, UnixStream}; + + use libc::{c_void, getsockopt, socklen_t, unpcbid, LOCAL_PEEREID, SOL_SOCKET}; + use std::io; + use std::mem::size_of; + use std::os::unix::io::AsRawFd; + + pub(crate) fn get_peer_cred(sock: &UnixStream) -> io::Result<super::UCred> { + unsafe { + let raw_fd = sock.as_raw_fd(); + + let mut unpcbid = unpcbid { + unp_pid: 0, + unp_euid: 0, + unp_egid: 0, + }; + + let unpcbid_size = size_of::<unpcbid>(); + let mut unpcbid_size = unpcbid_size as socklen_t; + + let ret = getsockopt( + raw_fd, + SOL_SOCKET, + LOCAL_PEEREID, + &mut unpcbid as *mut unpcbid as *mut c_void, + &mut unpcbid_size, + ); + if ret == 0 && unpcbid_size as usize == size_of::<unpcbid>() { + Ok(super::UCred { + uid: unpcbid.unp_euid as unix::uid_t, + gid: unpcbid.unp_egid as unix::gid_t, + pid: Some(unpcbid.unp_pid as unix::pid_t), + }) + } else { + Err(io::Error::last_os_error()) + } + } + } +} + +#[cfg(any(target_os = "dragonfly", target_os = "freebsd"))] pub(crate) mod impl_bsd { - use crate::net::unix::UnixStream; + use crate::net::unix::{self, UnixStream}; use libc::getpeereid; use std::io; @@ -122,8 +173,8 @@ pub(crate) mod impl_bsd { if ret == 0 { Ok(super::UCred { - uid: uid.assume_init(), - gid: gid.assume_init(), + uid: uid.assume_init() as unix::uid_t, + gid: gid.assume_init() as unix::gid_t, pid: None, }) } else { @@ -135,7 +186,7 @@ pub(crate) mod impl_bsd { #[cfg(any(target_os = "macos", target_os = "ios"))] pub(crate) mod impl_macos { - use crate::net::unix::UnixStream; + use crate::net::unix::{self, UnixStream}; use libc::{c_void, getpeereid, getsockopt, pid_t, LOCAL_PEEREPID, SOL_LOCAL}; use std::io; @@ -169,9 +220,9 @@ pub(crate) mod impl_macos { if ret == 0 { Ok(super::UCred { - uid: uid.assume_init(), - gid: gid.assume_init(), - pid: Some(pid.assume_init()), + uid: uid.assume_init() as unix::uid_t, + gid: gid.assume_init() as unix::gid_t, + pid: Some(pid.assume_init() as unix::pid_t), }) } else { Err(io::Error::last_os_error()) @@ -182,7 +233,7 @@ pub(crate) mod impl_macos { #[cfg(any(target_os = "solaris", target_os = "illumos"))] pub(crate) mod impl_solaris { - use crate::net::unix::UnixStream; + use crate::net::unix::{self, UnixStream}; use std::io; use std::os::unix::io::AsRawFd; use std::ptr; @@ -202,9 +253,37 @@ pub(crate) mod impl_solaris { libc::ucred_free(cred); Ok(super::UCred { - uid, - gid, - pid: Some(pid), + uid: uid as unix::uid_t, + gid: gid as unix::gid_t, + pid: Some(pid as unix::pid_t), + }) + } else { + Err(io::Error::last_os_error()) + } + } + } +} + +#[cfg(target_os = "aix")] +pub(crate) mod impl_aix { + use crate::net::unix::UnixStream; + use std::io; + use std::os::unix::io::AsRawFd; + + pub(crate) fn get_peer_cred(sock: &UnixStream) -> io::Result<super::UCred> { + unsafe { + let raw_fd = sock.as_raw_fd(); + + let mut uid = std::mem::MaybeUninit::uninit(); + let mut gid = std::mem::MaybeUninit::uninit(); + + let ret = libc::getpeereid(raw_fd, uid.as_mut_ptr(), gid.as_mut_ptr()); + + if ret == 0 { + Ok(super::UCred { + uid: uid.assume_init(), + gid: gid.assume_init(), + pid: None, }) } else { Err(io::Error::last_os_error()) |