summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio/src/net/unix
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 19:33:14 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 19:33:14 +0000
commit36d22d82aa202bb199967e9512281e9a53db42c9 (patch)
tree105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/tokio/src/net/unix
parentInitial commit. (diff)
downloadfirefox-esr-upstream.tar.xz
firefox-esr-upstream.zip
Adding upstream version 115.7.0esr.upstream/115.7.0esrupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio/src/net/unix')
-rw-r--r--third_party/rust/tokio/src/net/unix/datagram/mod.rs3
-rw-r--r--third_party/rust/tokio/src/net/unix/datagram/socket.rs1422
-rw-r--r--third_party/rust/tokio/src/net/unix/listener.rs186
-rw-r--r--third_party/rust/tokio/src/net/unix/mod.rs24
-rw-r--r--third_party/rust/tokio/src/net/unix/socketaddr.rs31
-rw-r--r--third_party/rust/tokio/src/net/unix/split.rs305
-rw-r--r--third_party/rust/tokio/src/net/unix/split_owned.rs393
-rw-r--r--third_party/rust/tokio/src/net/unix/stream.rs960
-rw-r--r--third_party/rust/tokio/src/net/unix/ucred.rs252
9 files changed, 3576 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/net/unix/datagram/mod.rs b/third_party/rust/tokio/src/net/unix/datagram/mod.rs
new file mode 100644
index 0000000000..6268b4ac90
--- /dev/null
+++ b/third_party/rust/tokio/src/net/unix/datagram/mod.rs
@@ -0,0 +1,3 @@
+//! Unix datagram types.
+
+pub(crate) mod socket;
diff --git a/third_party/rust/tokio/src/net/unix/datagram/socket.rs b/third_party/rust/tokio/src/net/unix/datagram/socket.rs
new file mode 100644
index 0000000000..d5b618663d
--- /dev/null
+++ b/third_party/rust/tokio/src/net/unix/datagram/socket.rs
@@ -0,0 +1,1422 @@
+use crate::io::{Interest, PollEvented, ReadBuf, Ready};
+use crate::net::unix::SocketAddr;
+
+use std::convert::TryFrom;
+use std::fmt;
+use std::io;
+use std::net::Shutdown;
+use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
+use std::os::unix::net;
+use std::path::Path;
+use std::task::{Context, Poll};
+
+cfg_io_util! {
+ use bytes::BufMut;
+}
+
+cfg_net_unix! {
+ /// An I/O object representing a Unix datagram socket.
+ ///
+ /// A socket can be either named (associated with a filesystem path) or
+ /// unnamed.
+ ///
+ /// This type does not provide a `split` method, because this functionality
+ /// can be achieved by wrapping the socket in an [`Arc`]. Note that you do
+ /// not need a `Mutex` to share the `UnixDatagram` — an `Arc<UnixDatagram>`
+ /// is enough. This is because all of the methods take `&self` instead of
+ /// `&mut self`.
+ ///
+ /// **Note:** named sockets are persisted even after the object is dropped
+ /// and the program has exited, and cannot be reconnected. It is advised
+ /// that you either check for and unlink the existing socket if it exists,
+ /// or use a temporary file that is guaranteed to not already exist.
+ ///
+ /// [`Arc`]: std::sync::Arc
+ ///
+ /// # Examples
+ /// Using named sockets, associated with a filesystem path:
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ /// use tempfile::tempdir;
+ ///
+ /// // We use a temporary directory so that the socket
+ /// // files left by the bound sockets will get cleaned up.
+ /// let tmp = tempdir()?;
+ ///
+ /// // Bind each socket to a filesystem path
+ /// let tx_path = tmp.path().join("tx");
+ /// let tx = UnixDatagram::bind(&tx_path)?;
+ /// let rx_path = tmp.path().join("rx");
+ /// let rx = UnixDatagram::bind(&rx_path)?;
+ ///
+ /// let bytes = b"hello world";
+ /// tx.send_to(bytes, &rx_path).await?;
+ ///
+ /// let mut buf = vec![0u8; 24];
+ /// let (size, addr) = rx.recv_from(&mut buf).await?;
+ ///
+ /// let dgram = &buf[..size];
+ /// assert_eq!(dgram, bytes);
+ /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ ///
+ /// Using unnamed sockets, created as a pair
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ ///
+ /// // Create the pair of sockets
+ /// let (sock1, sock2) = UnixDatagram::pair()?;
+ ///
+ /// // Since the sockets are paired, the paired send/recv
+ /// // functions can be used
+ /// let bytes = b"hello world";
+ /// sock1.send(bytes).await?;
+ ///
+ /// let mut buff = vec![0u8; 24];
+ /// let size = sock2.recv(&mut buff).await?;
+ ///
+ /// let dgram = &buff[..size];
+ /// assert_eq!(dgram, bytes);
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub struct UnixDatagram {
+ io: PollEvented<mio::net::UnixDatagram>,
+ }
+}
+
+impl UnixDatagram {
+ /// Waits for any of the requested ready states.
+ ///
+ /// This function is usually paired with `try_recv()` or `try_send()`. It
+ /// can be used to concurrently recv / send to the same socket on a single
+ /// task without splitting the socket.
+ ///
+ /// The function may complete without the socket being ready. This is a
+ /// false-positive and attempting an operation will return with
+ /// `io::ErrorKind::WouldBlock`.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read or write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ ///
+ /// # Examples
+ ///
+ /// Concurrently receive from and send to the socket on the same task
+ /// without splitting.
+ ///
+ /// ```no_run
+ /// use tokio::io::Interest;
+ /// use tokio::net::UnixDatagram;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let client_path = dir.path().join("client.sock");
+ /// let server_path = dir.path().join("server.sock");
+ /// let socket = UnixDatagram::bind(&client_path)?;
+ /// socket.connect(&server_path)?;
+ ///
+ /// loop {
+ /// let ready = socket.ready(Interest::READABLE | Interest::WRITABLE).await?;
+ ///
+ /// if ready.is_readable() {
+ /// let mut data = [0; 1024];
+ /// match socket.try_recv(&mut data[..]) {
+ /// Ok(n) => {
+ /// println!("received {:?}", &data[..n]);
+ /// }
+ /// // False-positive, continue
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// if ready.is_writable() {
+ /// // Write some data
+ /// match socket.try_send(b"hello world") {
+ /// Ok(n) => {
+ /// println!("sent {} bytes", n);
+ /// }
+ /// // False-positive, continue
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ /// }
+ /// }
+ /// ```
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ let event = self.io.registration().readiness(interest).await?;
+ Ok(event.ready)
+ }
+
+ /// Waits for the socket to become writable.
+ ///
+ /// This function is equivalent to `ready(Interest::WRITABLE)` and is
+ /// usually paired with `try_send()` or `try_send_to()`.
+ ///
+ /// The function may complete without the socket being writable. This is a
+ /// false-positive and attempting a `try_send()` will return with
+ /// `io::ErrorKind::WouldBlock`.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixDatagram;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let client_path = dir.path().join("client.sock");
+ /// let server_path = dir.path().join("server.sock");
+ /// let socket = UnixDatagram::bind(&client_path)?;
+ /// socket.connect(&server_path)?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be writable
+ /// socket.writable().await?;
+ ///
+ /// // Try to send data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_send(b"hello world") {
+ /// Ok(n) => {
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub async fn writable(&self) -> io::Result<()> {
+ self.ready(Interest::WRITABLE).await?;
+ Ok(())
+ }
+
+ /// Polls for write/send readiness.
+ ///
+ /// If the socket is not currently ready for sending, this method will
+ /// store a clone of the `Waker` from the provided `Context`. When the socket
+ /// becomes ready for sending, `Waker::wake` will be called on the
+ /// waker.
+ ///
+ /// Note that on multiple calls to `poll_send_ready` or `poll_send`, only
+ /// the `Waker` from the `Context` passed to the most recent call is
+ /// scheduled to receive a wakeup. (However, `poll_recv_ready` retains a
+ /// second, independent waker.)
+ ///
+ /// This function is intended for cases where creating and pinning a future
+ /// via [`writable`] is not feasible. Where possible, using [`writable`] is
+ /// preferred, as this supports polling from multiple tasks at once.
+ ///
+ /// # Return value
+ ///
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the socket is not ready for writing.
+ /// * `Poll::Ready(Ok(()))` if the socket is ready for writing.
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
+ ///
+ /// # Errors
+ ///
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ ///
+ /// [`writable`]: method@Self::writable
+ pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.io.registration().poll_write_ready(cx).map_ok(|_| ())
+ }
+
+ /// Waits for the socket to become readable.
+ ///
+ /// This function is equivalent to `ready(Interest::READABLE)` and is usually
+ /// paired with `try_recv()`.
+ ///
+ /// The function may complete without the socket being readable. This is a
+ /// false-positive and attempting a `try_recv()` will return with
+ /// `io::ErrorKind::WouldBlock`.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixDatagram;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Connect to a peer
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let client_path = dir.path().join("client.sock");
+ /// let server_path = dir.path().join("server.sock");
+ /// let socket = UnixDatagram::bind(&client_path)?;
+ /// socket.connect(&server_path)?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// socket.readable().await?;
+ ///
+ /// // The buffer is **not** included in the async task and will
+ /// // only exist on the stack.
+ /// let mut buf = [0; 1024];
+ ///
+ /// // Try to recv data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_recv(&mut buf) {
+ /// Ok(n) => {
+ /// println!("GOT {:?}", &buf[..n]);
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub async fn readable(&self) -> io::Result<()> {
+ self.ready(Interest::READABLE).await?;
+ Ok(())
+ }
+
+ /// Polls for read/receive readiness.
+ ///
+ /// If the socket is not currently ready for receiving, this method will
+ /// store a clone of the `Waker` from the provided `Context`. When the
+ /// socket becomes ready for reading, `Waker::wake` will be called on the
+ /// waker.
+ ///
+ /// Note that on multiple calls to `poll_recv_ready`, `poll_recv` or
+ /// `poll_peek`, only the `Waker` from the `Context` passed to the most
+ /// recent call is scheduled to receive a wakeup. (However,
+ /// `poll_send_ready` retains a second, independent waker.)
+ ///
+ /// This function is intended for cases where creating and pinning a future
+ /// via [`readable`] is not feasible. Where possible, using [`readable`] is
+ /// preferred, as this supports polling from multiple tasks at once.
+ ///
+ /// # Return value
+ ///
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the socket is not ready for reading.
+ /// * `Poll::Ready(Ok(()))` if the socket is ready for reading.
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
+ ///
+ /// # Errors
+ ///
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ ///
+ /// [`readable`]: method@Self::readable
+ pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.io.registration().poll_read_ready(cx).map_ok(|_| ())
+ }
+
+ /// Creates a new `UnixDatagram` bound to the specified path.
+ ///
+ /// # Examples
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ /// use tempfile::tempdir;
+ ///
+ /// // We use a temporary directory so that the socket
+ /// // files left by the bound sockets will get cleaned up.
+ /// let tmp = tempdir()?;
+ ///
+ /// // Bind the socket to a filesystem path
+ /// let socket_path = tmp.path().join("socket");
+ /// let socket = UnixDatagram::bind(&socket_path)?;
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn bind<P>(path: P) -> io::Result<UnixDatagram>
+ where
+ P: AsRef<Path>,
+ {
+ let socket = mio::net::UnixDatagram::bind(path)?;
+ UnixDatagram::new(socket)
+ }
+
+ /// Creates an unnamed pair of connected sockets.
+ ///
+ /// This function will create a pair of interconnected Unix sockets for
+ /// communicating back and forth between one another.
+ ///
+ /// # Examples
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ ///
+ /// // Create the pair of sockets
+ /// let (sock1, sock2) = UnixDatagram::pair()?;
+ ///
+ /// // Since the sockets are paired, the paired send/recv
+ /// // functions can be used
+ /// let bytes = b"hail eris";
+ /// sock1.send(bytes).await?;
+ ///
+ /// let mut buff = vec![0u8; 24];
+ /// let size = sock2.recv(&mut buff).await?;
+ ///
+ /// let dgram = &buff[..size];
+ /// assert_eq!(dgram, bytes);
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> {
+ let (a, b) = mio::net::UnixDatagram::pair()?;
+ let a = UnixDatagram::new(a)?;
+ let b = UnixDatagram::new(b)?;
+
+ Ok((a, b))
+ }
+
+ /// Creates new `UnixDatagram` from a `std::os::unix::net::UnixDatagram`.
+ ///
+ /// This function is intended to be used to wrap a UnixDatagram from the
+ /// standard library in the Tokio equivalent. The conversion assumes
+ /// nothing about the underlying datagram; it is left up to the user to set
+ /// it in non-blocking mode.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if thread-local runtime is not set.
+ ///
+ /// The runtime is usually set implicitly when this function is called
+ /// from a future driven by a Tokio runtime, otherwise runtime can be set
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
+ /// # Examples
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ /// use std::os::unix::net::UnixDatagram as StdUDS;
+ /// use tempfile::tempdir;
+ ///
+ /// // We use a temporary directory so that the socket
+ /// // files left by the bound sockets will get cleaned up.
+ /// let tmp = tempdir()?;
+ ///
+ /// // Bind the socket to a filesystem path
+ /// let socket_path = tmp.path().join("socket");
+ /// let std_socket = StdUDS::bind(&socket_path)?;
+ /// std_socket.set_nonblocking(true)?;
+ /// let tokio_socket = UnixDatagram::from_std(std_socket)?;
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn from_std(datagram: net::UnixDatagram) -> io::Result<UnixDatagram> {
+ let socket = mio::net::UnixDatagram::from_std(datagram);
+ let io = PollEvented::new(socket)?;
+ Ok(UnixDatagram { io })
+ }
+
+ /// Turns a [`tokio::net::UnixDatagram`] into a [`std::os::unix::net::UnixDatagram`].
+ ///
+ /// The returned [`std::os::unix::net::UnixDatagram`] will have nonblocking
+ /// mode set as `true`. Use [`set_nonblocking`] to change the blocking mode
+ /// if needed.
+ ///
+ /// # Examples
+ ///
+ /// ```rust,no_run
+ /// use std::error::Error;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// let tokio_socket = tokio::net::UnixDatagram::bind("127.0.0.1:0")?;
+ /// let std_socket = tokio_socket.into_std()?;
+ /// std_socket.set_nonblocking(false)?;
+ /// Ok(())
+ /// }
+ /// ```
+ ///
+ /// [`tokio::net::UnixDatagram`]: UnixDatagram
+ /// [`std::os::unix::net::UnixDatagram`]: std::os::unix::net::UnixDatagram
+ /// [`set_nonblocking`]: fn@std::os::unix::net::UnixDatagram::set_nonblocking
+ pub fn into_std(self) -> io::Result<std::os::unix::net::UnixDatagram> {
+ self.io
+ .into_inner()
+ .map(|io| io.into_raw_fd())
+ .map(|raw_fd| unsafe { std::os::unix::net::UnixDatagram::from_raw_fd(raw_fd) })
+ }
+
+ fn new(socket: mio::net::UnixDatagram) -> io::Result<UnixDatagram> {
+ let io = PollEvented::new(socket)?;
+ Ok(UnixDatagram { io })
+ }
+
+ /// Creates a new `UnixDatagram` which is not bound to any address.
+ ///
+ /// # Examples
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ /// use tempfile::tempdir;
+ ///
+ /// // Create an unbound socket
+ /// let tx = UnixDatagram::unbound()?;
+ ///
+ /// // Create another, bound socket
+ /// let tmp = tempdir()?;
+ /// let rx_path = tmp.path().join("rx");
+ /// let rx = UnixDatagram::bind(&rx_path)?;
+ ///
+ /// // Send to the bound socket
+ /// let bytes = b"hello world";
+ /// tx.send_to(bytes, &rx_path).await?;
+ ///
+ /// let mut buf = vec![0u8; 24];
+ /// let (size, addr) = rx.recv_from(&mut buf).await?;
+ ///
+ /// let dgram = &buf[..size];
+ /// assert_eq!(dgram, bytes);
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn unbound() -> io::Result<UnixDatagram> {
+ let socket = mio::net::UnixDatagram::unbound()?;
+ UnixDatagram::new(socket)
+ }
+
+ /// Connects the socket to the specified address.
+ ///
+ /// The `send` method may be used to send data to the specified address.
+ /// `recv` and `recv_from` will only receive data from that address.
+ ///
+ /// # Examples
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ /// use tempfile::tempdir;
+ ///
+ /// // Create an unbound socket
+ /// let tx = UnixDatagram::unbound()?;
+ ///
+ /// // Create another, bound socket
+ /// let tmp = tempdir()?;
+ /// let rx_path = tmp.path().join("rx");
+ /// let rx = UnixDatagram::bind(&rx_path)?;
+ ///
+ /// // Connect to the bound socket
+ /// tx.connect(&rx_path)?;
+ ///
+ /// // Send to the bound socket
+ /// let bytes = b"hello world";
+ /// tx.send(bytes).await?;
+ ///
+ /// let mut buf = vec![0u8; 24];
+ /// let (size, addr) = rx.recv_from(&mut buf).await?;
+ ///
+ /// let dgram = &buf[..size];
+ /// assert_eq!(dgram, bytes);
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()> {
+ self.io.connect(path)
+ }
+
+ /// Sends data on the socket to the socket's peer.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. If `send` is used as the event in a
+ /// [`tokio::select!`](crate::select) statement and some other branch
+ /// completes first, then it is guaranteed that the message was not sent.
+ ///
+ /// # Examples
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ ///
+ /// // Create the pair of sockets
+ /// let (sock1, sock2) = UnixDatagram::pair()?;
+ ///
+ /// // Since the sockets are paired, the paired send/recv
+ /// // functions can be used
+ /// let bytes = b"hello world";
+ /// sock1.send(bytes).await?;
+ ///
+ /// let mut buff = vec![0u8; 24];
+ /// let size = sock2.recv(&mut buff).await?;
+ ///
+ /// let dgram = &buff[..size];
+ /// assert_eq!(dgram, bytes);
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
+ self.io
+ .registration()
+ .async_io(Interest::WRITABLE, || self.io.send(buf))
+ .await
+ }
+
+ /// Tries to send a datagram to the peer without waiting.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixDatagram;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let client_path = dir.path().join("client.sock");
+ /// let server_path = dir.path().join("server.sock");
+ /// let socket = UnixDatagram::bind(&client_path)?;
+ /// socket.connect(&server_path)?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be writable
+ /// socket.writable().await?;
+ ///
+ /// // Try to send data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_send(b"hello world") {
+ /// Ok(n) => {
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
+ self.io
+ .registration()
+ .try_io(Interest::WRITABLE, || self.io.send(buf))
+ }
+
+ /// Tries to send a datagram to the peer without waiting.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixDatagram;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let client_path = dir.path().join("client.sock");
+ /// let server_path = dir.path().join("server.sock");
+ /// let socket = UnixDatagram::bind(&client_path)?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be writable
+ /// socket.writable().await?;
+ ///
+ /// // Try to send data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_send_to(b"hello world", &server_path) {
+ /// Ok(n) => {
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
+ where
+ P: AsRef<Path>,
+ {
+ self.io
+ .registration()
+ .try_io(Interest::WRITABLE, || self.io.send_to(buf, target))
+ }
+
+ /// Receives data from the socket.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. If `recv` is used as the event in a
+ /// [`tokio::select!`](crate::select) statement and some other branch
+ /// completes first, it is guaranteed that no messages were received on this
+ /// socket.
+ ///
+ /// # Examples
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ ///
+ /// // Create the pair of sockets
+ /// let (sock1, sock2) = UnixDatagram::pair()?;
+ ///
+ /// // Since the sockets are paired, the paired send/recv
+ /// // functions can be used
+ /// let bytes = b"hello world";
+ /// sock1.send(bytes).await?;
+ ///
+ /// let mut buff = vec![0u8; 24];
+ /// let size = sock2.recv(&mut buff).await?;
+ ///
+ /// let dgram = &buff[..size];
+ /// assert_eq!(dgram, bytes);
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
+ self.io
+ .registration()
+ .async_io(Interest::READABLE, || self.io.recv(buf))
+ .await
+ }
+
+ /// Tries to receive a datagram from the peer without waiting.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixDatagram;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Connect to a peer
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let client_path = dir.path().join("client.sock");
+ /// let server_path = dir.path().join("server.sock");
+ /// let socket = UnixDatagram::bind(&client_path)?;
+ /// socket.connect(&server_path)?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// socket.readable().await?;
+ ///
+ /// // The buffer is **not** included in the async task and will
+ /// // only exist on the stack.
+ /// let mut buf = [0; 1024];
+ ///
+ /// // Try to recv data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_recv(&mut buf) {
+ /// Ok(n) => {
+ /// println!("GOT {:?}", &buf[..n]);
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> {
+ self.io
+ .registration()
+ .try_io(Interest::READABLE, || self.io.recv(buf))
+ }
+
+ cfg_io_util! {
+ /// Tries to receive data from the socket without waiting.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixDatagram;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Connect to a peer
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let client_path = dir.path().join("client.sock");
+ /// let server_path = dir.path().join("server.sock");
+ /// let socket = UnixDatagram::bind(&client_path)?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// socket.readable().await?;
+ ///
+ /// let mut buf = Vec::with_capacity(1024);
+ ///
+ /// // Try to recv data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_recv_buf_from(&mut buf) {
+ /// Ok((n, _addr)) => {
+ /// println!("GOT {:?}", &buf[..n]);
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> {
+ let (n, addr) = self.io.registration().try_io(Interest::READABLE, || {
+ let dst = buf.chunk_mut();
+ let dst =
+ unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
+
+ // Safety: We trust `UnixDatagram::recv_from` to have filled up `n` bytes in the
+ // buffer.
+ let (n, addr) = (&*self.io).recv_from(dst)?;
+
+ unsafe {
+ buf.advance_mut(n);
+ }
+
+ Ok((n, addr))
+ })?;
+
+ Ok((n, SocketAddr(addr)))
+ }
+
+ /// Tries to read data from the stream into the provided buffer, advancing the
+ /// buffer's internal cursor, returning how many bytes were read.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixDatagram;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Connect to a peer
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let client_path = dir.path().join("client.sock");
+ /// let server_path = dir.path().join("server.sock");
+ /// let socket = UnixDatagram::bind(&client_path)?;
+ /// socket.connect(&server_path)?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// socket.readable().await?;
+ ///
+ /// let mut buf = Vec::with_capacity(1024);
+ ///
+ /// // Try to recv data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_recv_buf(&mut buf) {
+ /// Ok(n) => {
+ /// println!("GOT {:?}", &buf[..n]);
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
+ self.io.registration().try_io(Interest::READABLE, || {
+ let dst = buf.chunk_mut();
+ let dst =
+ unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
+
+ // Safety: We trust `UnixDatagram::recv` to have filled up `n` bytes in the
+ // buffer.
+ let n = (&*self.io).recv(dst)?;
+
+ unsafe {
+ buf.advance_mut(n);
+ }
+
+ Ok(n)
+ })
+ }
+ }
+
+ /// Sends data on the socket to the specified address.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. If `send_to` is used as the event in a
+ /// [`tokio::select!`](crate::select) statement and some other branch
+ /// completes first, then it is guaranteed that the message was not sent.
+ ///
+ /// # Examples
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ /// use tempfile::tempdir;
+ ///
+ /// // We use a temporary directory so that the socket
+ /// // files left by the bound sockets will get cleaned up.
+ /// let tmp = tempdir()?;
+ ///
+ /// // Bind each socket to a filesystem path
+ /// let tx_path = tmp.path().join("tx");
+ /// let tx = UnixDatagram::bind(&tx_path)?;
+ /// let rx_path = tmp.path().join("rx");
+ /// let rx = UnixDatagram::bind(&rx_path)?;
+ ///
+ /// let bytes = b"hello world";
+ /// tx.send_to(bytes, &rx_path).await?;
+ ///
+ /// let mut buf = vec![0u8; 24];
+ /// let (size, addr) = rx.recv_from(&mut buf).await?;
+ ///
+ /// let dgram = &buf[..size];
+ /// assert_eq!(dgram, bytes);
+ /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
+ where
+ P: AsRef<Path>,
+ {
+ self.io
+ .registration()
+ .async_io(Interest::WRITABLE, || self.io.send_to(buf, target.as_ref()))
+ .await
+ }
+
+ /// Receives data from the socket.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. If `recv_from` is used as the event in a
+ /// [`tokio::select!`](crate::select) statement and some other branch
+ /// completes first, it is guaranteed that no messages were received on this
+ /// socket.
+ ///
+ /// # Examples
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ /// use tempfile::tempdir;
+ ///
+ /// // We use a temporary directory so that the socket
+ /// // files left by the bound sockets will get cleaned up.
+ /// let tmp = tempdir()?;
+ ///
+ /// // Bind each socket to a filesystem path
+ /// let tx_path = tmp.path().join("tx");
+ /// let tx = UnixDatagram::bind(&tx_path)?;
+ /// let rx_path = tmp.path().join("rx");
+ /// let rx = UnixDatagram::bind(&rx_path)?;
+ ///
+ /// let bytes = b"hello world";
+ /// tx.send_to(bytes, &rx_path).await?;
+ ///
+ /// let mut buf = vec![0u8; 24];
+ /// let (size, addr) = rx.recv_from(&mut buf).await?;
+ ///
+ /// let dgram = &buf[..size];
+ /// assert_eq!(dgram, bytes);
+ /// assert_eq!(addr.as_pathname().unwrap(), &tx_path);
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
+ let (n, addr) = self
+ .io
+ .registration()
+ .async_io(Interest::READABLE, || self.io.recv_from(buf))
+ .await?;
+
+ Ok((n, SocketAddr(addr)))
+ }
+
+ /// Attempts to receive a single datagram on the specified address.
+ ///
+ /// Note that on multiple calls to a `poll_*` method in the recv direction, only the
+ /// `Waker` from the `Context` passed to the most recent call will be scheduled to
+ /// receive a wakeup.
+ ///
+ /// # Return value
+ ///
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the socket is not ready to read
+ /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
+ ///
+ /// # Errors
+ ///
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ pub fn poll_recv_from(
+ &self,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<SocketAddr>> {
+ let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || {
+ // Safety: will not read the maybe uninitialized bytes.
+ let b = unsafe {
+ &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
+ };
+
+ self.io.recv_from(b)
+ }))?;
+
+ // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
+ unsafe {
+ buf.assume_init(n);
+ }
+ buf.advance(n);
+ Poll::Ready(Ok(SocketAddr(addr)))
+ }
+
+ /// Attempts to send data to the specified address.
+ ///
+ /// Note that on multiple calls to a `poll_*` method in the send direction, only the
+ /// `Waker` from the `Context` passed to the most recent call will be scheduled to
+ /// receive a wakeup.
+ ///
+ /// # Return value
+ ///
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the socket is not ready to write
+ /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent.
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
+ ///
+ /// # Errors
+ ///
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ pub fn poll_send_to<P>(
+ &self,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ target: P,
+ ) -> Poll<io::Result<usize>>
+ where
+ P: AsRef<Path>,
+ {
+ self.io
+ .registration()
+ .poll_write_io(cx, || self.io.send_to(buf, target.as_ref()))
+ }
+
+ /// Attempts to send data on the socket to the remote address to which it
+ /// was previously `connect`ed.
+ ///
+ /// The [`connect`] method will connect this socket to a remote address.
+ /// This method will fail if the socket is not connected.
+ ///
+ /// Note that on multiple calls to a `poll_*` method in the send direction,
+ /// only the `Waker` from the `Context` passed to the most recent call will
+ /// be scheduled to receive a wakeup.
+ ///
+ /// # Return value
+ ///
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the socket is not available to write
+ /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
+ ///
+ /// # Errors
+ ///
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ ///
+ /// [`connect`]: method@Self::connect
+ pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
+ self.io
+ .registration()
+ .poll_write_io(cx, || self.io.send(buf))
+ }
+
+ /// Attempts to receive a single datagram message on the socket from the remote
+ /// address to which it is `connect`ed.
+ ///
+ /// The [`connect`] method will connect this socket to a remote address. This method
+ /// resolves to an error if the socket is not connected.
+ ///
+ /// Note that on multiple calls to a `poll_*` method in the recv direction, only the
+ /// `Waker` from the `Context` passed to the most recent call will be scheduled to
+ /// receive a wakeup.
+ ///
+ /// # Return value
+ ///
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the socket is not ready to read
+ /// * `Poll::Ready(Ok(()))` reads data `ReadBuf` if the socket is ready
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
+ ///
+ /// # Errors
+ ///
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ ///
+ /// [`connect`]: method@Self::connect
+ pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
+ let n = ready!(self.io.registration().poll_read_io(cx, || {
+ // Safety: will not read the maybe uninitialized bytes.
+ let b = unsafe {
+ &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
+ };
+
+ self.io.recv(b)
+ }))?;
+
+ // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
+ unsafe {
+ buf.assume_init(n);
+ }
+ buf.advance(n);
+ Poll::Ready(Ok(()))
+ }
+
+ /// Tries to receive data from the socket without waiting.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixDatagram;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// // Connect to a peer
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let client_path = dir.path().join("client.sock");
+ /// let server_path = dir.path().join("server.sock");
+ /// let socket = UnixDatagram::bind(&client_path)?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// socket.readable().await?;
+ ///
+ /// // The buffer is **not** included in the async task and will
+ /// // only exist on the stack.
+ /// let mut buf = [0; 1024];
+ ///
+ /// // Try to recv data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match socket.try_recv_from(&mut buf) {
+ /// Ok((n, _addr)) => {
+ /// println!("GOT {:?}", &buf[..n]);
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e);
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
+ let (n, addr) = self
+ .io
+ .registration()
+ .try_io(Interest::READABLE, || self.io.recv_from(buf))?;
+
+ Ok((n, SocketAddr(addr)))
+ }
+
+ /// Tries to read or write from the socket using a user-provided IO operation.
+ ///
+ /// If the socket is ready, the provided closure is called. The closure
+ /// should attempt to perform IO operation from the socket by manually
+ /// calling the appropriate syscall. If the operation fails because the
+ /// socket is not actually ready, then the closure should return a
+ /// `WouldBlock` error and the readiness flag is cleared. The return value
+ /// of the closure is then returned by `try_io`.
+ ///
+ /// If the socket is not ready, then the closure is not called
+ /// and a `WouldBlock` error is returned.
+ ///
+ /// The closure should only return a `WouldBlock` error if it has performed
+ /// an IO operation on the socket that failed due to the socket not being
+ /// ready. Returning a `WouldBlock` error in any other situation will
+ /// incorrectly clear the readiness flag, which can cause the socket to
+ /// behave incorrectly.
+ ///
+ /// The closure should not perform the IO operation using any of the methods
+ /// defined on the Tokio `UnixDatagram` type, as this will mess with the
+ /// readiness flag and can cause the socket to behave incorrectly.
+ ///
+ /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: UnixDatagram::readable()
+ /// [`writable()`]: UnixDatagram::writable()
+ /// [`ready()`]: UnixDatagram::ready()
+ pub fn try_io<R>(
+ &self,
+ interest: Interest,
+ f: impl FnOnce() -> io::Result<R>,
+ ) -> io::Result<R> {
+ self.io.registration().try_io(interest, f)
+ }
+
+ /// Returns the local address that this socket is bound to.
+ ///
+ /// # Examples
+ /// For a socket bound to a local path
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ /// use tempfile::tempdir;
+ ///
+ /// // We use a temporary directory so that the socket
+ /// // files left by the bound sockets will get cleaned up.
+ /// let tmp = tempdir()?;
+ ///
+ /// // Bind socket to a filesystem path
+ /// let socket_path = tmp.path().join("socket");
+ /// let socket = UnixDatagram::bind(&socket_path)?;
+ ///
+ /// assert_eq!(socket.local_addr()?.as_pathname().unwrap(), &socket_path);
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ ///
+ /// For an unbound socket
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ ///
+ /// // Create an unbound socket
+ /// let socket = UnixDatagram::unbound()?;
+ ///
+ /// assert!(socket.local_addr()?.is_unnamed());
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.io.local_addr().map(SocketAddr)
+ }
+
+ /// Returns the address of this socket's peer.
+ ///
+ /// The `connect` method will connect the socket to a peer.
+ ///
+ /// # Examples
+ /// For a peer with a local path
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ /// use tempfile::tempdir;
+ ///
+ /// // Create an unbound socket
+ /// let tx = UnixDatagram::unbound()?;
+ ///
+ /// // Create another, bound socket
+ /// let tmp = tempdir()?;
+ /// let rx_path = tmp.path().join("rx");
+ /// let rx = UnixDatagram::bind(&rx_path)?;
+ ///
+ /// // Connect to the bound socket
+ /// tx.connect(&rx_path)?;
+ ///
+ /// assert_eq!(tx.peer_addr()?.as_pathname().unwrap(), &rx_path);
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ ///
+ /// For an unbound peer
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ ///
+ /// // Create the pair of sockets
+ /// let (sock1, sock2) = UnixDatagram::pair()?;
+ ///
+ /// assert!(sock1.peer_addr()?.is_unnamed());
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.io.peer_addr().map(SocketAddr)
+ }
+
+ /// Returns the value of the `SO_ERROR` option.
+ ///
+ /// # Examples
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ ///
+ /// // Create an unbound socket
+ /// let socket = UnixDatagram::unbound()?;
+ ///
+ /// if let Ok(Some(err)) = socket.take_error() {
+ /// println!("Got error: {:?}", err);
+ /// }
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+ self.io.take_error()
+ }
+
+ /// Shuts down the read, write, or both halves of this connection.
+ ///
+ /// This function will cause all pending and future I/O calls on the
+ /// specified portions to immediately return with an appropriate value
+ /// (see the documentation of `Shutdown`).
+ ///
+ /// # Examples
+ /// ```
+ /// # use std::error::Error;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<(), Box<dyn Error>> {
+ /// use tokio::net::UnixDatagram;
+ /// use std::net::Shutdown;
+ ///
+ /// // Create an unbound socket
+ /// let (socket, other) = UnixDatagram::pair()?;
+ ///
+ /// socket.shutdown(Shutdown::Both)?;
+ ///
+ /// // NOTE: the following commented out code does NOT work as expected.
+ /// // Due to an underlying issue, the recv call will block indefinitely.
+ /// // See: https://github.com/tokio-rs/tokio/issues/1679
+ /// //let mut buff = vec![0u8; 24];
+ /// //let size = socket.recv(&mut buff).await?;
+ /// //assert_eq!(size, 0);
+ ///
+ /// let send_result = socket.send(b"hello world").await;
+ /// assert!(send_result.is_err());
+ ///
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
+ self.io.shutdown(how)
+ }
+}
+
+impl TryFrom<std::os::unix::net::UnixDatagram> for UnixDatagram {
+ type Error = io::Error;
+
+ /// Consumes stream, returning the Tokio I/O object.
+ ///
+ /// This is equivalent to
+ /// [`UnixDatagram::from_std(stream)`](UnixDatagram::from_std).
+ fn try_from(stream: std::os::unix::net::UnixDatagram) -> Result<Self, Self::Error> {
+ Self::from_std(stream)
+ }
+}
+
+impl fmt::Debug for UnixDatagram {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.io.fmt(f)
+ }
+}
+
+impl AsRawFd for UnixDatagram {
+ fn as_raw_fd(&self) -> RawFd {
+ self.io.as_raw_fd()
+ }
+}
diff --git a/third_party/rust/tokio/src/net/unix/listener.rs b/third_party/rust/tokio/src/net/unix/listener.rs
new file mode 100644
index 0000000000..1785f8b0f7
--- /dev/null
+++ b/third_party/rust/tokio/src/net/unix/listener.rs
@@ -0,0 +1,186 @@
+use crate::io::{Interest, PollEvented};
+use crate::net::unix::{SocketAddr, UnixStream};
+
+use std::convert::TryFrom;
+use std::fmt;
+use std::io;
+use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
+use std::os::unix::net;
+use std::path::Path;
+use std::task::{Context, Poll};
+
+cfg_net_unix! {
+ /// A Unix socket which can accept connections from other Unix sockets.
+ ///
+ /// You can accept a new connection by using the [`accept`](`UnixListener::accept`) method.
+ ///
+ /// A `UnixListener` can be turned into a `Stream` with [`UnixListenerStream`].
+ ///
+ /// [`UnixListenerStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.UnixListenerStream.html
+ ///
+ /// # Errors
+ ///
+ /// Note that accepting a connection can lead to various errors and not all
+ /// of them are necessarily fatal ‒ for example having too many open file
+ /// descriptors or the other side closing the connection while it waits in
+ /// an accept queue. These would terminate the stream if not handled in any
+ /// way.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixListener;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let listener = UnixListener::bind("/path/to/the/socket").unwrap();
+ /// loop {
+ /// match listener.accept().await {
+ /// Ok((stream, _addr)) => {
+ /// println!("new client!");
+ /// }
+ /// Err(e) => { /* connection failed */ }
+ /// }
+ /// }
+ /// }
+ /// ```
+ pub struct UnixListener {
+ io: PollEvented<mio::net::UnixListener>,
+ }
+}
+
+impl UnixListener {
+ /// Creates a new `UnixListener` bound to the specified path.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if thread-local runtime is not set.
+ ///
+ /// The runtime is usually set implicitly when this function is called
+ /// from a future driven by a tokio runtime, otherwise runtime can be set
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
+ pub fn bind<P>(path: P) -> io::Result<UnixListener>
+ where
+ P: AsRef<Path>,
+ {
+ let listener = mio::net::UnixListener::bind(path)?;
+ let io = PollEvented::new(listener)?;
+ Ok(UnixListener { io })
+ }
+
+ /// Creates new `UnixListener` from a `std::os::unix::net::UnixListener `.
+ ///
+ /// This function is intended to be used to wrap a UnixListener from the
+ /// standard library in the Tokio equivalent. The conversion assumes
+ /// nothing about the underlying listener; it is left up to the user to set
+ /// it in non-blocking mode.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if thread-local runtime is not set.
+ ///
+ /// The runtime is usually set implicitly when this function is called
+ /// from a future driven by a tokio runtime, otherwise runtime can be set
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
+ pub fn from_std(listener: net::UnixListener) -> io::Result<UnixListener> {
+ let listener = mio::net::UnixListener::from_std(listener);
+ let io = PollEvented::new(listener)?;
+ Ok(UnixListener { io })
+ }
+
+ /// Turns a [`tokio::net::UnixListener`] into a [`std::os::unix::net::UnixListener`].
+ ///
+ /// The returned [`std::os::unix::net::UnixListener`] will have nonblocking mode
+ /// set as `true`. Use [`set_nonblocking`] to change the blocking mode if needed.
+ ///
+ /// # Examples
+ ///
+ /// ```rust,no_run
+ /// use std::error::Error;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// let tokio_listener = tokio::net::UnixListener::bind("127.0.0.1:0")?;
+ /// let std_listener = tokio_listener.into_std()?;
+ /// std_listener.set_nonblocking(false)?;
+ /// Ok(())
+ /// }
+ /// ```
+ ///
+ /// [`tokio::net::UnixListener`]: UnixListener
+ /// [`std::os::unix::net::UnixListener`]: std::os::unix::net::UnixListener
+ /// [`set_nonblocking`]: fn@std::os::unix::net::UnixListener::set_nonblocking
+ pub fn into_std(self) -> io::Result<std::os::unix::net::UnixListener> {
+ self.io
+ .into_inner()
+ .map(|io| io.into_raw_fd())
+ .map(|raw_fd| unsafe { net::UnixListener::from_raw_fd(raw_fd) })
+ }
+
+ /// Returns the local socket address of this listener.
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.io.local_addr().map(SocketAddr)
+ }
+
+ /// Returns the value of the `SO_ERROR` option.
+ pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+ self.io.take_error()
+ }
+
+ /// Accepts a new incoming connection to this listener.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. If the method is used as the event in a
+ /// [`tokio::select!`](crate::select) statement and some other branch
+ /// completes first, then it is guaranteed that no new connections were
+ /// accepted by this method.
+ pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> {
+ let (mio, addr) = self
+ .io
+ .registration()
+ .async_io(Interest::READABLE, || self.io.accept())
+ .await?;
+
+ let addr = SocketAddr(addr);
+ let stream = UnixStream::new(mio)?;
+ Ok((stream, addr))
+ }
+
+ /// Polls to accept a new incoming connection to this listener.
+ ///
+ /// If there is no connection to accept, `Poll::Pending` is returned and the
+ /// current task will be notified by a waker. Note that on multiple calls
+ /// to `poll_accept`, only the `Waker` from the `Context` passed to the most
+ /// recent call is scheduled to receive a wakeup.
+ pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<io::Result<(UnixStream, SocketAddr)>> {
+ let (sock, addr) = ready!(self.io.registration().poll_read_io(cx, || self.io.accept()))?;
+ let addr = SocketAddr(addr);
+ let sock = UnixStream::new(sock)?;
+ Poll::Ready(Ok((sock, addr)))
+ }
+}
+
+impl TryFrom<std::os::unix::net::UnixListener> for UnixListener {
+ type Error = io::Error;
+
+ /// Consumes stream, returning the tokio I/O object.
+ ///
+ /// This is equivalent to
+ /// [`UnixListener::from_std(stream)`](UnixListener::from_std).
+ fn try_from(stream: std::os::unix::net::UnixListener) -> io::Result<Self> {
+ Self::from_std(stream)
+ }
+}
+
+impl fmt::Debug for UnixListener {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.io.fmt(f)
+ }
+}
+
+impl AsRawFd for UnixListener {
+ fn as_raw_fd(&self) -> RawFd {
+ self.io.as_raw_fd()
+ }
+}
diff --git a/third_party/rust/tokio/src/net/unix/mod.rs b/third_party/rust/tokio/src/net/unix/mod.rs
new file mode 100644
index 0000000000..14cb456705
--- /dev/null
+++ b/third_party/rust/tokio/src/net/unix/mod.rs
@@ -0,0 +1,24 @@
+//! Unix domain socket utility types.
+
+// This module does not currently provide any public API, but it was
+// unintentionally defined as a public module. Hide it from the documentation
+// instead of changing it to a private module to avoid breakage.
+#[doc(hidden)]
+pub mod datagram;
+
+pub(crate) mod listener;
+
+mod split;
+pub use split::{ReadHalf, WriteHalf};
+
+mod split_owned;
+pub use split_owned::{OwnedReadHalf, OwnedWriteHalf, ReuniteError};
+
+mod socketaddr;
+pub use socketaddr::SocketAddr;
+
+pub(crate) mod stream;
+pub(crate) use stream::UnixStream;
+
+mod ucred;
+pub use ucred::UCred;
diff --git a/third_party/rust/tokio/src/net/unix/socketaddr.rs b/third_party/rust/tokio/src/net/unix/socketaddr.rs
new file mode 100644
index 0000000000..48f7b96b8c
--- /dev/null
+++ b/third_party/rust/tokio/src/net/unix/socketaddr.rs
@@ -0,0 +1,31 @@
+use std::fmt;
+use std::path::Path;
+
+/// An address associated with a Tokio Unix socket.
+pub struct SocketAddr(pub(super) mio::net::SocketAddr);
+
+impl SocketAddr {
+ /// Returns `true` if the address is unnamed.
+ ///
+ /// Documentation reflected in [`SocketAddr`]
+ ///
+ /// [`SocketAddr`]: std::os::unix::net::SocketAddr
+ pub fn is_unnamed(&self) -> bool {
+ self.0.is_unnamed()
+ }
+
+ /// Returns the contents of this address if it is a `pathname` address.
+ ///
+ /// Documentation reflected in [`SocketAddr`]
+ ///
+ /// [`SocketAddr`]: std::os::unix::net::SocketAddr
+ pub fn as_pathname(&self) -> Option<&Path> {
+ self.0.as_pathname()
+ }
+}
+
+impl fmt::Debug for SocketAddr {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.0.fmt(fmt)
+ }
+}
diff --git a/third_party/rust/tokio/src/net/unix/split.rs b/third_party/rust/tokio/src/net/unix/split.rs
new file mode 100644
index 0000000000..d4686c22d7
--- /dev/null
+++ b/third_party/rust/tokio/src/net/unix/split.rs
@@ -0,0 +1,305 @@
+//! `UnixStream` split support.
+//!
+//! A `UnixStream` can be split into a read half and a write half with
+//! `UnixStream::split`. The read half implements `AsyncRead` while the write
+//! half implements `AsyncWrite`.
+//!
+//! Compared to the generic split of `AsyncRead + AsyncWrite`, this specialized
+//! split has no associated overhead and enforces all invariants at the type
+//! level.
+
+use crate::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready};
+use crate::net::UnixStream;
+
+use crate::net::unix::SocketAddr;
+use std::io;
+use std::net::Shutdown;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+cfg_io_util! {
+ use bytes::BufMut;
+}
+
+/// Borrowed read half of a [`UnixStream`], created by [`split`].
+///
+/// Reading from a `ReadHalf` is usually done using the convenience methods found on the
+/// [`AsyncReadExt`] trait.
+///
+/// [`UnixStream`]: UnixStream
+/// [`split`]: UnixStream::split()
+/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
+#[derive(Debug)]
+pub struct ReadHalf<'a>(&'a UnixStream);
+
+/// Borrowed write half of a [`UnixStream`], created by [`split`].
+///
+/// Note that in the [`AsyncWrite`] implementation of this type, [`poll_shutdown`] will
+/// shut down the UnixStream stream in the write direction.
+///
+/// Writing to an `WriteHalf` is usually done using the convenience methods found
+/// on the [`AsyncWriteExt`] trait.
+///
+/// [`UnixStream`]: UnixStream
+/// [`split`]: UnixStream::split()
+/// [`AsyncWrite`]: trait@crate::io::AsyncWrite
+/// [`poll_shutdown`]: fn@crate::io::AsyncWrite::poll_shutdown
+/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
+#[derive(Debug)]
+pub struct WriteHalf<'a>(&'a UnixStream);
+
+pub(crate) fn split(stream: &mut UnixStream) -> (ReadHalf<'_>, WriteHalf<'_>) {
+ (ReadHalf(stream), WriteHalf(stream))
+}
+
+impl ReadHalf<'_> {
+ /// Wait for any of the requested ready states.
+ ///
+ /// This function is usually paired with `try_read()` or `try_write()`. It
+ /// can be used to concurrently read / write to the same socket on a single
+ /// task without splitting the socket.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read or write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ self.0.ready(interest).await
+ }
+
+ /// Waits for the socket to become readable.
+ ///
+ /// This function is equivalent to `ready(Interest::READABLE)` and is usually
+ /// paired with `try_read()`.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn readable(&self) -> io::Result<()> {
+ self.0.readable().await
+ }
+
+ /// Tries to read data from the stream into the provided buffer, returning how
+ /// many bytes were read.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read()` is non-blocking, the buffer does not have to be stored by
+ /// the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
+ self.0.try_read(buf)
+ }
+
+ cfg_io_util! {
+ /// Tries to read data from the stream into the provided buffer, advancing the
+ /// buffer's internal cursor, returning how many bytes were read.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
+ /// the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
+ self.0.try_read_buf(buf)
+ }
+ }
+
+ /// Tries to read data from the stream into the provided buffers, returning
+ /// how many bytes were read.
+ ///
+ /// Data is copied to fill each buffer in order, with the final buffer
+ /// written to possibly being only partially filled. This method behaves
+ /// equivalently to a single call to [`try_read()`] with concatenated
+ /// buffers.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read_vectored()` is non-blocking, the buffer does not have to be
+ /// stored by the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`try_read()`]: Self::try_read()
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
+ self.0.try_read_vectored(bufs)
+ }
+
+ /// Returns the socket address of the remote half of this connection.
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.0.peer_addr()
+ }
+
+ /// Returns the socket address of the local half of this connection.
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.0.local_addr()
+ }
+}
+
+impl WriteHalf<'_> {
+ /// Waits for any of the requested ready states.
+ ///
+ /// This function is usually paired with `try_read()` or `try_write()`. It
+ /// can be used to concurrently read / write to the same socket on a single
+ /// task without splitting the socket.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read or write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ self.0.ready(interest).await
+ }
+
+ /// Waits for the socket to become writable.
+ ///
+ /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
+ /// paired with `try_write()`.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn writable(&self) -> io::Result<()> {
+ self.0.writable().await
+ }
+
+ /// Tries to write a buffer to the stream, returning how many bytes were
+ /// written.
+ ///
+ /// The function will attempt to write the entire contents of `buf`, but
+ /// only part of the buffer may be written.
+ ///
+ /// This function is usually paired with `writable()`.
+ ///
+ /// # Return
+ ///
+ /// If data is successfully written, `Ok(n)` is returned, where `n` is the
+ /// number of bytes written. If the stream is not ready to write data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
+ self.0.try_write(buf)
+ }
+
+ /// Tries to write several buffers to the stream, returning how many bytes
+ /// were written.
+ ///
+ /// Data is written from each buffer in order, with the final buffer read
+ /// from possible being only partially consumed. This method behaves
+ /// equivalently to a single call to [`try_write()`] with concatenated
+ /// buffers.
+ ///
+ /// This function is usually paired with `writable()`.
+ ///
+ /// [`try_write()`]: Self::try_write()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully written, `Ok(n)` is returned, where `n` is the
+ /// number of bytes written. If the stream is not ready to write data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
+ self.0.try_write_vectored(buf)
+ }
+
+ /// Returns the socket address of the remote half of this connection.
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.0.peer_addr()
+ }
+
+ /// Returns the socket address of the local half of this connection.
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.0.local_addr()
+ }
+}
+
+impl AsyncRead for ReadHalf<'_> {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ self.0.poll_read_priv(cx, buf)
+ }
+}
+
+impl AsyncWrite for WriteHalf<'_> {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ self.0.poll_write_priv(cx, buf)
+ }
+
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[io::IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ self.0.poll_write_vectored_priv(cx, bufs)
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ self.0.is_write_vectored()
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.0.shutdown_std(Shutdown::Write).into()
+ }
+}
+
+impl AsRef<UnixStream> for ReadHalf<'_> {
+ fn as_ref(&self) -> &UnixStream {
+ self.0
+ }
+}
+
+impl AsRef<UnixStream> for WriteHalf<'_> {
+ fn as_ref(&self) -> &UnixStream {
+ self.0
+ }
+}
diff --git a/third_party/rust/tokio/src/net/unix/split_owned.rs b/third_party/rust/tokio/src/net/unix/split_owned.rs
new file mode 100644
index 0000000000..9c3a2a4177
--- /dev/null
+++ b/third_party/rust/tokio/src/net/unix/split_owned.rs
@@ -0,0 +1,393 @@
+//! `UnixStream` owned split support.
+//!
+//! A `UnixStream` can be split into an `OwnedReadHalf` and a `OwnedWriteHalf`
+//! with the `UnixStream::into_split` method. `OwnedReadHalf` implements
+//! `AsyncRead` while `OwnedWriteHalf` implements `AsyncWrite`.
+//!
+//! Compared to the generic split of `AsyncRead + AsyncWrite`, this specialized
+//! split has no associated overhead and enforces all invariants at the type
+//! level.
+
+use crate::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready};
+use crate::net::UnixStream;
+
+use crate::net::unix::SocketAddr;
+use std::error::Error;
+use std::net::Shutdown;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{fmt, io};
+
+cfg_io_util! {
+ use bytes::BufMut;
+}
+
+/// Owned read half of a [`UnixStream`], created by [`into_split`].
+///
+/// Reading from an `OwnedReadHalf` is usually done using the convenience methods found
+/// on the [`AsyncReadExt`] trait.
+///
+/// [`UnixStream`]: crate::net::UnixStream
+/// [`into_split`]: crate::net::UnixStream::into_split()
+/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
+#[derive(Debug)]
+pub struct OwnedReadHalf {
+ inner: Arc<UnixStream>,
+}
+
+/// Owned write half of a [`UnixStream`], created by [`into_split`].
+///
+/// Note that in the [`AsyncWrite`] implementation of this type,
+/// [`poll_shutdown`] will shut down the stream in the write direction.
+/// Dropping the write half will also shut down the write half of the stream.
+///
+/// Writing to an `OwnedWriteHalf` is usually done using the convenience methods
+/// found on the [`AsyncWriteExt`] trait.
+///
+/// [`UnixStream`]: crate::net::UnixStream
+/// [`into_split`]: crate::net::UnixStream::into_split()
+/// [`AsyncWrite`]: trait@crate::io::AsyncWrite
+/// [`poll_shutdown`]: fn@crate::io::AsyncWrite::poll_shutdown
+/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
+#[derive(Debug)]
+pub struct OwnedWriteHalf {
+ inner: Arc<UnixStream>,
+ shutdown_on_drop: bool,
+}
+
+pub(crate) fn split_owned(stream: UnixStream) -> (OwnedReadHalf, OwnedWriteHalf) {
+ let arc = Arc::new(stream);
+ let read = OwnedReadHalf {
+ inner: Arc::clone(&arc),
+ };
+ let write = OwnedWriteHalf {
+ inner: arc,
+ shutdown_on_drop: true,
+ };
+ (read, write)
+}
+
+pub(crate) fn reunite(
+ read: OwnedReadHalf,
+ write: OwnedWriteHalf,
+) -> Result<UnixStream, ReuniteError> {
+ if Arc::ptr_eq(&read.inner, &write.inner) {
+ write.forget();
+ // This unwrap cannot fail as the api does not allow creating more than two Arcs,
+ // and we just dropped the other half.
+ Ok(Arc::try_unwrap(read.inner).expect("UnixStream: try_unwrap failed in reunite"))
+ } else {
+ Err(ReuniteError(read, write))
+ }
+}
+
+/// Error indicating that two halves were not from the same socket, and thus could
+/// not be reunited.
+#[derive(Debug)]
+pub struct ReuniteError(pub OwnedReadHalf, pub OwnedWriteHalf);
+
+impl fmt::Display for ReuniteError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(
+ f,
+ "tried to reunite halves that are not from the same socket"
+ )
+ }
+}
+
+impl Error for ReuniteError {}
+
+impl OwnedReadHalf {
+ /// Attempts to put the two halves of a `UnixStream` back together and
+ /// recover the original socket. Succeeds only if the two halves
+ /// originated from the same call to [`into_split`].
+ ///
+ /// [`into_split`]: crate::net::UnixStream::into_split()
+ pub fn reunite(self, other: OwnedWriteHalf) -> Result<UnixStream, ReuniteError> {
+ reunite(self, other)
+ }
+
+ /// Waits for any of the requested ready states.
+ ///
+ /// This function is usually paired with `try_read()` or `try_write()`. It
+ /// can be used to concurrently read / write to the same socket on a single
+ /// task without splitting the socket.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read or write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ self.inner.ready(interest).await
+ }
+
+ /// Waits for the socket to become readable.
+ ///
+ /// This function is equivalent to `ready(Interest::READABLE)` and is usually
+ /// paired with `try_read()`.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn readable(&self) -> io::Result<()> {
+ self.inner.readable().await
+ }
+
+ /// Tries to read data from the stream into the provided buffer, returning how
+ /// many bytes were read.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read()` is non-blocking, the buffer does not have to be stored by
+ /// the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
+ self.inner.try_read(buf)
+ }
+
+ cfg_io_util! {
+ /// Tries to read data from the stream into the provided buffer, advancing the
+ /// buffer's internal cursor, returning how many bytes were read.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
+ /// the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
+ self.inner.try_read_buf(buf)
+ }
+ }
+
+ /// Tries to read data from the stream into the provided buffers, returning
+ /// how many bytes were read.
+ ///
+ /// Data is copied to fill each buffer in order, with the final buffer
+ /// written to possibly being only partially filled. This method behaves
+ /// equivalently to a single call to [`try_read()`] with concatenated
+ /// buffers.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read_vectored()` is non-blocking, the buffer does not have to be
+ /// stored by the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`try_read()`]: Self::try_read()
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
+ self.inner.try_read_vectored(bufs)
+ }
+
+ /// Returns the socket address of the remote half of this connection.
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.inner.peer_addr()
+ }
+
+ /// Returns the socket address of the local half of this connection.
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.inner.local_addr()
+ }
+}
+
+impl AsyncRead for OwnedReadHalf {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ self.inner.poll_read_priv(cx, buf)
+ }
+}
+
+impl OwnedWriteHalf {
+ /// Attempts to put the two halves of a `UnixStream` back together and
+ /// recover the original socket. Succeeds only if the two halves
+ /// originated from the same call to [`into_split`].
+ ///
+ /// [`into_split`]: crate::net::UnixStream::into_split()
+ pub fn reunite(self, other: OwnedReadHalf) -> Result<UnixStream, ReuniteError> {
+ reunite(other, self)
+ }
+
+ /// Destroys the write half, but don't close the write half of the stream
+ /// until the read half is dropped. If the read half has already been
+ /// dropped, this closes the stream.
+ pub fn forget(mut self) {
+ self.shutdown_on_drop = false;
+ drop(self);
+ }
+
+ /// Waits for any of the requested ready states.
+ ///
+ /// This function is usually paired with `try_read()` or `try_write()`. It
+ /// can be used to concurrently read / write to the same socket on a single
+ /// task without splitting the socket.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read or write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ self.inner.ready(interest).await
+ }
+
+ /// Waits for the socket to become writable.
+ ///
+ /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
+ /// paired with `try_write()`.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn writable(&self) -> io::Result<()> {
+ self.inner.writable().await
+ }
+
+ /// Tries to write a buffer to the stream, returning how many bytes were
+ /// written.
+ ///
+ /// The function will attempt to write the entire contents of `buf`, but
+ /// only part of the buffer may be written.
+ ///
+ /// This function is usually paired with `writable()`.
+ ///
+ /// # Return
+ ///
+ /// If data is successfully written, `Ok(n)` is returned, where `n` is the
+ /// number of bytes written. If the stream is not ready to write data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
+ self.inner.try_write(buf)
+ }
+
+ /// Tries to write several buffers to the stream, returning how many bytes
+ /// were written.
+ ///
+ /// Data is written from each buffer in order, with the final buffer read
+ /// from possible being only partially consumed. This method behaves
+ /// equivalently to a single call to [`try_write()`] with concatenated
+ /// buffers.
+ ///
+ /// This function is usually paired with `writable()`.
+ ///
+ /// [`try_write()`]: Self::try_write()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully written, `Ok(n)` is returned, where `n` is the
+ /// number of bytes written. If the stream is not ready to write data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
+ self.inner.try_write_vectored(buf)
+ }
+
+ /// Returns the socket address of the remote half of this connection.
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.inner.peer_addr()
+ }
+
+ /// Returns the socket address of the local half of this connection.
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.inner.local_addr()
+ }
+}
+
+impl Drop for OwnedWriteHalf {
+ fn drop(&mut self) {
+ if self.shutdown_on_drop {
+ let _ = self.inner.shutdown_std(Shutdown::Write);
+ }
+ }
+}
+
+impl AsyncWrite for OwnedWriteHalf {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ self.inner.poll_write_priv(cx, buf)
+ }
+
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[io::IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ self.inner.poll_write_vectored_priv(cx, bufs)
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ self.inner.is_write_vectored()
+ }
+
+ #[inline]
+ fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
+ // flush is a no-op
+ Poll::Ready(Ok(()))
+ }
+
+ // `poll_shutdown` on a write half shutdowns the stream in the "write" direction.
+ fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
+ let res = self.inner.shutdown_std(Shutdown::Write);
+ if res.is_ok() {
+ Pin::into_inner(self).shutdown_on_drop = false;
+ }
+ res.into()
+ }
+}
+
+impl AsRef<UnixStream> for OwnedReadHalf {
+ fn as_ref(&self) -> &UnixStream {
+ &*self.inner
+ }
+}
+
+impl AsRef<UnixStream> for OwnedWriteHalf {
+ fn as_ref(&self) -> &UnixStream {
+ &*self.inner
+ }
+}
diff --git a/third_party/rust/tokio/src/net/unix/stream.rs b/third_party/rust/tokio/src/net/unix/stream.rs
new file mode 100644
index 0000000000..4e7ef87b41
--- /dev/null
+++ b/third_party/rust/tokio/src/net/unix/stream.rs
@@ -0,0 +1,960 @@
+use crate::future::poll_fn;
+use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
+use crate::net::unix::split::{split, ReadHalf, WriteHalf};
+use crate::net::unix::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
+use crate::net::unix::ucred::{self, UCred};
+use crate::net::unix::SocketAddr;
+
+use std::convert::TryFrom;
+use std::fmt;
+use std::io::{self, Read, Write};
+use std::net::Shutdown;
+use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
+use std::os::unix::net;
+use std::path::Path;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+cfg_io_util! {
+ use bytes::BufMut;
+}
+
+cfg_net_unix! {
+ /// A structure representing a connected Unix socket.
+ ///
+ /// This socket can be connected directly with `UnixStream::connect` or accepted
+ /// from a listener with `UnixListener::incoming`. Additionally, a pair of
+ /// anonymous Unix sockets can be created with `UnixStream::pair`.
+ ///
+ /// To shut down the stream in the write direction, you can call the
+ /// [`shutdown()`] method. This will cause the other peer to receive a read of
+ /// length 0, indicating that no more data will be sent. This only closes
+ /// the stream in one direction.
+ ///
+ /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
+ pub struct UnixStream {
+ io: PollEvented<mio::net::UnixStream>,
+ }
+}
+
+impl UnixStream {
+ /// Connects to the socket named by `path`.
+ ///
+ /// This function will create a new Unix socket and connect to the path
+ /// specified, associating the returned stream with the default event loop's
+ /// handle.
+ pub async fn connect<P>(path: P) -> io::Result<UnixStream>
+ where
+ P: AsRef<Path>,
+ {
+ let stream = mio::net::UnixStream::connect(path)?;
+ let stream = UnixStream::new(stream)?;
+
+ poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
+
+ if let Some(e) = stream.io.take_error()? {
+ return Err(e);
+ }
+
+ Ok(stream)
+ }
+
+ /// Waits for any of the requested ready states.
+ ///
+ /// This function is usually paired with `try_read()` or `try_write()`. It
+ /// can be used to concurrently read / write to the same socket on a single
+ /// task without splitting the socket.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read or write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ ///
+ /// # Examples
+ ///
+ /// Concurrently read and write to the stream on the same task without
+ /// splitting.
+ ///
+ /// ```no_run
+ /// use tokio::io::Interest;
+ /// use tokio::net::UnixStream;
+ /// use std::error::Error;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let bind_path = dir.path().join("bind_path");
+ /// let stream = UnixStream::connect(bind_path).await?;
+ ///
+ /// loop {
+ /// let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
+ ///
+ /// if ready.is_readable() {
+ /// let mut data = vec![0; 1024];
+ /// // Try to read data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match stream.try_read(&mut data) {
+ /// Ok(n) => {
+ /// println!("read {} bytes", n);
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ ///
+ /// }
+ ///
+ /// if ready.is_writable() {
+ /// // Try to write data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match stream.try_write(b"hello world") {
+ /// Ok(n) => {
+ /// println!("write {} bytes", n);
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ /// }
+ /// }
+ /// ```
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ let event = self.io.registration().readiness(interest).await?;
+ Ok(event.ready)
+ }
+
+ /// Waits for the socket to become readable.
+ ///
+ /// This function is equivalent to `ready(Interest::READABLE)` and is usually
+ /// paired with `try_read()`.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixStream;
+ /// use std::error::Error;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// // Connect to a peer
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let bind_path = dir.path().join("bind_path");
+ /// let stream = UnixStream::connect(bind_path).await?;
+ ///
+ /// let mut msg = vec![0; 1024];
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// stream.readable().await?;
+ ///
+ /// // Try to read data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match stream.try_read(&mut msg) {
+ /// Ok(n) => {
+ /// msg.truncate(n);
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// println!("GOT = {:?}", msg);
+ /// Ok(())
+ /// }
+ /// ```
+ pub async fn readable(&self) -> io::Result<()> {
+ self.ready(Interest::READABLE).await?;
+ Ok(())
+ }
+
+ /// Polls for read readiness.
+ ///
+ /// If the unix stream is not currently ready for reading, this method will
+ /// store a clone of the `Waker` from the provided `Context`. When the unix
+ /// stream becomes ready for reading, `Waker::wake` will be called on the
+ /// waker.
+ ///
+ /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
+ /// the `Waker` from the `Context` passed to the most recent call is
+ /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
+ /// second, independent waker.)
+ ///
+ /// This function is intended for cases where creating and pinning a future
+ /// via [`readable`] is not feasible. Where possible, using [`readable`] is
+ /// preferred, as this supports polling from multiple tasks at once.
+ ///
+ /// # Return value
+ ///
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the unix stream is not ready for reading.
+ /// * `Poll::Ready(Ok(()))` if the unix stream is ready for reading.
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
+ ///
+ /// # Errors
+ ///
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ ///
+ /// [`readable`]: method@Self::readable
+ pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.io.registration().poll_read_ready(cx).map_ok(|_| ())
+ }
+
+ /// Try to read data from the stream into the provided buffer, returning how
+ /// many bytes were read.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read()` is non-blocking, the buffer does not have to be stored by
+ /// the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: UnixStream::readable()
+ /// [`ready()`]: UnixStream::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixStream;
+ /// use std::error::Error;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// // Connect to a peer
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let bind_path = dir.path().join("bind_path");
+ /// let stream = UnixStream::connect(bind_path).await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// stream.readable().await?;
+ ///
+ /// // Creating the buffer **after** the `await` prevents it from
+ /// // being stored in the async task.
+ /// let mut buf = [0; 4096];
+ ///
+ /// // Try to read data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match stream.try_read(&mut buf) {
+ /// Ok(0) => break,
+ /// Ok(n) => {
+ /// println!("read {} bytes", n);
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
+ self.io
+ .registration()
+ .try_io(Interest::READABLE, || (&*self.io).read(buf))
+ }
+
+ /// Tries to read data from the stream into the provided buffers, returning
+ /// how many bytes were read.
+ ///
+ /// Data is copied to fill each buffer in order, with the final buffer
+ /// written to possibly being only partially filled. This method behaves
+ /// equivalently to a single call to [`try_read()`] with concatenated
+ /// buffers.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read_vectored()` is non-blocking, the buffer does not have to be
+ /// stored by the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`try_read()`]: UnixStream::try_read()
+ /// [`readable()`]: UnixStream::readable()
+ /// [`ready()`]: UnixStream::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixStream;
+ /// use std::error::Error;
+ /// use std::io::{self, IoSliceMut};
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// // Connect to a peer
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let bind_path = dir.path().join("bind_path");
+ /// let stream = UnixStream::connect(bind_path).await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// stream.readable().await?;
+ ///
+ /// // Creating the buffer **after** the `await` prevents it from
+ /// // being stored in the async task.
+ /// let mut buf_a = [0; 512];
+ /// let mut buf_b = [0; 1024];
+ /// let mut bufs = [
+ /// IoSliceMut::new(&mut buf_a),
+ /// IoSliceMut::new(&mut buf_b),
+ /// ];
+ ///
+ /// // Try to read data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match stream.try_read_vectored(&mut bufs) {
+ /// Ok(0) => break,
+ /// Ok(n) => {
+ /// println!("read {} bytes", n);
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
+ self.io
+ .registration()
+ .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
+ }
+
+ cfg_io_util! {
+ /// Tries to read data from the stream into the provided buffer, advancing the
+ /// buffer's internal cursor, returning how many bytes were read.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
+ /// the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: UnixStream::readable()
+ /// [`ready()`]: UnixStream::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixStream;
+ /// use std::error::Error;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// // Connect to a peer
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let bind_path = dir.path().join("bind_path");
+ /// let stream = UnixStream::connect(bind_path).await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be readable
+ /// stream.readable().await?;
+ ///
+ /// let mut buf = Vec::with_capacity(4096);
+ ///
+ /// // Try to read data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match stream.try_read_buf(&mut buf) {
+ /// Ok(0) => break,
+ /// Ok(n) => {
+ /// println!("read {} bytes", n);
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
+ self.io.registration().try_io(Interest::READABLE, || {
+ use std::io::Read;
+
+ let dst = buf.chunk_mut();
+ let dst =
+ unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
+
+ // Safety: We trust `UnixStream::read` to have filled up `n` bytes in the
+ // buffer.
+ let n = (&*self.io).read(dst)?;
+
+ unsafe {
+ buf.advance_mut(n);
+ }
+
+ Ok(n)
+ })
+ }
+ }
+
+ /// Waits for the socket to become writable.
+ ///
+ /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
+ /// paired with `try_write()`.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixStream;
+ /// use std::error::Error;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// // Connect to a peer
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let bind_path = dir.path().join("bind_path");
+ /// let stream = UnixStream::connect(bind_path).await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be writable
+ /// stream.writable().await?;
+ ///
+ /// // Try to write data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match stream.try_write(b"hello world") {
+ /// Ok(n) => {
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub async fn writable(&self) -> io::Result<()> {
+ self.ready(Interest::WRITABLE).await?;
+ Ok(())
+ }
+
+ /// Polls for write readiness.
+ ///
+ /// If the unix stream is not currently ready for writing, this method will
+ /// store a clone of the `Waker` from the provided `Context`. When the unix
+ /// stream becomes ready for writing, `Waker::wake` will be called on the
+ /// waker.
+ ///
+ /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
+ /// the `Waker` from the `Context` passed to the most recent call is
+ /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
+ /// second, independent waker.)
+ ///
+ /// This function is intended for cases where creating and pinning a future
+ /// via [`writable`] is not feasible. Where possible, using [`writable`] is
+ /// preferred, as this supports polling from multiple tasks at once.
+ ///
+ /// # Return value
+ ///
+ /// The function returns:
+ ///
+ /// * `Poll::Pending` if the unix stream is not ready for writing.
+ /// * `Poll::Ready(Ok(()))` if the unix stream is ready for writing.
+ /// * `Poll::Ready(Err(e))` if an error is encountered.
+ ///
+ /// # Errors
+ ///
+ /// This function may encounter any standard I/O error except `WouldBlock`.
+ ///
+ /// [`writable`]: method@Self::writable
+ pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.io.registration().poll_write_ready(cx).map_ok(|_| ())
+ }
+
+ /// Tries to write a buffer to the stream, returning how many bytes were
+ /// written.
+ ///
+ /// The function will attempt to write the entire contents of `buf`, but
+ /// only part of the buffer may be written.
+ ///
+ /// This function is usually paired with `writable()`.
+ ///
+ /// # Return
+ ///
+ /// If data is successfully written, `Ok(n)` is returned, where `n` is the
+ /// number of bytes written. If the stream is not ready to write data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixStream;
+ /// use std::error::Error;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// // Connect to a peer
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let bind_path = dir.path().join("bind_path");
+ /// let stream = UnixStream::connect(bind_path).await?;
+ ///
+ /// loop {
+ /// // Wait for the socket to be writable
+ /// stream.writable().await?;
+ ///
+ /// // Try to write data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match stream.try_write(b"hello world") {
+ /// Ok(n) => {
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
+ self.io
+ .registration()
+ .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
+ }
+
+ /// Tries to write several buffers to the stream, returning how many bytes
+ /// were written.
+ ///
+ /// Data is written from each buffer in order, with the final buffer read
+ /// from possible being only partially consumed. This method behaves
+ /// equivalently to a single call to [`try_write()`] with concatenated
+ /// buffers.
+ ///
+ /// This function is usually paired with `writable()`.
+ ///
+ /// [`try_write()`]: UnixStream::try_write()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully written, `Ok(n)` is returned, where `n` is the
+ /// number of bytes written. If the stream is not ready to write data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixStream;
+ /// use std::error::Error;
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// // Connect to a peer
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let bind_path = dir.path().join("bind_path");
+ /// let stream = UnixStream::connect(bind_path).await?;
+ ///
+ /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
+ ///
+ /// loop {
+ /// // Wait for the socket to be writable
+ /// stream.writable().await?;
+ ///
+ /// // Try to write data, this may still fail with `WouldBlock`
+ /// // if the readiness event is a false positive.
+ /// match stream.try_write_vectored(&bufs) {
+ /// Ok(n) => {
+ /// break;
+ /// }
+ /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ /// continue;
+ /// }
+ /// Err(e) => {
+ /// return Err(e.into());
+ /// }
+ /// }
+ /// }
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
+ self.io
+ .registration()
+ .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
+ }
+
+ /// Tries to read or write from the socket using a user-provided IO operation.
+ ///
+ /// If the socket is ready, the provided closure is called. The closure
+ /// should attempt to perform IO operation from the socket by manually
+ /// calling the appropriate syscall. If the operation fails because the
+ /// socket is not actually ready, then the closure should return a
+ /// `WouldBlock` error and the readiness flag is cleared. The return value
+ /// of the closure is then returned by `try_io`.
+ ///
+ /// If the socket is not ready, then the closure is not called
+ /// and a `WouldBlock` error is returned.
+ ///
+ /// The closure should only return a `WouldBlock` error if it has performed
+ /// an IO operation on the socket that failed due to the socket not being
+ /// ready. Returning a `WouldBlock` error in any other situation will
+ /// incorrectly clear the readiness flag, which can cause the socket to
+ /// behave incorrectly.
+ ///
+ /// The closure should not perform the IO operation using any of the methods
+ /// defined on the Tokio `UnixStream` type, as this will mess with the
+ /// readiness flag and can cause the socket to behave incorrectly.
+ ///
+ /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: UnixStream::readable()
+ /// [`writable()`]: UnixStream::writable()
+ /// [`ready()`]: UnixStream::ready()
+ pub fn try_io<R>(
+ &self,
+ interest: Interest,
+ f: impl FnOnce() -> io::Result<R>,
+ ) -> io::Result<R> {
+ self.io.registration().try_io(interest, f)
+ }
+
+ /// Creates new `UnixStream` from a `std::os::unix::net::UnixStream`.
+ ///
+ /// This function is intended to be used to wrap a UnixStream from the
+ /// standard library in the Tokio equivalent. The conversion assumes
+ /// nothing about the underlying stream; it is left up to the user to set
+ /// it in non-blocking mode.
+ ///
+ /// # Panics
+ ///
+ /// This function panics if thread-local runtime is not set.
+ ///
+ /// The runtime is usually set implicitly when this function is called
+ /// from a future driven by a tokio runtime, otherwise runtime can be set
+ /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
+ pub fn from_std(stream: net::UnixStream) -> io::Result<UnixStream> {
+ let stream = mio::net::UnixStream::from_std(stream);
+ let io = PollEvented::new(stream)?;
+
+ Ok(UnixStream { io })
+ }
+
+ /// Turns a [`tokio::net::UnixStream`] into a [`std::os::unix::net::UnixStream`].
+ ///
+ /// The returned [`std::os::unix::net::UnixStream`] will have nonblocking
+ /// mode set as `true`. Use [`set_nonblocking`] to change the blocking
+ /// mode if needed.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::error::Error;
+ /// use std::io::Read;
+ /// use tokio::net::UnixListener;
+ /// # use tokio::net::UnixStream;
+ /// # use tokio::io::AsyncWriteExt;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let bind_path = dir.path().join("bind_path");
+ ///
+ /// let mut data = [0u8; 12];
+ /// let listener = UnixListener::bind(&bind_path)?;
+ /// # let handle = tokio::spawn(async {
+ /// # let mut stream = UnixStream::connect(bind_path).await.unwrap();
+ /// # stream.write(b"Hello world!").await.unwrap();
+ /// # });
+ /// let (tokio_unix_stream, _) = listener.accept().await?;
+ /// let mut std_unix_stream = tokio_unix_stream.into_std()?;
+ /// # handle.await.expect("The task being joined has panicked");
+ /// std_unix_stream.set_nonblocking(false)?;
+ /// std_unix_stream.read_exact(&mut data)?;
+ /// # assert_eq!(b"Hello world!", &data);
+ /// Ok(())
+ /// }
+ /// ```
+ /// [`tokio::net::UnixStream`]: UnixStream
+ /// [`std::os::unix::net::UnixStream`]: std::os::unix::net::UnixStream
+ /// [`set_nonblocking`]: fn@std::os::unix::net::UnixStream::set_nonblocking
+ pub fn into_std(self) -> io::Result<std::os::unix::net::UnixStream> {
+ self.io
+ .into_inner()
+ .map(|io| io.into_raw_fd())
+ .map(|raw_fd| unsafe { std::os::unix::net::UnixStream::from_raw_fd(raw_fd) })
+ }
+
+ /// Creates an unnamed pair of connected sockets.
+ ///
+ /// This function will create a pair of interconnected Unix sockets for
+ /// communicating back and forth between one another. Each socket will
+ /// be associated with the default event loop's handle.
+ pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
+ let (a, b) = mio::net::UnixStream::pair()?;
+ let a = UnixStream::new(a)?;
+ let b = UnixStream::new(b)?;
+
+ Ok((a, b))
+ }
+
+ pub(crate) fn new(stream: mio::net::UnixStream) -> io::Result<UnixStream> {
+ let io = PollEvented::new(stream)?;
+ Ok(UnixStream { io })
+ }
+
+ /// Returns the socket address of the local half of this connection.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixStream;
+ ///
+ /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let bind_path = dir.path().join("bind_path");
+ /// let stream = UnixStream::connect(bind_path).await?;
+ ///
+ /// println!("{:?}", stream.local_addr()?);
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.io.local_addr().map(SocketAddr)
+ }
+
+ /// Returns the socket address of the remote half of this connection.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::UnixStream;
+ ///
+ /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+ /// let dir = tempfile::tempdir().unwrap();
+ /// let bind_path = dir.path().join("bind_path");
+ /// let stream = UnixStream::connect(bind_path).await?;
+ ///
+ /// println!("{:?}", stream.peer_addr()?);
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.io.peer_addr().map(SocketAddr)
+ }
+
+ /// Returns effective credentials of the process which called `connect` or `pair`.
+ pub fn peer_cred(&self) -> io::Result<UCred> {
+ ucred::get_peer_cred(self)
+ }
+
+ /// Returns the value of the `SO_ERROR` option.
+ pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+ self.io.take_error()
+ }
+
+ /// Shuts down the read, write, or both halves of this connection.
+ ///
+ /// This function will cause all pending and future I/O calls on the
+ /// specified portions to immediately return with an appropriate value
+ /// (see the documentation of `Shutdown`).
+ pub(super) fn shutdown_std(&self, how: Shutdown) -> io::Result<()> {
+ self.io.shutdown(how)
+ }
+
+ // These lifetime markers also appear in the generated documentation, and make
+ // it more clear that this is a *borrowed* split.
+ #[allow(clippy::needless_lifetimes)]
+ /// Splits a `UnixStream` into a read half and a write half, which can be used
+ /// to read and write the stream concurrently.
+ ///
+ /// This method is more efficient than [`into_split`], but the halves cannot be
+ /// moved into independently spawned tasks.
+ ///
+ /// [`into_split`]: Self::into_split()
+ pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) {
+ split(self)
+ }
+
+ /// Splits a `UnixStream` into a read half and a write half, which can be used
+ /// to read and write the stream concurrently.
+ ///
+ /// Unlike [`split`], the owned halves can be moved to separate tasks, however
+ /// this comes at the cost of a heap allocation.
+ ///
+ /// **Note:** Dropping the write half will shut down the write half of the
+ /// stream. This is equivalent to calling [`shutdown()`] on the `UnixStream`.
+ ///
+ /// [`split`]: Self::split()
+ /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
+ pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
+ split_owned(self)
+ }
+}
+
+impl TryFrom<net::UnixStream> for UnixStream {
+ type Error = io::Error;
+
+ /// Consumes stream, returning the tokio I/O object.
+ ///
+ /// This is equivalent to
+ /// [`UnixStream::from_std(stream)`](UnixStream::from_std).
+ fn try_from(stream: net::UnixStream) -> io::Result<Self> {
+ Self::from_std(stream)
+ }
+}
+
+impl AsyncRead for UnixStream {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ self.poll_read_priv(cx, buf)
+ }
+}
+
+impl AsyncWrite for UnixStream {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ self.poll_write_priv(cx, buf)
+ }
+
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[io::IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ self.poll_write_vectored_priv(cx, bufs)
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ true
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
+ self.shutdown_std(std::net::Shutdown::Write)?;
+ Poll::Ready(Ok(()))
+ }
+}
+
+impl UnixStream {
+ // == Poll IO functions that takes `&self` ==
+ //
+ // To read or write without mutable access to the `UnixStream`, combine the
+ // `poll_read_ready` or `poll_write_ready` methods with the `try_read` or
+ // `try_write` methods.
+
+ pub(crate) fn poll_read_priv(
+ &self,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ // Safety: `UnixStream::read` correctly handles reads into uninitialized memory
+ unsafe { self.io.poll_read(cx, buf) }
+ }
+
+ pub(crate) fn poll_write_priv(
+ &self,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ self.io.poll_write(cx, buf)
+ }
+
+ pub(super) fn poll_write_vectored_priv(
+ &self,
+ cx: &mut Context<'_>,
+ bufs: &[io::IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ self.io.poll_write_vectored(cx, bufs)
+ }
+}
+
+impl fmt::Debug for UnixStream {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.io.fmt(f)
+ }
+}
+
+impl AsRawFd for UnixStream {
+ fn as_raw_fd(&self) -> RawFd {
+ self.io.as_raw_fd()
+ }
+}
diff --git a/third_party/rust/tokio/src/net/unix/ucred.rs b/third_party/rust/tokio/src/net/unix/ucred.rs
new file mode 100644
index 0000000000..865303b4ce
--- /dev/null
+++ b/third_party/rust/tokio/src/net/unix/ucred.rs
@@ -0,0 +1,252 @@
+use libc::{gid_t, pid_t, uid_t};
+
+/// Credentials of a process.
+#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
+pub struct UCred {
+ /// PID (process ID) of the process.
+ pid: Option<pid_t>,
+ /// UID (user ID) of the process.
+ uid: uid_t,
+ /// GID (group ID) of the process.
+ gid: gid_t,
+}
+
+impl UCred {
+ /// Gets UID (user ID) of the process.
+ pub fn uid(&self) -> uid_t {
+ self.uid
+ }
+
+ /// Gets GID (group ID) of the process.
+ pub fn gid(&self) -> gid_t {
+ self.gid
+ }
+
+ /// Gets PID (process ID) of the process.
+ ///
+ /// This is only implemented under Linux, Android, iOS, macOS, Solaris and
+ /// Illumos. On other platforms this will always return `None`.
+ pub fn pid(&self) -> Option<pid_t> {
+ self.pid
+ }
+}
+
+#[cfg(any(target_os = "linux", target_os = "android", target_os = "openbsd"))]
+pub(crate) use self::impl_linux::get_peer_cred;
+
+#[cfg(any(target_os = "netbsd"))]
+pub(crate) use self::impl_netbsd::get_peer_cred;
+
+#[cfg(any(target_os = "dragonfly", target_os = "freebsd"))]
+pub(crate) use self::impl_bsd::get_peer_cred;
+
+#[cfg(any(target_os = "macos", target_os = "ios"))]
+pub(crate) use self::impl_macos::get_peer_cred;
+
+#[cfg(any(target_os = "solaris", target_os = "illumos"))]
+pub(crate) use self::impl_solaris::get_peer_cred;
+
+#[cfg(any(target_os = "linux", target_os = "android", target_os = "openbsd"))]
+pub(crate) mod impl_linux {
+ use crate::net::unix::UnixStream;
+
+ use libc::{c_void, getsockopt, socklen_t, SOL_SOCKET, SO_PEERCRED};
+ use std::{io, mem};
+
+ #[cfg(target_os = "openbsd")]
+ use libc::sockpeercred as ucred;
+ #[cfg(any(target_os = "linux", target_os = "android"))]
+ use libc::ucred;
+
+ pub(crate) fn get_peer_cred(sock: &UnixStream) -> io::Result<super::UCred> {
+ use std::os::unix::io::AsRawFd;
+
+ unsafe {
+ let raw_fd = sock.as_raw_fd();
+
+ let mut ucred = ucred {
+ pid: 0,
+ uid: 0,
+ gid: 0,
+ };
+
+ let ucred_size = mem::size_of::<ucred>();
+
+ // These paranoid checks should be optimized-out
+ assert!(mem::size_of::<u32>() <= mem::size_of::<usize>());
+ assert!(ucred_size <= u32::MAX as usize);
+
+ let mut ucred_size = ucred_size as socklen_t;
+
+ let ret = getsockopt(
+ raw_fd,
+ SOL_SOCKET,
+ SO_PEERCRED,
+ &mut ucred as *mut ucred as *mut c_void,
+ &mut ucred_size,
+ );
+ if ret == 0 && ucred_size as usize == mem::size_of::<ucred>() {
+ Ok(super::UCred {
+ uid: ucred.uid,
+ gid: ucred.gid,
+ pid: Some(ucred.pid),
+ })
+ } else {
+ Err(io::Error::last_os_error())
+ }
+ }
+ }
+}
+
+#[cfg(any(target_os = "netbsd"))]
+pub(crate) mod impl_netbsd {
+ use crate::net::unix::UnixStream;
+
+ use libc::{c_void, getsockopt, socklen_t, unpcbid, LOCAL_PEEREID, SOL_SOCKET};
+ use std::io;
+ use std::mem::size_of;
+ use std::os::unix::io::AsRawFd;
+
+ pub(crate) fn get_peer_cred(sock: &UnixStream) -> io::Result<super::UCred> {
+ unsafe {
+ let raw_fd = sock.as_raw_fd();
+
+ let mut unpcbid = unpcbid {
+ unp_pid: 0,
+ unp_euid: 0,
+ unp_egid: 0,
+ };
+
+ let unpcbid_size = size_of::<unpcbid>();
+ let mut unpcbid_size = unpcbid_size as socklen_t;
+
+ let ret = getsockopt(
+ raw_fd,
+ SOL_SOCKET,
+ LOCAL_PEEREID,
+ &mut unpcbid as *mut unpcbid as *mut c_void,
+ &mut unpcbid_size,
+ );
+ if ret == 0 && unpcbid_size as usize == size_of::<unpcbid>() {
+ Ok(super::UCred {
+ uid: unpcbid.unp_euid,
+ gid: unpcbid.unp_egid,
+ pid: Some(unpcbid.unp_pid),
+ })
+ } else {
+ Err(io::Error::last_os_error())
+ }
+ }
+ }
+}
+
+#[cfg(any(target_os = "dragonfly", target_os = "freebsd"))]
+pub(crate) mod impl_bsd {
+ use crate::net::unix::UnixStream;
+
+ use libc::getpeereid;
+ use std::io;
+ use std::mem::MaybeUninit;
+ use std::os::unix::io::AsRawFd;
+
+ pub(crate) fn get_peer_cred(sock: &UnixStream) -> io::Result<super::UCred> {
+ unsafe {
+ let raw_fd = sock.as_raw_fd();
+
+ let mut uid = MaybeUninit::uninit();
+ let mut gid = MaybeUninit::uninit();
+
+ let ret = getpeereid(raw_fd, uid.as_mut_ptr(), gid.as_mut_ptr());
+
+ if ret == 0 {
+ Ok(super::UCred {
+ uid: uid.assume_init(),
+ gid: gid.assume_init(),
+ pid: None,
+ })
+ } else {
+ Err(io::Error::last_os_error())
+ }
+ }
+ }
+}
+
+#[cfg(any(target_os = "macos", target_os = "ios"))]
+pub(crate) mod impl_macos {
+ use crate::net::unix::UnixStream;
+
+ use libc::{c_void, getpeereid, getsockopt, pid_t, LOCAL_PEEREPID, SOL_LOCAL};
+ use std::io;
+ use std::mem::size_of;
+ use std::mem::MaybeUninit;
+ use std::os::unix::io::AsRawFd;
+
+ pub(crate) fn get_peer_cred(sock: &UnixStream) -> io::Result<super::UCred> {
+ unsafe {
+ let raw_fd = sock.as_raw_fd();
+
+ let mut uid = MaybeUninit::uninit();
+ let mut gid = MaybeUninit::uninit();
+ let mut pid: MaybeUninit<pid_t> = MaybeUninit::uninit();
+ let mut pid_size: MaybeUninit<u32> = MaybeUninit::new(size_of::<pid_t>() as u32);
+
+ if getsockopt(
+ raw_fd,
+ SOL_LOCAL,
+ LOCAL_PEEREPID,
+ pid.as_mut_ptr() as *mut c_void,
+ pid_size.as_mut_ptr(),
+ ) != 0
+ {
+ return Err(io::Error::last_os_error());
+ }
+
+ assert!(pid_size.assume_init() == (size_of::<pid_t>() as u32));
+
+ let ret = getpeereid(raw_fd, uid.as_mut_ptr(), gid.as_mut_ptr());
+
+ if ret == 0 {
+ Ok(super::UCred {
+ uid: uid.assume_init(),
+ gid: gid.assume_init(),
+ pid: Some(pid.assume_init()),
+ })
+ } else {
+ Err(io::Error::last_os_error())
+ }
+ }
+ }
+}
+
+#[cfg(any(target_os = "solaris", target_os = "illumos"))]
+pub(crate) mod impl_solaris {
+ use crate::net::unix::UnixStream;
+ use std::io;
+ use std::os::unix::io::AsRawFd;
+ use std::ptr;
+
+ pub(crate) fn get_peer_cred(sock: &UnixStream) -> io::Result<super::UCred> {
+ unsafe {
+ let raw_fd = sock.as_raw_fd();
+
+ let mut cred = ptr::null_mut();
+ let ret = libc::getpeerucred(raw_fd, &mut cred);
+
+ if ret == 0 {
+ let uid = libc::ucred_geteuid(cred);
+ let gid = libc::ucred_getegid(cred);
+ let pid = libc::ucred_getpid(cred);
+
+ libc::ucred_free(cred);
+
+ Ok(super::UCred {
+ uid,
+ gid,
+ pid: Some(pid),
+ })
+ } else {
+ Err(io::Error::last_os_error())
+ }
+ }
+ }
+}