summaryrefslogtreecommitdiffstats
path: root/vendor/tokio/src/net/unix
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 03:57:31 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 03:57:31 +0000
commitdc0db358abe19481e475e10c32149b53370f1a1c (patch)
treeab8ce99c4b255ce46f99ef402c27916055b899ee /vendor/tokio/src/net/unix
parentReleasing progress-linux version 1.71.1+dfsg1-2~progress7.99u1. (diff)
downloadrustc-dc0db358abe19481e475e10c32149b53370f1a1c.tar.xz
rustc-dc0db358abe19481e475e10c32149b53370f1a1c.zip
Merging upstream version 1.72.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/tokio/src/net/unix')
-rw-r--r--vendor/tokio/src/net/unix/datagram/socket.rs321
-rw-r--r--vendor/tokio/src/net/unix/listener.rs67
-rw-r--r--vendor/tokio/src/net/unix/mod.rs17
-rw-r--r--vendor/tokio/src/net/unix/pipe.rs1222
-rw-r--r--vendor/tokio/src/net/unix/split.rs233
-rw-r--r--vendor/tokio/src/net/unix/split_owned.rs236
-rw-r--r--vendor/tokio/src/net/unix/stream.rs188
-rw-r--r--vendor/tokio/src/net/unix/ucred.rs157
8 files changed, 2332 insertions, 109 deletions
diff --git a/vendor/tokio/src/net/unix/datagram/socket.rs b/vendor/tokio/src/net/unix/datagram/socket.rs
index 2d2177803..c97d101aa 100644
--- a/vendor/tokio/src/net/unix/datagram/socket.rs
+++ b/vendor/tokio/src/net/unix/datagram/socket.rs
@@ -1,10 +1,11 @@
use crate::io::{Interest, PollEvented, ReadBuf, Ready};
use crate::net::unix::SocketAddr;
-use std::convert::TryFrom;
use std::fmt;
use std::io;
use std::net::Shutdown;
+#[cfg(not(tokio_no_as_fd))]
+use std::os::unix::io::{AsFd, BorrowedFd};
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::os::unix::net;
use std::path::Path;
@@ -90,13 +91,14 @@ cfg_net_unix! {
/// # Ok(())
/// # }
/// ```
+ #[cfg_attr(docsrs, doc(alias = "uds"))]
pub struct UnixDatagram {
io: PollEvented<mio::net::UnixDatagram>,
}
}
impl UnixDatagram {
- /// Wait for any of the requested ready states.
+ /// Waits for any of the requested ready states.
///
/// This function is usually paired with `try_recv()` or `try_send()`. It
/// can be used to concurrently recv / send to the same socket on a single
@@ -104,7 +106,9 @@ impl UnixDatagram {
///
/// The function may complete without the socket being ready. This is a
/// false-positive and attempting an operation will return with
- /// `io::ErrorKind::WouldBlock`.
+ /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
+ /// [`Ready`] set, so you should always check the returned value and possibly
+ /// wait again if the requested states are not set.
///
/// # Cancel safety
///
@@ -169,7 +173,7 @@ impl UnixDatagram {
Ok(event.ready)
}
- /// Wait for the socket to become writable.
+ /// Waits for the socket to become writable.
///
/// This function is equivalent to `ready(Interest::WRITABLE)` and is
/// usually paired with `try_send()` or `try_send_to()`.
@@ -226,7 +230,40 @@ impl UnixDatagram {
Ok(())
}
- /// Wait for the socket to become readable.
+ /// Polls for write/send readiness.
+ ///
+ /// If the socket is not currently ready for sending, this method will
+ /// store a clone of the `Waker` from the provided `Context`. When the socket
+ /// becomes ready for sending, `Waker::wake` will be called on the
+ /// waker.
+ ///
+ /// Note that on multiple calls to `poll_send_ready` or `poll_send`, only
+ /// the `Waker` from the `Context` passed to the most recent call is
+ /// scheduled to receive a wakeup. (However, `poll_recv_ready` retains a
+ /// second, independent waker.)
+ ///
+ /// This function is intended for cases where creating and pinning a future
+ /// via [`writable`] is not feasible. Where possible, using [`writable`] is
+ /// preferred, as this supports polling from multiple tasks at once.
+ ///
+ /// # Return value
+ ///
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the socket is not ready for writing.
+ /// * `Poll::Ready(Ok(()))` if the socket is ready for writing.
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
+ ///
+ /// # Errors
+ ///
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ ///
+ /// [`writable`]: method@Self::writable
+ pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.io.registration().poll_write_ready(cx).map_ok(|_| ())
+ }
+
+ /// Waits for the socket to become readable.
///
/// This function is equivalent to `ready(Interest::READABLE)` and is usually
/// paired with `try_recv()`.
@@ -289,6 +326,39 @@ impl UnixDatagram {
Ok(())
}
+ /// Polls for read/receive readiness.
+ ///
+ /// If the socket is not currently ready for receiving, this method will
+ /// store a clone of the `Waker` from the provided `Context`. When the
+ /// socket becomes ready for reading, `Waker::wake` will be called on the
+ /// waker.
+ ///
+ /// Note that on multiple calls to `poll_recv_ready`, `poll_recv` or
+ /// `poll_peek`, only the `Waker` from the `Context` passed to the most
+ /// recent call is scheduled to receive a wakeup. (However,
+ /// `poll_send_ready` retains a second, independent waker.)
+ ///
+ /// This function is intended for cases where creating and pinning a future
+ /// via [`readable`] is not feasible. Where possible, using [`readable`] is
+ /// preferred, as this supports polling from multiple tasks at once.
+ ///
+ /// # Return value
+ ///
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the socket is not ready for reading.
+ /// * `Poll::Ready(Ok(()))` if the socket is ready for reading.
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
+ ///
+ /// # Errors
+ ///
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ ///
+ /// [`readable`]: method@Self::readable
+ pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.io.registration().poll_read_ready(cx).map_ok(|_| ())
+ }
+
/// Creates a new `UnixDatagram` bound to the specified path.
///
/// # Examples
@@ -358,13 +428,21 @@ impl UnixDatagram {
/// Creates new `UnixDatagram` from a `std::os::unix::net::UnixDatagram`.
///
/// This function is intended to be used to wrap a UnixDatagram from the
- /// standard library in the Tokio equivalent. The conversion assumes
- /// nothing about the underlying datagram; it is left up to the user to set
- /// it in non-blocking mode.
+ /// standard library in the Tokio equivalent.
+ ///
+ /// # Notes
+ ///
+ /// The caller is responsible for ensuring that the socker is in
+ /// non-blocking mode. Otherwise all I/O operations on the socket
+ /// will block the thread, which will cause unexpected behavior.
+ /// Non-blocking mode can be set using [`set_nonblocking`].
+ ///
+ /// [`set_nonblocking`]: std::os::unix::net::UnixDatagram::set_nonblocking
///
/// # Panics
///
- /// This function panics if thread-local runtime is not set.
+ /// This function panics if it is not called from within a runtime with
+ /// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a Tokio runtime, otherwise runtime can be set
@@ -391,30 +469,29 @@ impl UnixDatagram {
/// # Ok(())
/// # }
/// ```
+ #[track_caller]
pub fn from_std(datagram: net::UnixDatagram) -> io::Result<UnixDatagram> {
let socket = mio::net::UnixDatagram::from_std(datagram);
let io = PollEvented::new(socket)?;
Ok(UnixDatagram { io })
}
- /// Turn a [`tokio::net::UnixDatagram`] into a [`std::os::unix::net::UnixDatagram`].
+ /// Turns a [`tokio::net::UnixDatagram`] into a [`std::os::unix::net::UnixDatagram`].
///
/// The returned [`std::os::unix::net::UnixDatagram`] will have nonblocking
- /// mode set as `true`. Use [`set_nonblocking`] to change the blocking mode
+ /// mode set as `true`. Use [`set_nonblocking`] to change the blocking mode
/// if needed.
///
/// # Examples
///
/// ```rust,no_run
- /// use std::error::Error;
- ///
- /// #[tokio::main]
- /// async fn main() -> Result<(), Box<dyn Error>> {
- /// let tokio_socket = tokio::net::UnixDatagram::bind("127.0.0.1:0")?;
- /// let std_socket = tokio_socket.into_std()?;
- /// std_socket.set_nonblocking(false)?;
- /// Ok(())
- /// }
+ /// # use std::error::Error;
+ /// # async fn dox() -> Result<(), Box<dyn Error>> {
+ /// let tokio_socket = tokio::net::UnixDatagram::bind("/path/to/the/socket")?;
+ /// let std_socket = tokio_socket.into_std()?;
+ /// std_socket.set_nonblocking(false)?;
+ /// # Ok(())
+ /// # }
/// ```
///
/// [`tokio::net::UnixDatagram`]: UnixDatagram
@@ -548,7 +625,7 @@ impl UnixDatagram {
.await
}
- /// Try to send a datagram to the peer without waiting.
+ /// Tries to send a datagram to the peer without waiting.
///
/// # Examples
///
@@ -592,7 +669,7 @@ impl UnixDatagram {
.try_io(Interest::WRITABLE, || self.io.send(buf))
}
- /// Try to send a datagram to the peer without waiting.
+ /// Tries to send a datagram to the peer without waiting.
///
/// # Examples
///
@@ -678,7 +755,7 @@ impl UnixDatagram {
.await
}
- /// Try to receive a datagram from the peer without waiting.
+ /// Tries to receive a datagram from the peer without waiting.
///
/// # Examples
///
@@ -729,7 +806,9 @@ impl UnixDatagram {
}
cfg_io_util! {
- /// Try to receive data from the socket without waiting.
+ /// Tries to receive data from the socket without waiting.
+ ///
+ /// This method can be used even if `buf` is uninitialized.
///
/// # Examples
///
@@ -778,7 +857,7 @@ impl UnixDatagram {
// Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
// buffer.
- let (n, addr) = (&*self.io).recv_from(dst)?;
+ let (n, addr) = (*self.io).recv_from(dst)?;
unsafe {
buf.advance_mut(n);
@@ -790,9 +869,64 @@ impl UnixDatagram {
Ok((n, SocketAddr(addr)))
}
- /// Try to read data from the stream into the provided buffer, advancing the
+ /// Receives from the socket, advances the
+ /// buffer's internal cursor and returns how many bytes were read and the origin.
+ ///
+ /// This method can be used even if `buf` is uninitialized.
+ ///
+ /// # Examples
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ /// use tempfile::tempdir;
+ ///
+ /// // We use a temporary directory so that the socket
+ /// // files left by the bound sockets will get cleaned up.
+ /// let tmp = tempdir()?;
+ ///
+ /// // Bind each socket to a filesystem path
+ /// let tx_path = tmp.path().join("tx");
+ /// let tx = UnixDatagram::bind(&tx_path)?;
+ /// let rx_path = tmp.path().join("rx");
+ /// let rx = UnixDatagram::bind(&rx_path)?;
+ ///
+ /// let bytes = b"hello world";
+ /// tx.send_to(bytes, &rx_path).await?;
+ ///
+ /// let mut buf = Vec::with_capacity(24);
+ /// let (size, addr) = rx.recv_buf_from(&mut buf).await?;
+ ///
+ /// let dgram = &buf[..size];
+ /// assert_eq!(dgram, bytes);
+ /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> {
+ self.io.registration().async_io(Interest::READABLE, || {
+ let dst = buf.chunk_mut();
+ let dst =
+ unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
+
+ // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
+ // buffer.
+ let (n, addr) = (*self.io).recv_from(dst)?;
+
+ unsafe {
+ buf.advance_mut(n);
+ }
+ Ok((n,SocketAddr(addr)))
+ }).await
+ }
+
+ /// Tries to read data from the stream into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
///
+ /// This method can be used even if `buf` is uninitialized.
+ ///
/// # Examples
///
/// ```no_run
@@ -841,7 +975,7 @@ impl UnixDatagram {
// Safety: We trust `UnixDatagram::recv` to have filled up `n` bytes in the
// buffer.
- let n = (&*self.io).recv(dst)?;
+ let n = (*self.io).recv(dst)?;
unsafe {
buf.advance_mut(n);
@@ -850,6 +984,52 @@ impl UnixDatagram {
Ok(n)
})
}
+
+ /// Receives data from the socket from the address to which it is connected,
+ /// advancing the buffer's internal cursor, returning how many bytes were read.
+ ///
+ /// This method can be used even if `buf` is uninitialized.
+ ///
+ /// # Examples
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ ///
+ /// // Create the pair of sockets
+ /// let (sock1, sock2) = UnixDatagram::pair()?;
+ ///
+ /// // Since the sockets are paired, the paired send/recv
+ /// // functions can be used
+ /// let bytes = b"hello world";
+ /// sock1.send(bytes).await?;
+ ///
+ /// let mut buff = Vec::with_capacity(24);
+ /// let size = sock2.recv_buf(&mut buff).await?;
+ ///
+ /// let dgram = &buff[..size];
+ /// assert_eq!(dgram, bytes);
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
+ self.io.registration().async_io(Interest::READABLE, || {
+ let dst = buf.chunk_mut();
+ let dst =
+ unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
+
+ // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
+ // buffer.
+ let n = (*self.io).recv(dst)?;
+
+ unsafe {
+ buf.advance_mut(n);
+ }
+ Ok(n)
+ }).await
+ }
}
/// Sends data on the socket to the specified address.
@@ -1091,7 +1271,7 @@ impl UnixDatagram {
Poll::Ready(Ok(()))
}
- /// Try to receive data from the socket without waiting.
+ /// Tries to receive data from the socket without waiting.
///
/// # Examples
///
@@ -1143,6 +1323,84 @@ impl UnixDatagram {
Ok((n, SocketAddr(addr)))
}
+ /// Tries to read or write from the socket using a user-provided IO operation.
+ ///
+ /// If the socket is ready, the provided closure is called. The closure
+ /// should attempt to perform IO operation on the socket by manually
+ /// calling the appropriate syscall. If the operation fails because the
+ /// socket is not actually ready, then the closure should return a
+ /// `WouldBlock` error and the readiness flag is cleared. The return value
+ /// of the closure is then returned by `try_io`.
+ ///
+ /// If the socket is not ready, then the closure is not called
+ /// and a `WouldBlock` error is returned.
+ ///
+ /// The closure should only return a `WouldBlock` error if it has performed
+ /// an IO operation on the socket that failed due to the socket not being
+ /// ready. Returning a `WouldBlock` error in any other situation will
+ /// incorrectly clear the readiness flag, which can cause the socket to
+ /// behave incorrectly.
+ ///
+ /// The closure should not perform the IO operation using any of the methods
+ /// defined on the Tokio `UnixDatagram` type, as this will mess with the
+ /// readiness flag and can cause the socket to behave incorrectly.
+ ///
+ /// This method is not intended to be used with combined interests.
+ /// The closure should perform only one type of IO operation, so it should not
+ /// require more than one ready state. This method may panic or sleep forever
+ /// if it is called with a combined interest.
+ ///
+ /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: UnixDatagram::readable()
+ /// [`writable()`]: UnixDatagram::writable()
+ /// [`ready()`]: UnixDatagram::ready()
+ pub fn try_io<R>(
+ &self,
+ interest: Interest,
+ f: impl FnOnce() -> io::Result<R>,
+ ) -> io::Result<R> {
+ self.io
+ .registration()
+ .try_io(interest, || self.io.try_io(f))
+ }
+
+ /// Reads or writes from the socket using a user-provided IO operation.
+ ///
+ /// The readiness of the socket is awaited and when the socket is ready,
+ /// the provided closure is called. The closure should attempt to perform
+ /// IO operation on the socket by manually calling the appropriate syscall.
+ /// If the operation fails because the socket is not actually ready,
+ /// then the closure should return a `WouldBlock` error. In such case the
+ /// readiness flag is cleared and the socket readiness is awaited again.
+ /// This loop is repeated until the closure returns an `Ok` or an error
+ /// other than `WouldBlock`.
+ ///
+ /// The closure should only return a `WouldBlock` error if it has performed
+ /// an IO operation on the socket that failed due to the socket not being
+ /// ready. Returning a `WouldBlock` error in any other situation will
+ /// incorrectly clear the readiness flag, which can cause the socket to
+ /// behave incorrectly.
+ ///
+ /// The closure should not perform the IO operation using any of the methods
+ /// defined on the Tokio `UnixDatagram` type, as this will mess with the
+ /// readiness flag and can cause the socket to behave incorrectly.
+ ///
+ /// This method is not intended to be used with combined interests.
+ /// The closure should perform only one type of IO operation, so it should not
+ /// require more than one ready state. This method may panic or sleep forever
+ /// if it is called with a combined interest.
+ pub async fn async_io<R>(
+ &self,
+ interest: Interest,
+ mut f: impl FnMut() -> io::Result<R>,
+ ) -> io::Result<R> {
+ self.io
+ .registration()
+ .async_io(interest, || self.io.try_io(&mut f))
+ .await
+ }
+
/// Returns the local address that this socket is bound to.
///
/// # Examples
@@ -1319,3 +1577,10 @@ impl AsRawFd for UnixDatagram {
self.io.as_raw_fd()
}
}
+
+#[cfg(not(tokio_no_as_fd))]
+impl AsFd for UnixDatagram {
+ fn as_fd(&self) -> BorrowedFd<'_> {
+ unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
+ }
+}
diff --git a/vendor/tokio/src/net/unix/listener.rs b/vendor/tokio/src/net/unix/listener.rs
index efb9503d4..37a5a4ae4 100644
--- a/vendor/tokio/src/net/unix/listener.rs
+++ b/vendor/tokio/src/net/unix/listener.rs
@@ -1,9 +1,10 @@
use crate::io::{Interest, PollEvented};
use crate::net::unix::{SocketAddr, UnixStream};
-use std::convert::TryFrom;
use std::fmt;
use std::io;
+#[cfg(not(tokio_no_as_fd))]
+use std::os::unix::io::{AsFd, BorrowedFd};
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::os::unix::net;
use std::path::Path;
@@ -44,6 +45,7 @@ cfg_net_unix! {
/// }
/// }
/// ```
+ #[cfg_attr(docsrs, doc(alias = "uds"))]
pub struct UnixListener {
io: PollEvented<mio::net::UnixListener>,
}
@@ -54,11 +56,13 @@ impl UnixListener {
///
/// # Panics
///
- /// This function panics if thread-local runtime is not set.
+ /// This function panics if it is not called from within a runtime with
+ /// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
+ #[track_caller]
pub fn bind<P>(path: P) -> io::Result<UnixListener>
where
P: AsRef<Path>,
@@ -71,40 +75,62 @@ impl UnixListener {
/// Creates new `UnixListener` from a `std::os::unix::net::UnixListener `.
///
/// This function is intended to be used to wrap a UnixListener from the
- /// standard library in the Tokio equivalent. The conversion assumes
- /// nothing about the underlying listener; it is left up to the user to set
- /// it in non-blocking mode.
+ /// standard library in the Tokio equivalent.
+ ///
+ /// # Notes
+ ///
+ /// The caller is responsible for ensuring that the listener is in
+ /// non-blocking mode. Otherwise all I/O operations on the listener
+ /// will block the thread, which will cause unexpected behavior.
+ /// Non-blocking mode can be set using [`set_nonblocking`].
+ ///
+ /// [`set_nonblocking`]: std::os::unix::net::UnixListener::set_nonblocking
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixListener;
+ /// use std::os::unix::net::UnixListener as StdUnixListener;
+ /// # use std::error::Error;
+ ///
+ /// # async fn dox() -> Result<(), Box<dyn Error>> {
+ /// let std_listener = StdUnixListener::bind("/path/to/the/socket")?;
+ /// std_listener.set_nonblocking(true)?;
+ /// let listener = UnixListener::from_std(std_listener)?;
+ /// # Ok(())
+ /// # }
+ /// ```
///
/// # Panics
///
- /// This function panics if thread-local runtime is not set.
+ /// This function panics if it is not called from within a runtime with
+ /// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
+ #[track_caller]
pub fn from_std(listener: net::UnixListener) -> io::Result<UnixListener> {
let listener = mio::net::UnixListener::from_std(listener);
let io = PollEvented::new(listener)?;
Ok(UnixListener { io })
}
- /// Turn a [`tokio::net::UnixListener`] into a [`std::os::unix::net::UnixListener`].
+ /// Turns a [`tokio::net::UnixListener`] into a [`std::os::unix::net::UnixListener`].
///
/// The returned [`std::os::unix::net::UnixListener`] will have nonblocking mode
- /// set as `true`. Use [`set_nonblocking`] to change the blocking mode if needed.
+ /// set as `true`. Use [`set_nonblocking`] to change the blocking mode if needed.
///
/// # Examples
///
/// ```rust,no_run
- /// use std::error::Error;
- ///
- /// #[tokio::main]
- /// async fn main() -> Result<(), Box<dyn Error>> {
- /// let tokio_listener = tokio::net::UnixListener::bind("127.0.0.1:0")?;
- /// let std_listener = tokio_listener.into_std()?;
- /// std_listener.set_nonblocking(false)?;
- /// Ok(())
- /// }
+ /// # use std::error::Error;
+ /// # async fn dox() -> Result<(), Box<dyn Error>> {
+ /// let tokio_listener = tokio::net::UnixListener::bind("/path/to/the/socket")?;
+ /// let std_listener = tokio_listener.into_std()?;
+ /// std_listener.set_nonblocking(false)?;
+ /// # Ok(())
+ /// # }
/// ```
///
/// [`tokio::net::UnixListener`]: UnixListener
@@ -184,3 +210,10 @@ impl AsRawFd for UnixListener {
self.io.as_raw_fd()
}
}
+
+#[cfg(not(tokio_no_as_fd))]
+impl AsFd for UnixListener {
+ fn as_fd(&self) -> BorrowedFd<'_> {
+ unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
+ }
+}
diff --git a/vendor/tokio/src/net/unix/mod.rs b/vendor/tokio/src/net/unix/mod.rs
index c3046f17e..a49b70af3 100644
--- a/vendor/tokio/src/net/unix/mod.rs
+++ b/vendor/tokio/src/net/unix/mod.rs
@@ -1,5 +1,4 @@
-//! Unix domain socket utility types
-
+//! Unix specific network types.
// This module does not currently provide any public API, but it was
// unintentionally defined as a public module. Hide it from the documentation
// instead of changing it to a private module to avoid breakage.
@@ -22,3 +21,17 @@ pub(crate) use stream::UnixStream;
mod ucred;
pub use ucred::UCred;
+
+pub mod pipe;
+
+/// A type representing process and process group IDs.
+#[allow(non_camel_case_types)]
+pub type uid_t = u32;
+
+/// A type representing user ID.
+#[allow(non_camel_case_types)]
+pub type gid_t = u32;
+
+/// A type representing group ID.
+#[allow(non_camel_case_types)]
+pub type pid_t = i32;
diff --git a/vendor/tokio/src/net/unix/pipe.rs b/vendor/tokio/src/net/unix/pipe.rs
new file mode 100644
index 000000000..188ac68b1
--- /dev/null
+++ b/vendor/tokio/src/net/unix/pipe.rs
@@ -0,0 +1,1222 @@
+//! Unix pipe types.
+
+use crate::io::interest::Interest;
+use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf, Ready};
+
+use mio::unix::pipe as mio_pipe;
+use std::fs::File;
+use std::io::{self, Read, Write};
+use std::os::unix::fs::{FileTypeExt, OpenOptionsExt};
+#[cfg(not(tokio_no_as_fd))]
+use std::os::unix::io::{AsFd, BorrowedFd};
+use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
+use std::path::Path;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+cfg_io_util! {
+ use bytes::BufMut;
+}
+
+/// Options and flags which can be used to configure how a FIFO file is opened.
+///
+/// This builder allows configuring how to create a pipe end from a FIFO file.
+/// Generally speaking, when using `OpenOptions`, you'll first call [`new`],
+/// then chain calls to methods to set each option, then call either
+/// [`open_receiver`] or [`open_sender`], passing the path of the FIFO file you
+/// are trying to open. This will give you a [`io::Result`][result] with a pipe
+/// end inside that you can further operate on.
+///
+/// [`new`]: OpenOptions::new
+/// [`open_receiver`]: OpenOptions::open_receiver
+/// [`open_sender`]: OpenOptions::open_sender
+/// [result]: std::io::Result
+///
+/// # Examples
+///
+/// Opening a pair of pipe ends from a FIFO file:
+///
+/// ```no_run
+/// use tokio::net::unix::pipe;
+/// # use std::error::Error;
+///
+/// const FIFO_NAME: &str = "path/to/a/fifo";
+///
+/// # async fn dox() -> Result<(), Box<dyn Error>> {
+/// let rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
+/// let tx = pipe::OpenOptions::new().open_sender(FIFO_NAME)?;
+/// # Ok(())
+/// # }
+/// ```
+///
+/// Opening a [`Sender`] on Linux when you are sure the file is a FIFO:
+///
+/// ```ignore
+/// use tokio::net::unix::pipe;
+/// use nix::{unistd::mkfifo, sys::stat::Mode};
+/// # use std::error::Error;
+///
+/// // Our program has exclusive access to this path.
+/// const FIFO_NAME: &str = "path/to/a/new/fifo";
+///
+/// # async fn dox() -> Result<(), Box<dyn Error>> {
+/// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
+/// let tx = pipe::OpenOptions::new()
+/// .read_write(true)
+/// .unchecked(true)
+/// .open_sender(FIFO_NAME)?;
+/// # Ok(())
+/// # }
+/// ```
+#[derive(Clone, Debug)]
+pub struct OpenOptions {
+ #[cfg(target_os = "linux")]
+ read_write: bool,
+ unchecked: bool,
+}
+
+impl OpenOptions {
+ /// Creates a blank new set of options ready for configuration.
+ ///
+ /// All options are initially set to `false`.
+ pub fn new() -> OpenOptions {
+ OpenOptions {
+ #[cfg(target_os = "linux")]
+ read_write: false,
+ unchecked: false,
+ }
+ }
+
+ /// Sets the option for read-write access.
+ ///
+ /// This option, when true, will indicate that a FIFO file will be opened
+ /// in read-write access mode. This operation is not defined by the POSIX
+ /// standard and is only guaranteed to work on Linux.
+ ///
+ /// # Examples
+ ///
+ /// Opening a [`Sender`] even if there are no open reading ends:
+ ///
+ /// ```ignore
+ /// use tokio::net::unix::pipe;
+ ///
+ /// let tx = pipe::OpenOptions::new()
+ /// .read_write(true)
+ /// .open_sender("path/to/a/fifo");
+ /// ```
+ ///
+ /// Opening a resilient [`Receiver`] i.e. a reading pipe end which will not
+ /// fail with [`UnexpectedEof`] during reading if all writing ends of the
+ /// pipe close the FIFO file.
+ ///
+ /// [`UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof
+ ///
+ /// ```ignore
+ /// use tokio::net::unix::pipe;
+ ///
+ /// let tx = pipe::OpenOptions::new()
+ /// .read_write(true)
+ /// .open_receiver("path/to/a/fifo");
+ /// ```
+ #[cfg(target_os = "linux")]
+ #[cfg_attr(docsrs, doc(cfg(target_os = "linux")))]
+ pub fn read_write(&mut self, value: bool) -> &mut Self {
+ self.read_write = value;
+ self
+ }
+
+ /// Sets the option to skip the check for FIFO file type.
+ ///
+ /// By default, [`open_receiver`] and [`open_sender`] functions will check
+ /// if the opened file is a FIFO file. Set this option to `true` if you are
+ /// sure the file is a FIFO file.
+ ///
+ /// [`open_receiver`]: OpenOptions::open_receiver
+ /// [`open_sender`]: OpenOptions::open_sender
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::unix::pipe;
+ /// use nix::{unistd::mkfifo, sys::stat::Mode};
+ /// # use std::error::Error;
+ ///
+ /// // Our program has exclusive access to this path.
+ /// const FIFO_NAME: &str = "path/to/a/new/fifo";
+ ///
+ /// # async fn dox() -> Result<(), Box<dyn Error>> {
+ /// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
+ /// let rx = pipe::OpenOptions::new()
+ /// .unchecked(true)
+ /// .open_receiver(FIFO_NAME)?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn unchecked(&mut self, value: bool) -> &mut Self {
+ self.unchecked = value;
+ self
+ }
+
+ /// Creates a [`Receiver`] from a FIFO file with the options specified by `self`.
+ ///
+ /// This function will open the FIFO file at the specified path, possibly
+ /// check if it is a pipe, and associate the pipe with the default event
+ /// loop for reading.
+ ///
+ /// # Errors
+ ///
+ /// If the file type check fails, this function will fail with `io::ErrorKind::InvalidInput`.
+ /// This function may also fail with other standard OS errors.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if it is not called from within a runtime with
+ /// IO enabled.
+ ///
+ /// The runtime is usually set implicitly when this function is called
+ /// from a future driven by a tokio runtime, otherwise runtime can be set
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
+ pub fn open_receiver<P: AsRef<Path>>(&self, path: P) -> io::Result<Receiver> {
+ let file = self.open(path.as_ref(), PipeEnd::Receiver)?;
+ Receiver::from_file_unchecked(file)
+ }
+
+ /// Creates a [`Sender`] from a FIFO file with the options specified by `self`.
+ ///
+ /// This function will open the FIFO file at the specified path, possibly
+ /// check if it is a pipe, and associate the pipe with the default event
+ /// loop for writing.
+ ///
+ /// # Errors
+ ///
+ /// If the file type check fails, this function will fail with `io::ErrorKind::InvalidInput`.
+ /// If the file is not opened in read-write access mode and the file is not
+ /// currently open for reading, this function will fail with `ENXIO`.
+ /// This function may also fail with other standard OS errors.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if it is not called from within a runtime with
+ /// IO enabled.
+ ///
+ /// The runtime is usually set implicitly when this function is called
+ /// from a future driven by a tokio runtime, otherwise runtime can be set
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
+ pub fn open_sender<P: AsRef<Path>>(&self, path: P) -> io::Result<Sender> {
+ let file = self.open(path.as_ref(), PipeEnd::Sender)?;
+ Sender::from_file_unchecked(file)
+ }
+
+ fn open(&self, path: &Path, pipe_end: PipeEnd) -> io::Result<File> {
+ let mut options = std::fs::OpenOptions::new();
+ options
+ .read(pipe_end == PipeEnd::Receiver)
+ .write(pipe_end == PipeEnd::Sender)
+ .custom_flags(libc::O_NONBLOCK);
+
+ #[cfg(target_os = "linux")]
+ if self.read_write {
+ options.read(true).write(true);
+ }
+
+ let file = options.open(path)?;
+
+ if !self.unchecked && !is_fifo(&file)? {
+ return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
+ }
+
+ Ok(file)
+ }
+}
+
+impl Default for OpenOptions {
+ fn default() -> OpenOptions {
+ OpenOptions::new()
+ }
+}
+
+#[derive(Clone, Copy, PartialEq, Eq, Debug)]
+enum PipeEnd {
+ Sender,
+ Receiver,
+}
+
+/// Writing end of a Unix pipe.
+///
+/// It can be constructed from a FIFO file with [`OpenOptions::open_sender`].
+///
+/// Opening a named pipe for writing involves a few steps.
+/// Call to [`OpenOptions::open_sender`] might fail with an error indicating
+/// different things:
+///
+/// * [`io::ErrorKind::NotFound`] - There is no file at the specified path.
+/// * [`io::ErrorKind::InvalidInput`] - The file exists, but it is not a FIFO.
+/// * [`ENXIO`] - The file is a FIFO, but no process has it open for reading.
+/// Sleep for a while and try again.
+/// * Other OS errors not specific to opening FIFO files.
+///
+/// Opening a `Sender` from a FIFO file should look like this:
+///
+/// ```no_run
+/// use tokio::net::unix::pipe;
+/// use tokio::time::{self, Duration};
+///
+/// const FIFO_NAME: &str = "path/to/a/fifo";
+///
+/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+/// // Wait for a reader to open the file.
+/// let tx = loop {
+/// match pipe::OpenOptions::new().open_sender(FIFO_NAME) {
+/// Ok(tx) => break tx,
+/// Err(e) if e.raw_os_error() == Some(libc::ENXIO) => {},
+/// Err(e) => return Err(e.into()),
+/// }
+///
+/// time::sleep(Duration::from_millis(50)).await;
+/// };
+/// # Ok(())
+/// # }
+/// ```
+///
+/// On Linux, it is possible to create a `Sender` without waiting in a sleeping
+/// loop. This is done by opening a named pipe in read-write access mode with
+/// `OpenOptions::read_write`. This way, a `Sender` can at the same time hold
+/// both a writing end and a reading end, and the latter allows to open a FIFO
+/// without [`ENXIO`] error since the pipe is open for reading as well.
+///
+/// `Sender` cannot be used to read from a pipe, so in practice the read access
+/// is only used when a FIFO is opened. However, using a `Sender` in read-write
+/// mode **may lead to lost data**, because written data will be dropped by the
+/// system as soon as all pipe ends are closed. To avoid lost data you have to
+/// make sure that a reading end has been opened before dropping a `Sender`.
+///
+/// Note that using read-write access mode with FIFO files is not defined by
+/// the POSIX standard and it is only guaranteed to work on Linux.
+///
+/// ```ignore
+/// use tokio::io::AsyncWriteExt;
+/// use tokio::net::unix::pipe;
+///
+/// const FIFO_NAME: &str = "path/to/a/fifo";
+///
+/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+/// let mut tx = pipe::OpenOptions::new()
+/// .read_write(true)
+/// .open_sender(FIFO_NAME)?;
+///
+/// // Asynchronously write to the pipe before a reader.
+/// tx.write_all(b"hello world").await?;
+/// # Ok(())
+/// # }
+/// ```
+///
+/// [`ENXIO`]: https://docs.rs/libc/latest/libc/constant.ENXIO.html
+#[derive(Debug)]
+pub struct Sender {
+ io: PollEvented<mio_pipe::Sender>,
+}
+
+impl Sender {
+ fn from_mio(mio_tx: mio_pipe::Sender) -> io::Result<Sender> {
+ let io = PollEvented::new_with_interest(mio_tx, Interest::WRITABLE)?;
+ Ok(Sender { io })
+ }
+
+ /// Creates a new `Sender` from a [`File`].
+ ///
+ /// This function is intended to construct a pipe from a [`File`] representing
+ /// a special FIFO file. It will check if the file is a pipe and has write access,
+ /// set it in non-blocking mode and perform the conversion.
+ ///
+ /// # Errors
+ ///
+ /// Fails with `io::ErrorKind::InvalidInput` if the file is not a pipe or it
+ /// does not have write access. Also fails with any standard OS error if it occurs.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if it is not called from within a runtime with
+ /// IO enabled.
+ ///
+ /// The runtime is usually set implicitly when this function is called
+ /// from a future driven by a tokio runtime, otherwise runtime can be set
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
+ pub fn from_file(mut file: File) -> io::Result<Sender> {
+ if !is_fifo(&file)? {
+ return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
+ }
+
+ let flags = get_file_flags(&file)?;
+ if has_write_access(flags) {
+ set_nonblocking(&mut file, flags)?;
+ Sender::from_file_unchecked(file)
+ } else {
+ Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "not in O_WRONLY or O_RDWR access mode",
+ ))
+ }
+ }
+
+ /// Creates a new `Sender` from a [`File`] without checking pipe properties.
+ ///
+ /// This function is intended to construct a pipe from a File representing
+ /// a special FIFO file. The conversion assumes nothing about the underlying
+ /// file; it is left up to the user to make sure it is opened with write access,
+ /// represents a pipe and is set in non-blocking mode.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::unix::pipe;
+ /// use std::fs::OpenOptions;
+ /// use std::os::unix::fs::{FileTypeExt, OpenOptionsExt};
+ /// # use std::error::Error;
+ ///
+ /// const FIFO_NAME: &str = "path/to/a/fifo";
+ ///
+ /// # async fn dox() -> Result<(), Box<dyn Error>> {
+ /// let file = OpenOptions::new()
+ /// .write(true)
+ /// .custom_flags(libc::O_NONBLOCK)
+ /// .open(FIFO_NAME)?;
+ /// if file.metadata()?.file_type().is_fifo() {
+ /// let tx = pipe::Sender::from_file_unchecked(file)?;
+ /// /* use the Sender */
+ /// }
+ /// # Ok(())
+ /// # }
+ /// ```
+ ///
+ /// # Panics
+ ///
+ /// This function panics if it is not called from within a runtime with
+ /// IO enabled.
+ ///
+ /// The runtime is usually set implicitly when this function is called
+ /// from a future driven by a tokio runtime, otherwise runtime can be set
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
+ pub fn from_file_unchecked(file: File) -> io::Result<Sender> {
+ let raw_fd = file.into_raw_fd();
+ let mio_tx = unsafe { mio_pipe::Sender::from_raw_fd(raw_fd) };
+ Sender::from_mio(mio_tx)
+ }
+
+ /// Waits for any of the requested ready states.
+ ///
+ /// This function can be used instead of [`writable()`] to check the returned
+ /// ready set for [`Ready::WRITABLE`] and [`Ready::WRITE_CLOSED`] events.
+ ///
+ /// The function may complete without the pipe being ready. This is a
+ /// false-positive and attempting an operation will return with
+ /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
+ /// [`Ready`] set, so you should always check the returned value and possibly
+ /// wait again if the requested states are not set.
+ ///
+ /// [`writable()`]: Self::writable
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ let event = self.io.registration().readiness(interest).await?;
+ Ok(event.ready)
+ }
+
+ /// Waits for the pipe to become writable.
+ ///
+ /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
+ /// paired with [`try_write()`].
+ ///
+ /// [`try_write()`]: Self::try_write
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::unix::pipe;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Open a writing end of a fifo
+ /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?;
+ ///
+ /// loop {
+ /// // Wait for the pipe to be writable
+ /// tx.writable().await?;
+ ///
+ /// // Try to write data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match tx.try_write(b"hello world") {
+ /// Ok(n) => {
+ /// break;
+ /// }
+ /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub async fn writable(&self) -> io::Result<()> {
+ self.ready(Interest::WRITABLE).await?;
+ Ok(())
+ }
+
+ /// Polls for write readiness.
+ ///
+ /// If the pipe is not currently ready for writing, this method will
+ /// store a clone of the `Waker` from the provided `Context`. When the pipe
+ /// becomes ready for writing, `Waker::wake` will be called on the waker.
+ ///
+ /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
+ /// the `Waker` from the `Context` passed to the most recent call is
+ /// scheduled to receive a wakeup.
+ ///
+ /// This function is intended for cases where creating and pinning a future
+ /// via [`writable`] is not feasible. Where possible, using [`writable`] is
+ /// preferred, as this supports polling from multiple tasks at once.
+ ///
+ /// [`writable`]: Self::writable
+ ///
+ /// # Return value
+ ///
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the pipe is not ready for writing.
+ /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
+ ///
+ /// # Errors
+ ///
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.io.registration().poll_write_ready(cx).map_ok(|_| ())
+ }
+
+ /// Tries to write a buffer to the pipe, returning how many bytes were
+ /// written.
+ ///
+ /// The function will attempt to write the entire contents of `buf`, but
+ /// only part of the buffer may be written. If the length of `buf` is not
+ /// greater than `PIPE_BUF` (an OS constant, 4096 under Linux), then the
+ /// write is guaranteed to be atomic, i.e. either the entire content of
+ /// `buf` will be written or this method will fail with `WouldBlock`. There
+ /// is no such guarantee if `buf` is larger than `PIPE_BUF`.
+ ///
+ /// This function is usually paired with [`writable`].
+ ///
+ /// [`writable`]: Self::writable
+ ///
+ /// # Return
+ ///
+ /// If data is successfully written, `Ok(n)` is returned, where `n` is the
+ /// number of bytes written. If the pipe is not ready to write data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::unix::pipe;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Open a writing end of a fifo
+ /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?;
+ ///
+ /// loop {
+ /// // Wait for the pipe to be writable
+ /// tx.writable().await?;
+ ///
+ /// // Try to write data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match tx.try_write(b"hello world") {
+ /// Ok(n) => {
+ /// break;
+ /// }
+ /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
+ self.io
+ .registration()
+ .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
+ }
+
+ /// Tries to write several buffers to the pipe, returning how many bytes
+ /// were written.
+ ///
+ /// Data is written from each buffer in order, with the final buffer read
+ /// from possible being only partially consumed. This method behaves
+ /// equivalently to a single call to [`try_write()`] with concatenated
+ /// buffers.
+ ///
+ /// If the total length of buffers is not greater than `PIPE_BUF` (an OS
+ /// constant, 4096 under Linux), then the write is guaranteed to be atomic,
+ /// i.e. either the entire contents of buffers will be written or this
+ /// method will fail with `WouldBlock`. There is no such guarantee if the
+ /// total length of buffers is greater than `PIPE_BUF`.
+ ///
+ /// This function is usually paired with [`writable`].
+ ///
+ /// [`try_write()`]: Self::try_write()
+ /// [`writable`]: Self::writable
+ ///
+ /// # Return
+ ///
+ /// If data is successfully written, `Ok(n)` is returned, where `n` is the
+ /// number of bytes written. If the pipe is not ready to write data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::unix::pipe;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Open a writing end of a fifo
+ /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?;
+ ///
+ /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
+ ///
+ /// loop {
+ /// // Wait for the pipe to be writable
+ /// tx.writable().await?;
+ ///
+ /// // Try to write data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match tx.try_write_vectored(&bufs) {
+ /// Ok(n) => {
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
+ self.io
+ .registration()
+ .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
+ }
+}
+
+impl AsyncWrite for Sender {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ self.io.poll_write(cx, buf)
+ }
+
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[io::IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ self.io.poll_write_vectored(cx, bufs)
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ true
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Poll::Ready(Ok(()))
+ }
+}
+
+impl AsRawFd for Sender {
+ fn as_raw_fd(&self) -> RawFd {
+ self.io.as_raw_fd()
+ }
+}
+
+#[cfg(not(tokio_no_as_fd))]
+impl AsFd for Sender {
+ fn as_fd(&self) -> BorrowedFd<'_> {
+ unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
+ }
+}
+
+/// Reading end of a Unix pipe.
+///
+/// It can be constructed from a FIFO file with [`OpenOptions::open_receiver`].
+///
+/// # Examples
+///
+/// Receiving messages from a named pipe in a loop:
+///
+/// ```no_run
+/// use tokio::net::unix::pipe;
+/// use tokio::io::{self, AsyncReadExt};
+///
+/// const FIFO_NAME: &str = "path/to/a/fifo";
+///
+/// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+/// let mut rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
+/// loop {
+/// let mut msg = vec![0; 256];
+/// match rx.read_exact(&mut msg).await {
+/// Ok(_) => {
+/// /* handle the message */
+/// }
+/// Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
+/// // Writing end has been closed, we should reopen the pipe.
+/// rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
+/// }
+/// Err(e) => return Err(e.into()),
+/// }
+/// }
+/// # }
+/// ```
+///
+/// On Linux, you can use a `Receiver` in read-write access mode to implement
+/// resilient reading from a named pipe. Unlike `Receiver` opened in read-only
+/// mode, read from a pipe in read-write mode will not fail with `UnexpectedEof`
+/// when the writing end is closed. This way, a `Receiver` can asynchronously
+/// wait for the next writer to open the pipe.
+///
+/// You should not use functions waiting for EOF such as [`read_to_end`] with
+/// a `Receiver` in read-write access mode, since it **may wait forever**.
+/// `Receiver` in this mode also holds an open writing end, which prevents
+/// receiving EOF.
+///
+/// To set the read-write access mode you can use `OpenOptions::read_write`.
+/// Note that using read-write access mode with FIFO files is not defined by
+/// the POSIX standard and it is only guaranteed to work on Linux.
+///
+/// ```ignore
+/// use tokio::net::unix::pipe;
+/// use tokio::io::AsyncReadExt;
+/// # use std::error::Error;
+///
+/// const FIFO_NAME: &str = "path/to/a/fifo";
+///
+/// # async fn dox() -> Result<(), Box<dyn Error>> {
+/// let mut rx = pipe::OpenOptions::new()
+/// .read_write(true)
+/// .open_receiver(FIFO_NAME)?;
+/// loop {
+/// let mut msg = vec![0; 256];
+/// rx.read_exact(&mut msg).await?;
+/// /* handle the message */
+/// }
+/// # }
+/// ```
+///
+/// [`read_to_end`]: crate::io::AsyncReadExt::read_to_end
+#[derive(Debug)]
+pub struct Receiver {
+ io: PollEvented<mio_pipe::Receiver>,
+}
+
+impl Receiver {
+ fn from_mio(mio_rx: mio_pipe::Receiver) -> io::Result<Receiver> {
+ let io = PollEvented::new_with_interest(mio_rx, Interest::READABLE)?;
+ Ok(Receiver { io })
+ }
+
+ /// Creates a new `Receiver` from a [`File`].
+ ///
+ /// This function is intended to construct a pipe from a [`File`] representing
+ /// a special FIFO file. It will check if the file is a pipe and has read access,
+ /// set it in non-blocking mode and perform the conversion.
+ ///
+ /// # Errors
+ ///
+ /// Fails with `io::ErrorKind::InvalidInput` if the file is not a pipe or it
+ /// does not have read access. Also fails with any standard OS error if it occurs.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if it is not called from within a runtime with
+ /// IO enabled.
+ ///
+ /// The runtime is usually set implicitly when this function is called
+ /// from a future driven by a tokio runtime, otherwise runtime can be set
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
+ pub fn from_file(mut file: File) -> io::Result<Receiver> {
+ if !is_fifo(&file)? {
+ return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
+ }
+
+ let flags = get_file_flags(&file)?;
+ if has_read_access(flags) {
+ set_nonblocking(&mut file, flags)?;
+ Receiver::from_file_unchecked(file)
+ } else {
+ Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "not in O_RDONLY or O_RDWR access mode",
+ ))
+ }
+ }
+
+ /// Creates a new `Receiver` from a [`File`] without checking pipe properties.
+ ///
+ /// This function is intended to construct a pipe from a File representing
+ /// a special FIFO file. The conversion assumes nothing about the underlying
+ /// file; it is left up to the user to make sure it is opened with read access,
+ /// represents a pipe and is set in non-blocking mode.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::unix::pipe;
+ /// use std::fs::OpenOptions;
+ /// use std::os::unix::fs::{FileTypeExt, OpenOptionsExt};
+ /// # use std::error::Error;
+ ///
+ /// const FIFO_NAME: &str = "path/to/a/fifo";
+ ///
+ /// # async fn dox() -> Result<(), Box<dyn Error>> {
+ /// let file = OpenOptions::new()
+ /// .read(true)
+ /// .custom_flags(libc::O_NONBLOCK)
+ /// .open(FIFO_NAME)?;
+ /// if file.metadata()?.file_type().is_fifo() {
+ /// let rx = pipe::Receiver::from_file_unchecked(file)?;
+ /// /* use the Receiver */
+ /// }
+ /// # Ok(())
+ /// # }
+ /// ```
+ ///
+ /// # Panics
+ ///
+ /// This function panics if it is not called from within a runtime with
+ /// IO enabled.
+ ///
+ /// The runtime is usually set implicitly when this function is called
+ /// from a future driven by a tokio runtime, otherwise runtime can be set
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
+ pub fn from_file_unchecked(file: File) -> io::Result<Receiver> {
+ let raw_fd = file.into_raw_fd();
+ let mio_rx = unsafe { mio_pipe::Receiver::from_raw_fd(raw_fd) };
+ Receiver::from_mio(mio_rx)
+ }
+
+ /// Waits for any of the requested ready states.
+ ///
+ /// This function can be used instead of [`readable()`] to check the returned
+ /// ready set for [`Ready::READABLE`] and [`Ready::READ_CLOSED`] events.
+ ///
+ /// The function may complete without the pipe being ready. This is a
+ /// false-positive and attempting an operation will return with
+ /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
+ /// [`Ready`] set, so you should always check the returned value and possibly
+ /// wait again if the requested states are not set.
+ ///
+ /// [`readable()`]: Self::readable
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ let event = self.io.registration().readiness(interest).await?;
+ Ok(event.ready)
+ }
+
+ /// Waits for the pipe to become readable.
+ ///
+ /// This function is equivalent to `ready(Interest::READABLE)` and is usually
+ /// paired with [`try_read()`].
+ ///
+ /// [`try_read()`]: Self::try_read()
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::unix::pipe;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Open a reading end of a fifo
+ /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
+ ///
+ /// let mut msg = vec![0; 1024];
+ ///
+ /// loop {
+ /// // Wait for the pipe to be readable
+ /// rx.readable().await?;
+ ///
+ /// // Try to read data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match rx.try_read(&mut msg) {
+ /// Ok(n) => {
+ /// msg.truncate(n);
+ /// break;
+ /// }
+ /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// println!("GOT = {:?}", msg);
+ /// Ok(())
+ /// }
+ /// ```
+ pub async fn readable(&self) -> io::Result<()> {
+ self.ready(Interest::READABLE).await?;
+ Ok(())
+ }
+
+ /// Polls for read readiness.
+ ///
+ /// If the pipe is not currently ready for reading, this method will
+ /// store a clone of the `Waker` from the provided `Context`. When the pipe
+ /// becomes ready for reading, `Waker::wake` will be called on the waker.
+ ///
+ /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
+ /// the `Waker` from the `Context` passed to the most recent call is
+ /// scheduled to receive a wakeup.
+ ///
+ /// This function is intended for cases where creating and pinning a future
+ /// via [`readable`] is not feasible. Where possible, using [`readable`] is
+ /// preferred, as this supports polling from multiple tasks at once.
+ ///
+ /// [`readable`]: Self::readable
+ ///
+ /// # Return value
+ ///
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the pipe is not ready for reading.
+ /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
+ ///
+ /// # Errors
+ ///
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.io.registration().poll_read_ready(cx).map_ok(|_| ())
+ }
+
+ /// Tries to read data from the pipe into the provided buffer, returning how
+ /// many bytes were read.
+ ///
+ /// Reads any pending data from the pipe but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read()` is non-blocking, the buffer does not have to be stored by
+ /// the async task and can exist entirely on the stack.
+ ///
+ /// Usually [`readable()`] is used with this function.
+ ///
+ /// [`readable()`]: Self::readable()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
+ ///
+ /// 1. The pipe's writing end is closed and will no longer write data.
+ /// 2. The specified buffer was 0 bytes in length.
+ ///
+ /// If the pipe is not ready to read data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::unix::pipe;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Open a reading end of a fifo
+ /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
+ ///
+ /// let mut msg = vec![0; 1024];
+ ///
+ /// loop {
+ /// // Wait for the pipe to be readable
+ /// rx.readable().await?;
+ ///
+ /// // Try to read data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match rx.try_read(&mut msg) {
+ /// Ok(n) => {
+ /// msg.truncate(n);
+ /// break;
+ /// }
+ /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// println!("GOT = {:?}", msg);
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
+ self.io
+ .registration()
+ .try_io(Interest::READABLE, || (&*self.io).read(buf))
+ }
+
+ /// Tries to read data from the pipe into the provided buffers, returning
+ /// how many bytes were read.
+ ///
+ /// Data is copied to fill each buffer in order, with the final buffer
+ /// written to possibly being only partially filled. This method behaves
+ /// equivalently to a single call to [`try_read()`] with concatenated
+ /// buffers.
+ ///
+ /// Reads any pending data from the pipe but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read_vectored()` is non-blocking, the buffer does not have to be
+ /// stored by the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] is used with this function.
+ ///
+ /// [`try_read()`]: Self::try_read()
+ /// [`readable()`]: Self::readable()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the pipe's writing end is
+ /// closed and will no longer write data. If the pipe is not ready to read
+ /// data `Err(io::ErrorKind::WouldBlock)` is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::unix::pipe;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Open a reading end of a fifo
+ /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
+ ///
+ /// loop {
+ /// // Wait for the pipe to be readable
+ /// rx.readable().await?;
+ ///
+ /// // Creating the buffer **after** the `await` prevents it from
+ /// // being stored in the async task.
+ /// let mut buf_a = [0; 512];
+ /// let mut buf_b = [0; 1024];
+ /// let mut bufs = [
+ /// io::IoSliceMut::new(&mut buf_a),
+ /// io::IoSliceMut::new(&mut buf_b),
+ /// ];
+ ///
+ /// // Try to read data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match rx.try_read_vectored(&mut bufs) {
+ /// Ok(0) => break,
+ /// Ok(n) => {
+ /// println!("read {} bytes", n);
+ /// }
+ /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
+ self.io
+ .registration()
+ .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
+ }
+
+ cfg_io_util! {
+ /// Tries to read data from the pipe into the provided buffer, advancing the
+ /// buffer's internal cursor, returning how many bytes were read.
+ ///
+ /// Reads any pending data from the pipe but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
+ /// the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: Self::readable
+ /// [`ready()`]: Self::ready
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the pipe's writing end is
+ /// closed and will no longer write data. If the pipe is not ready to read
+ /// data `Err(io::ErrorKind::WouldBlock)` is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::unix::pipe;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Open a reading end of a fifo
+ /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
+ ///
+ /// loop {
+ /// // Wait for the pipe to be readable
+ /// rx.readable().await?;
+ ///
+ /// let mut buf = Vec::with_capacity(4096);
+ ///
+ /// // Try to read data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match rx.try_read_buf(&mut buf) {
+ /// Ok(0) => break,
+ /// Ok(n) => {
+ /// println!("read {} bytes", n);
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
+ self.io.registration().try_io(Interest::READABLE, || {
+ use std::io::Read;
+
+ let dst = buf.chunk_mut();
+ let dst =
+ unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
+
+ // Safety: `mio_pipe::Receiver` uses a `std::fs::File` underneath,
+ // which correctly handles reads into uninitialized memory.
+ let n = (&*self.io).read(dst)?;
+
+ unsafe {
+ buf.advance_mut(n);
+ }
+
+ Ok(n)
+ })
+ }
+ }
+}
+
+impl AsyncRead for Receiver {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ // Safety: `mio_pipe::Receiver` uses a `std::fs::File` underneath,
+ // which correctly handles reads into uninitialized memory.
+ unsafe { self.io.poll_read(cx, buf) }
+ }
+}
+
+impl AsRawFd for Receiver {
+ fn as_raw_fd(&self) -> RawFd {
+ self.io.as_raw_fd()
+ }
+}
+
+#[cfg(not(tokio_no_as_fd))]
+impl AsFd for Receiver {
+ fn as_fd(&self) -> BorrowedFd<'_> {
+ unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
+ }
+}
+
+/// Checks if file is a FIFO
+fn is_fifo(file: &File) -> io::Result<bool> {
+ Ok(file.metadata()?.file_type().is_fifo())
+}
+
+/// Gets file descriptor's flags by fcntl.
+fn get_file_flags(file: &File) -> io::Result<libc::c_int> {
+ let fd = file.as_raw_fd();
+ let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
+ if flags < 0 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(flags)
+ }
+}
+
+/// Checks for O_RDONLY or O_RDWR access mode.
+fn has_read_access(flags: libc::c_int) -> bool {
+ let mode = flags & libc::O_ACCMODE;
+ mode == libc::O_RDONLY || mode == libc::O_RDWR
+}
+
+/// Checks for O_WRONLY or O_RDWR access mode.
+fn has_write_access(flags: libc::c_int) -> bool {
+ let mode = flags & libc::O_ACCMODE;
+ mode == libc::O_WRONLY || mode == libc::O_RDWR
+}
+
+/// Sets file's flags with O_NONBLOCK by fcntl.
+fn set_nonblocking(file: &mut File, current_flags: libc::c_int) -> io::Result<()> {
+ let fd = file.as_raw_fd();
+
+ let flags = current_flags | libc::O_NONBLOCK;
+
+ if flags != current_flags {
+ let ret = unsafe { libc::fcntl(fd, libc::F_SETFL, flags) };
+ if ret < 0 {
+ return Err(io::Error::last_os_error());
+ }
+ }
+
+ Ok(())
+}
diff --git a/vendor/tokio/src/net/unix/split.rs b/vendor/tokio/src/net/unix/split.rs
index 97214f7a7..6fc7067a7 100644
--- a/vendor/tokio/src/net/unix/split.rs
+++ b/vendor/tokio/src/net/unix/split.rs
@@ -8,14 +8,19 @@
//! split has no associated overhead and enforces all invariants at the type
//! level.
-use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
+use crate::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready};
use crate::net::UnixStream;
+use crate::net::unix::SocketAddr;
use std::io;
use std::net::Shutdown;
use std::pin::Pin;
use std::task::{Context, Poll};
+cfg_io_util! {
+ use bytes::BufMut;
+}
+
/// Borrowed read half of a [`UnixStream`], created by [`split`].
///
/// Reading from a `ReadHalf` is usually done using the convenience methods found on the
@@ -47,6 +52,232 @@ pub(crate) fn split(stream: &mut UnixStream) -> (ReadHalf<'_>, WriteHalf<'_>) {
(ReadHalf(stream), WriteHalf(stream))
}
+impl ReadHalf<'_> {
+ /// Wait for any of the requested ready states.
+ ///
+ /// This function is usually paired with [`try_read()`]. It can be used instead
+ /// of [`readable()`] to check the returned ready set for [`Ready::READABLE`]
+ /// and [`Ready::READ_CLOSED`] events.
+ ///
+ /// The function may complete without the socket being ready. This is a
+ /// false-positive and attempting an operation will return with
+ /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
+ /// [`Ready`] set, so you should always check the returned value and possibly
+ /// wait again if the requested states are not set.
+ ///
+ /// This function is equivalent to [`UnixStream::ready`].
+ ///
+ /// [`try_read()`]: Self::try_read
+ /// [`readable()`]: Self::readable
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read or write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ self.0.ready(interest).await
+ }
+
+ /// Waits for the socket to become readable.
+ ///
+ /// This function is equivalent to `ready(Interest::READABLE)` and is usually
+ /// paired with `try_read()`.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn readable(&self) -> io::Result<()> {
+ self.0.readable().await
+ }
+
+ /// Tries to read data from the stream into the provided buffer, returning how
+ /// many bytes were read.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read()` is non-blocking, the buffer does not have to be stored by
+ /// the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
+ ///
+ /// 1. The stream's read half is closed and will no longer yield data.
+ /// 2. The specified buffer was 0 bytes in length.
+ ///
+ /// If the stream is not ready to read data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
+ self.0.try_read(buf)
+ }
+
+ cfg_io_util! {
+ /// Tries to read data from the stream into the provided buffer, advancing the
+ /// buffer's internal cursor, returning how many bytes were read.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
+ /// the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
+ self.0.try_read_buf(buf)
+ }
+ }
+
+ /// Tries to read data from the stream into the provided buffers, returning
+ /// how many bytes were read.
+ ///
+ /// Data is copied to fill each buffer in order, with the final buffer
+ /// written to possibly being only partially filled. This method behaves
+ /// equivalently to a single call to [`try_read()`] with concatenated
+ /// buffers.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read_vectored()` is non-blocking, the buffer does not have to be
+ /// stored by the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`try_read()`]: Self::try_read()
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
+ self.0.try_read_vectored(bufs)
+ }
+
+ /// Returns the socket address of the remote half of this connection.
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.0.peer_addr()
+ }
+
+ /// Returns the socket address of the local half of this connection.
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.0.local_addr()
+ }
+}
+
+impl WriteHalf<'_> {
+ /// Waits for any of the requested ready states.
+ ///
+ /// This function is usually paired with [`try_write()`]. It can be used instead
+ /// of [`writable()`] to check the returned ready set for [`Ready::WRITABLE`]
+ /// and [`Ready::WRITE_CLOSED`] events.
+ ///
+ /// The function may complete without the socket being ready. This is a
+ /// false-positive and attempting an operation will return with
+ /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
+ /// [`Ready`] set, so you should always check the returned value and possibly
+ /// wait again if the requested states are not set.
+ ///
+ /// This function is equivalent to [`UnixStream::ready`].
+ ///
+ /// [`try_write()`]: Self::try_write
+ /// [`writable()`]: Self::writable
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read or write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ self.0.ready(interest).await
+ }
+
+ /// Waits for the socket to become writable.
+ ///
+ /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
+ /// paired with `try_write()`.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn writable(&self) -> io::Result<()> {
+ self.0.writable().await
+ }
+
+ /// Tries to write a buffer to the stream, returning how many bytes were
+ /// written.
+ ///
+ /// The function will attempt to write the entire contents of `buf`, but
+ /// only part of the buffer may be written.
+ ///
+ /// This function is usually paired with `writable()`.
+ ///
+ /// # Return
+ ///
+ /// If data is successfully written, `Ok(n)` is returned, where `n` is the
+ /// number of bytes written. If the stream is not ready to write data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
+ self.0.try_write(buf)
+ }
+
+ /// Tries to write several buffers to the stream, returning how many bytes
+ /// were written.
+ ///
+ /// Data is written from each buffer in order, with the final buffer read
+ /// from possible being only partially consumed. This method behaves
+ /// equivalently to a single call to [`try_write()`] with concatenated
+ /// buffers.
+ ///
+ /// This function is usually paired with `writable()`.
+ ///
+ /// [`try_write()`]: Self::try_write()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully written, `Ok(n)` is returned, where `n` is the
+ /// number of bytes written. If the stream is not ready to write data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
+ self.0.try_write_vectored(buf)
+ }
+
+ /// Returns the socket address of the remote half of this connection.
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.0.peer_addr()
+ }
+
+ /// Returns the socket address of the local half of this connection.
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.0.local_addr()
+ }
+}
+
impl AsyncRead for ReadHalf<'_> {
fn poll_read(
self: Pin<&mut Self>,
diff --git a/vendor/tokio/src/net/unix/split_owned.rs b/vendor/tokio/src/net/unix/split_owned.rs
index 3d6ac6a7e..d1c6f93de 100644
--- a/vendor/tokio/src/net/unix/split_owned.rs
+++ b/vendor/tokio/src/net/unix/split_owned.rs
@@ -8,9 +8,10 @@
//! split has no associated overhead and enforces all invariants at the type
//! level.
-use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
+use crate::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready};
use crate::net::UnixStream;
+use crate::net::unix::SocketAddr;
use std::error::Error;
use std::net::Shutdown;
use std::pin::Pin;
@@ -18,6 +19,10 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::{fmt, io};
+cfg_io_util! {
+ use bytes::BufMut;
+}
+
/// Owned read half of a [`UnixStream`], created by [`into_split`].
///
/// Reading from an `OwnedReadHalf` is usually done using the convenience methods found
@@ -102,6 +107,139 @@ impl OwnedReadHalf {
pub fn reunite(self, other: OwnedWriteHalf) -> Result<UnixStream, ReuniteError> {
reunite(self, other)
}
+
+ /// Waits for any of the requested ready states.
+ ///
+ /// This function is usually paired with [`try_read()`]. It can be used instead
+ /// of [`readable()`] to check the returned ready set for [`Ready::READABLE`]
+ /// and [`Ready::READ_CLOSED`] events.
+ ///
+ /// The function may complete without the socket being ready. This is a
+ /// false-positive and attempting an operation will return with
+ /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
+ /// [`Ready`] set, so you should always check the returned value and possibly
+ /// wait again if the requested states are not set.
+ ///
+ /// This function is equivalent to [`UnixStream::ready`].
+ ///
+ /// [`try_read()`]: Self::try_read
+ /// [`readable()`]: Self::readable
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read or write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ self.inner.ready(interest).await
+ }
+
+ /// Waits for the socket to become readable.
+ ///
+ /// This function is equivalent to `ready(Interest::READABLE)` and is usually
+ /// paired with `try_read()`.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn readable(&self) -> io::Result<()> {
+ self.inner.readable().await
+ }
+
+ /// Tries to read data from the stream into the provided buffer, returning how
+ /// many bytes were read.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read()` is non-blocking, the buffer does not have to be stored by
+ /// the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
+ ///
+ /// 1. The stream's read half is closed and will no longer yield data.
+ /// 2. The specified buffer was 0 bytes in length.
+ ///
+ /// If the stream is not ready to read data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
+ self.inner.try_read(buf)
+ }
+
+ cfg_io_util! {
+ /// Tries to read data from the stream into the provided buffer, advancing the
+ /// buffer's internal cursor, returning how many bytes were read.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
+ /// the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
+ self.inner.try_read_buf(buf)
+ }
+ }
+
+ /// Tries to read data from the stream into the provided buffers, returning
+ /// how many bytes were read.
+ ///
+ /// Data is copied to fill each buffer in order, with the final buffer
+ /// written to possibly being only partially filled. This method behaves
+ /// equivalently to a single call to [`try_read()`] with concatenated
+ /// buffers.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read_vectored()` is non-blocking, the buffer does not have to be
+ /// stored by the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`try_read()`]: Self::try_read()
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
+ self.inner.try_read_vectored(bufs)
+ }
+
+ /// Returns the socket address of the remote half of this connection.
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.inner.peer_addr()
+ }
+
+ /// Returns the socket address of the local half of this connection.
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.inner.local_addr()
+ }
}
impl AsyncRead for OwnedReadHalf {
@@ -124,13 +262,103 @@ impl OwnedWriteHalf {
reunite(other, self)
}
- /// Destroy the write half, but don't close the write half of the stream
+ /// Destroys the write half, but don't close the write half of the stream
/// until the read half is dropped. If the read half has already been
/// dropped, this closes the stream.
pub fn forget(mut self) {
self.shutdown_on_drop = false;
drop(self);
}
+
+ /// Waits for any of the requested ready states.
+ ///
+ /// This function is usually paired with [`try_write()`]. It can be used instead
+ /// of [`writable()`] to check the returned ready set for [`Ready::WRITABLE`]
+ /// and [`Ready::WRITE_CLOSED`] events.
+ ///
+ /// The function may complete without the socket being ready. This is a
+ /// false-positive and attempting an operation will return with
+ /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
+ /// [`Ready`] set, so you should always check the returned value and possibly
+ /// wait again if the requested states are not set.
+ ///
+ /// This function is equivalent to [`UnixStream::ready`].
+ ///
+ /// [`try_write()`]: Self::try_write
+ /// [`writable()`]: Self::writable
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read or write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ self.inner.ready(interest).await
+ }
+
+ /// Waits for the socket to become writable.
+ ///
+ /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
+ /// paired with `try_write()`.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn writable(&self) -> io::Result<()> {
+ self.inner.writable().await
+ }
+
+ /// Tries to write a buffer to the stream, returning how many bytes were
+ /// written.
+ ///
+ /// The function will attempt to write the entire contents of `buf`, but
+ /// only part of the buffer may be written.
+ ///
+ /// This function is usually paired with `writable()`.
+ ///
+ /// # Return
+ ///
+ /// If data is successfully written, `Ok(n)` is returned, where `n` is the
+ /// number of bytes written. If the stream is not ready to write data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
+ self.inner.try_write(buf)
+ }
+
+ /// Tries to write several buffers to the stream, returning how many bytes
+ /// were written.
+ ///
+ /// Data is written from each buffer in order, with the final buffer read
+ /// from possible being only partially consumed. This method behaves
+ /// equivalently to a single call to [`try_write()`] with concatenated
+ /// buffers.
+ ///
+ /// This function is usually paired with `writable()`.
+ ///
+ /// [`try_write()`]: Self::try_write()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully written, `Ok(n)` is returned, where `n` is the
+ /// number of bytes written. If the stream is not ready to write data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
+ self.inner.try_write_vectored(buf)
+ }
+
+ /// Returns the socket address of the remote half of this connection.
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.inner.peer_addr()
+ }
+
+ /// Returns the socket address of the local half of this connection.
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.inner.local_addr()
+ }
}
impl Drop for OwnedWriteHalf {
@@ -180,12 +408,12 @@ impl AsyncWrite for OwnedWriteHalf {
impl AsRef<UnixStream> for OwnedReadHalf {
fn as_ref(&self) -> &UnixStream {
- &*self.inner
+ &self.inner
}
}
impl AsRef<UnixStream> for OwnedWriteHalf {
fn as_ref(&self) -> &UnixStream {
- &*self.inner
+ &self.inner
}
}
diff --git a/vendor/tokio/src/net/unix/stream.rs b/vendor/tokio/src/net/unix/stream.rs
index 4baac6062..201645ba1 100644
--- a/vendor/tokio/src/net/unix/stream.rs
+++ b/vendor/tokio/src/net/unix/stream.rs
@@ -5,10 +5,11 @@ use crate::net::unix::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
use crate::net::unix::ucred::{self, UCred};
use crate::net::unix::SocketAddr;
-use std::convert::TryFrom;
use std::fmt;
use std::io::{self, Read, Write};
use std::net::Shutdown;
+#[cfg(not(tokio_no_as_fd))]
+use std::os::unix::io::{AsFd, BorrowedFd};
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::os::unix::net;
use std::path::Path;
@@ -22,8 +23,8 @@ cfg_io_util! {
cfg_net_unix! {
/// A structure representing a connected Unix socket.
///
- /// This socket can be connected directly with `UnixStream::connect` or accepted
- /// from a listener with `UnixListener::incoming`. Additionally, a pair of
+ /// This socket can be connected directly with [`UnixStream::connect`] or accepted
+ /// from a listener with [`UnixListener::accept`]. Additionally, a pair of
/// anonymous Unix sockets can be created with `UnixStream::pair`.
///
/// To shut down the stream in the write direction, you can call the
@@ -32,6 +33,8 @@ cfg_net_unix! {
/// the stream in one direction.
///
/// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
+ /// [`UnixListener::accept`]: crate::net::UnixListener::accept
+ #[cfg_attr(docsrs, doc(alias = "uds"))]
pub struct UnixStream {
io: PollEvented<mio::net::UnixStream>,
}
@@ -59,12 +62,18 @@ impl UnixStream {
Ok(stream)
}
- /// Wait for any of the requested ready states.
+ /// Waits for any of the requested ready states.
///
/// This function is usually paired with `try_read()` or `try_write()`. It
/// can be used to concurrently read / write to the same socket on a single
/// task without splitting the socket.
///
+ /// The function may complete without the socket being ready. This is a
+ /// false-positive and attempting an operation will return with
+ /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
+ /// [`Ready`] set, so you should always check the returned value and possibly
+ /// wait again if the requested states are not set.
+ ///
/// # Cancel safety
///
/// This method is cancel safe. Once a readiness event occurs, the method
@@ -133,7 +142,7 @@ impl UnixStream {
Ok(event.ready)
}
- /// Wait for the socket to become readable.
+ /// Waits for the socket to become readable.
///
/// This function is equivalent to `ready(Interest::READABLE)` and is usually
/// paired with `try_read()`.
@@ -239,8 +248,12 @@ impl UnixStream {
/// # Return
///
/// If data is successfully read, `Ok(n)` is returned, where `n` is the
- /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
- /// and will no longer yield data. If the stream is not ready to read data
+ /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
+ ///
+ /// 1. The stream's read half is closed and will no longer yield data.
+ /// 2. The specified buffer was 0 bytes in length.
+ ///
+ /// If the stream is not ready to read data,
/// `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
@@ -290,7 +303,7 @@ impl UnixStream {
.try_io(Interest::READABLE, || (&*self.io).read(buf))
}
- /// Try to read data from the stream into the provided buffers, returning
+ /// Tries to read data from the stream into the provided buffers, returning
/// how many bytes were read.
///
/// Data is copied to fill each buffer in order, with the final buffer
@@ -369,7 +382,7 @@ impl UnixStream {
}
cfg_io_util! {
- /// Try to read data from the stream into the provided buffer, advancing the
+ /// Tries to read data from the stream into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
///
/// Receives any pending data from the socket but does not wait for new data
@@ -449,7 +462,7 @@ impl UnixStream {
}
}
- /// Wait for the socket to become writable.
+ /// Waits for the socket to become writable.
///
/// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
/// paired with `try_write()`.
@@ -535,7 +548,7 @@ impl UnixStream {
self.io.registration().poll_write_ready(cx).map_ok(|_| ())
}
- /// Try to write a buffer to the stream, returning how many bytes were
+ /// Tries to write a buffer to the stream, returning how many bytes were
/// written.
///
/// The function will attempt to write the entire contents of `buf`, but
@@ -591,7 +604,7 @@ impl UnixStream {
.try_io(Interest::WRITABLE, || (&*self.io).write(buf))
}
- /// Try to write several buffers to the stream, returning how many bytes
+ /// Tries to write several buffers to the stream, returning how many bytes
/// were written.
///
/// Data is written from each buffer in order, with the final buffer read
@@ -653,20 +666,122 @@ impl UnixStream {
.try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
}
+ /// Tries to read or write from the socket using a user-provided IO operation.
+ ///
+ /// If the socket is ready, the provided closure is called. The closure
+ /// should attempt to perform IO operation on the socket by manually
+ /// calling the appropriate syscall. If the operation fails because the
+ /// socket is not actually ready, then the closure should return a
+ /// `WouldBlock` error and the readiness flag is cleared. The return value
+ /// of the closure is then returned by `try_io`.
+ ///
+ /// If the socket is not ready, then the closure is not called
+ /// and a `WouldBlock` error is returned.
+ ///
+ /// The closure should only return a `WouldBlock` error if it has performed
+ /// an IO operation on the socket that failed due to the socket not being
+ /// ready. Returning a `WouldBlock` error in any other situation will
+ /// incorrectly clear the readiness flag, which can cause the socket to
+ /// behave incorrectly.
+ ///
+ /// The closure should not perform the IO operation using any of the methods
+ /// defined on the Tokio `UnixStream` type, as this will mess with the
+ /// readiness flag and can cause the socket to behave incorrectly.
+ ///
+ /// This method is not intended to be used with combined interests.
+ /// The closure should perform only one type of IO operation, so it should not
+ /// require more than one ready state. This method may panic or sleep forever
+ /// if it is called with a combined interest.
+ ///
+ /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: UnixStream::readable()
+ /// [`writable()`]: UnixStream::writable()
+ /// [`ready()`]: UnixStream::ready()
+ pub fn try_io<R>(
+ &self,
+ interest: Interest,
+ f: impl FnOnce() -> io::Result<R>,
+ ) -> io::Result<R> {
+ self.io
+ .registration()
+ .try_io(interest, || self.io.try_io(f))
+ }
+
+ /// Reads or writes from the socket using a user-provided IO operation.
+ ///
+ /// The readiness of the socket is awaited and when the socket is ready,
+ /// the provided closure is called. The closure should attempt to perform
+ /// IO operation on the socket by manually calling the appropriate syscall.
+ /// If the operation fails because the socket is not actually ready,
+ /// then the closure should return a `WouldBlock` error. In such case the
+ /// readiness flag is cleared and the socket readiness is awaited again.
+ /// This loop is repeated until the closure returns an `Ok` or an error
+ /// other than `WouldBlock`.
+ ///
+ /// The closure should only return a `WouldBlock` error if it has performed
+ /// an IO operation on the socket that failed due to the socket not being
+ /// ready. Returning a `WouldBlock` error in any other situation will
+ /// incorrectly clear the readiness flag, which can cause the socket to
+ /// behave incorrectly.
+ ///
+ /// The closure should not perform the IO operation using any of the methods
+ /// defined on the Tokio `UnixStream` type, as this will mess with the
+ /// readiness flag and can cause the socket to behave incorrectly.
+ ///
+ /// This method is not intended to be used with combined interests.
+ /// The closure should perform only one type of IO operation, so it should not
+ /// require more than one ready state. This method may panic or sleep forever
+ /// if it is called with a combined interest.
+ pub async fn async_io<R>(
+ &self,
+ interest: Interest,
+ mut f: impl FnMut() -> io::Result<R>,
+ ) -> io::Result<R> {
+ self.io
+ .registration()
+ .async_io(interest, || self.io.try_io(&mut f))
+ .await
+ }
+
/// Creates new `UnixStream` from a `std::os::unix::net::UnixStream`.
///
/// This function is intended to be used to wrap a UnixStream from the
- /// standard library in the Tokio equivalent. The conversion assumes
- /// nothing about the underlying stream; it is left up to the user to set
- /// it in non-blocking mode.
+ /// standard library in the Tokio equivalent.
+ ///
+ /// # Notes
+ ///
+ /// The caller is responsible for ensuring that the stream is in
+ /// non-blocking mode. Otherwise all I/O operations on the stream
+ /// will block the thread, which will cause unexpected behavior.
+ /// Non-blocking mode can be set using [`set_nonblocking`].
+ ///
+ /// [`set_nonblocking`]: std::os::unix::net::UnixStream::set_nonblocking
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixStream;
+ /// use std::os::unix::net::UnixStream as StdUnixStream;
+ /// # use std::error::Error;
+ ///
+ /// # async fn dox() -> Result<(), Box<dyn Error>> {
+ /// let std_stream = StdUnixStream::connect("/path/to/the/socket")?;
+ /// std_stream.set_nonblocking(true)?;
+ /// let stream = UnixStream::from_std(std_stream)?;
+ /// # Ok(())
+ /// # }
+ /// ```
///
/// # Panics
///
- /// This function panics if thread-local runtime is not set.
+ /// This function panics if it is not called from within a runtime with
+ /// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
+ #[track_caller]
pub fn from_std(stream: net::UnixStream) -> io::Result<UnixStream> {
let stream = mio::net::UnixStream::from_std(stream);
let io = PollEvented::new(stream)?;
@@ -674,7 +789,7 @@ impl UnixStream {
Ok(UnixStream { io })
}
- /// Turn a [`tokio::net::UnixStream`] into a [`std::os::unix::net::UnixStream`].
+ /// Turns a [`tokio::net::UnixStream`] into a [`std::os::unix::net::UnixStream`].
///
/// The returned [`std::os::unix::net::UnixStream`] will have nonblocking
/// mode set as `true`. Use [`set_nonblocking`] to change the blocking
@@ -738,11 +853,41 @@ impl UnixStream {
}
/// Returns the socket address of the local half of this connection.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixStream;
+ ///
+ /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let bind_path = dir.path().join("bind_path");
+ /// let stream = UnixStream::connect(bind_path).await?;
+ ///
+ /// println!("{:?}", stream.local_addr()?);
+ /// # Ok(())
+ /// # }
+ /// ```
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.io.local_addr().map(SocketAddr)
}
/// Returns the socket address of the remote half of this connection.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixStream;
+ ///
+ /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let bind_path = dir.path().join("bind_path");
+ /// let stream = UnixStream::connect(bind_path).await?;
+ ///
+ /// println!("{:?}", stream.peer_addr()?);
+ /// # Ok(())
+ /// # }
+ /// ```
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
self.io.peer_addr().map(SocketAddr)
}
@@ -769,7 +914,7 @@ impl UnixStream {
// These lifetime markers also appear in the generated documentation, and make
// it more clear that this is a *borrowed* split.
#[allow(clippy::needless_lifetimes)]
- /// Split a `UnixStream` into a read half and a write half, which can be used
+ /// Splits a `UnixStream` into a read half and a write half, which can be used
/// to read and write the stream concurrently.
///
/// This method is more efficient than [`into_split`], but the halves cannot be
@@ -893,3 +1038,10 @@ impl AsRawFd for UnixStream {
self.io.as_raw_fd()
}
}
+
+#[cfg(not(tokio_no_as_fd))]
+impl AsFd for UnixStream {
+ fn as_fd(&self) -> BorrowedFd<'_> {
+ unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
+ }
+}
diff --git a/vendor/tokio/src/net/unix/ucred.rs b/vendor/tokio/src/net/unix/ucred.rs
index 6310183d7..edfab08ab 100644
--- a/vendor/tokio/src/net/unix/ucred.rs
+++ b/vendor/tokio/src/net/unix/ucred.rs
@@ -1,24 +1,24 @@
-use libc::{gid_t, pid_t, uid_t};
+use crate::net::unix;
-/// Credentials of a process
+/// Credentials of a process.
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
pub struct UCred {
- /// PID (process ID) of the process
- pid: Option<pid_t>,
- /// UID (user ID) of the process
- uid: uid_t,
- /// GID (group ID) of the process
- gid: gid_t,
+ /// PID (process ID) of the process.
+ pid: Option<unix::pid_t>,
+ /// UID (user ID) of the process.
+ uid: unix::uid_t,
+ /// GID (group ID) of the process.
+ gid: unix::gid_t,
}
impl UCred {
/// Gets UID (user ID) of the process.
- pub fn uid(&self) -> uid_t {
+ pub fn uid(&self) -> unix::uid_t {
self.uid
}
/// Gets GID (group ID) of the process.
- pub fn gid(&self) -> gid_t {
+ pub fn gid(&self) -> unix::gid_t {
self.gid
}
@@ -26,20 +26,23 @@ impl UCred {
///
/// This is only implemented under Linux, Android, iOS, macOS, Solaris and
/// Illumos. On other platforms this will always return `None`.
- pub fn pid(&self) -> Option<pid_t> {
+ pub fn pid(&self) -> Option<unix::pid_t> {
self.pid
}
}
-#[cfg(any(target_os = "linux", target_os = "android"))]
-pub(crate) use self::impl_linux::get_peer_cred;
-
#[cfg(any(
- target_os = "dragonfly",
- target_os = "freebsd",
- target_os = "netbsd",
+ target_os = "linux",
+ target_os = "redox",
+ target_os = "android",
target_os = "openbsd"
))]
+pub(crate) use self::impl_linux::get_peer_cred;
+
+#[cfg(any(target_os = "netbsd"))]
+pub(crate) use self::impl_netbsd::get_peer_cred;
+
+#[cfg(any(target_os = "dragonfly", target_os = "freebsd"))]
pub(crate) use self::impl_bsd::get_peer_cred;
#[cfg(any(target_os = "macos", target_os = "ios"))]
@@ -48,13 +51,24 @@ pub(crate) use self::impl_macos::get_peer_cred;
#[cfg(any(target_os = "solaris", target_os = "illumos"))]
pub(crate) use self::impl_solaris::get_peer_cred;
-#[cfg(any(target_os = "linux", target_os = "android"))]
+#[cfg(target_os = "aix")]
+pub(crate) use self::impl_aix::get_peer_cred;
+
+#[cfg(any(
+ target_os = "linux",
+ target_os = "redox",
+ target_os = "android",
+ target_os = "openbsd"
+))]
pub(crate) mod impl_linux {
- use crate::net::unix::UnixStream;
+ use crate::net::unix::{self, UnixStream};
use libc::{c_void, getsockopt, socklen_t, SOL_SOCKET, SO_PEERCRED};
use std::{io, mem};
+ #[cfg(target_os = "openbsd")]
+ use libc::sockpeercred as ucred;
+ #[cfg(any(target_os = "linux", target_os = "redox", target_os = "android"))]
use libc::ucred;
pub(crate) fn get_peer_cred(sock: &UnixStream) -> io::Result<super::UCred> {
@@ -86,9 +100,9 @@ pub(crate) mod impl_linux {
);
if ret == 0 && ucred_size as usize == mem::size_of::<ucred>() {
Ok(super::UCred {
- uid: ucred.uid,
- gid: ucred.gid,
- pid: Some(ucred.pid),
+ uid: ucred.uid as unix::uid_t,
+ gid: ucred.gid as unix::gid_t,
+ pid: Some(ucred.pid as unix::pid_t),
})
} else {
Err(io::Error::last_os_error())
@@ -97,14 +111,51 @@ pub(crate) mod impl_linux {
}
}
-#[cfg(any(
- target_os = "dragonfly",
- target_os = "freebsd",
- target_os = "netbsd",
- target_os = "openbsd"
-))]
+#[cfg(any(target_os = "netbsd"))]
+pub(crate) mod impl_netbsd {
+ use crate::net::unix::{self, UnixStream};
+
+ use libc::{c_void, getsockopt, socklen_t, unpcbid, LOCAL_PEEREID, SOL_SOCKET};
+ use std::io;
+ use std::mem::size_of;
+ use std::os::unix::io::AsRawFd;
+
+ pub(crate) fn get_peer_cred(sock: &UnixStream) -> io::Result<super::UCred> {
+ unsafe {
+ let raw_fd = sock.as_raw_fd();
+
+ let mut unpcbid = unpcbid {
+ unp_pid: 0,
+ unp_euid: 0,
+ unp_egid: 0,
+ };
+
+ let unpcbid_size = size_of::<unpcbid>();
+ let mut unpcbid_size = unpcbid_size as socklen_t;
+
+ let ret = getsockopt(
+ raw_fd,
+ SOL_SOCKET,
+ LOCAL_PEEREID,
+ &mut unpcbid as *mut unpcbid as *mut c_void,
+ &mut unpcbid_size,
+ );
+ if ret == 0 && unpcbid_size as usize == size_of::<unpcbid>() {
+ Ok(super::UCred {
+ uid: unpcbid.unp_euid as unix::uid_t,
+ gid: unpcbid.unp_egid as unix::gid_t,
+ pid: Some(unpcbid.unp_pid as unix::pid_t),
+ })
+ } else {
+ Err(io::Error::last_os_error())
+ }
+ }
+ }
+}
+
+#[cfg(any(target_os = "dragonfly", target_os = "freebsd"))]
pub(crate) mod impl_bsd {
- use crate::net::unix::UnixStream;
+ use crate::net::unix::{self, UnixStream};
use libc::getpeereid;
use std::io;
@@ -122,8 +173,8 @@ pub(crate) mod impl_bsd {
if ret == 0 {
Ok(super::UCred {
- uid: uid.assume_init(),
- gid: gid.assume_init(),
+ uid: uid.assume_init() as unix::uid_t,
+ gid: gid.assume_init() as unix::gid_t,
pid: None,
})
} else {
@@ -135,7 +186,7 @@ pub(crate) mod impl_bsd {
#[cfg(any(target_os = "macos", target_os = "ios"))]
pub(crate) mod impl_macos {
- use crate::net::unix::UnixStream;
+ use crate::net::unix::{self, UnixStream};
use libc::{c_void, getpeereid, getsockopt, pid_t, LOCAL_PEEREPID, SOL_LOCAL};
use std::io;
@@ -169,9 +220,9 @@ pub(crate) mod impl_macos {
if ret == 0 {
Ok(super::UCred {
- uid: uid.assume_init(),
- gid: gid.assume_init(),
- pid: Some(pid.assume_init()),
+ uid: uid.assume_init() as unix::uid_t,
+ gid: gid.assume_init() as unix::gid_t,
+ pid: Some(pid.assume_init() as unix::pid_t),
})
} else {
Err(io::Error::last_os_error())
@@ -182,7 +233,7 @@ pub(crate) mod impl_macos {
#[cfg(any(target_os = "solaris", target_os = "illumos"))]
pub(crate) mod impl_solaris {
- use crate::net::unix::UnixStream;
+ use crate::net::unix::{self, UnixStream};
use std::io;
use std::os::unix::io::AsRawFd;
use std::ptr;
@@ -202,9 +253,37 @@ pub(crate) mod impl_solaris {
libc::ucred_free(cred);
Ok(super::UCred {
- uid,
- gid,
- pid: Some(pid),
+ uid: uid as unix::uid_t,
+ gid: gid as unix::gid_t,
+ pid: Some(pid as unix::pid_t),
+ })
+ } else {
+ Err(io::Error::last_os_error())
+ }
+ }
+ }
+}
+
+#[cfg(target_os = "aix")]
+pub(crate) mod impl_aix {
+ use crate::net::unix::UnixStream;
+ use std::io;
+ use std::os::unix::io::AsRawFd;
+
+ pub(crate) fn get_peer_cred(sock: &UnixStream) -> io::Result<super::UCred> {
+ unsafe {
+ let raw_fd = sock.as_raw_fd();
+
+ let mut uid = std::mem::MaybeUninit::uninit();
+ let mut gid = std::mem::MaybeUninit::uninit();
+
+ let ret = libc::getpeereid(raw_fd, uid.as_mut_ptr(), gid.as_mut_ptr());
+
+ if ret == 0 {
+ Ok(super::UCred {
+ uid: uid.assume_init(),
+ gid: gid.assume_init(),
+ pid: None,
})
} else {
Err(io::Error::last_os_error())