diff options
Diffstat (limited to 'third_party/rust/tokio/src/net/unix/split.rs')
-rw-r--r-- | third_party/rust/tokio/src/net/unix/split.rs | 305 |
1 files changed, 305 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/net/unix/split.rs b/third_party/rust/tokio/src/net/unix/split.rs new file mode 100644 index 0000000000..d4686c22d7 --- /dev/null +++ b/third_party/rust/tokio/src/net/unix/split.rs @@ -0,0 +1,305 @@ +//! `UnixStream` split support. +//! +//! A `UnixStream` can be split into a read half and a write half with +//! `UnixStream::split`. The read half implements `AsyncRead` while the write +//! half implements `AsyncWrite`. +//! +//! Compared to the generic split of `AsyncRead + AsyncWrite`, this specialized +//! split has no associated overhead and enforces all invariants at the type +//! level. + +use crate::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready}; +use crate::net::UnixStream; + +use crate::net::unix::SocketAddr; +use std::io; +use std::net::Shutdown; +use std::pin::Pin; +use std::task::{Context, Poll}; + +cfg_io_util! { + use bytes::BufMut; +} + +/// Borrowed read half of a [`UnixStream`], created by [`split`]. +/// +/// Reading from a `ReadHalf` is usually done using the convenience methods found on the +/// [`AsyncReadExt`] trait. +/// +/// [`UnixStream`]: UnixStream +/// [`split`]: UnixStream::split() +/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt +#[derive(Debug)] +pub struct ReadHalf<'a>(&'a UnixStream); + +/// Borrowed write half of a [`UnixStream`], created by [`split`]. +/// +/// Note that in the [`AsyncWrite`] implementation of this type, [`poll_shutdown`] will +/// shut down the UnixStream stream in the write direction. +/// +/// Writing to an `WriteHalf` is usually done using the convenience methods found +/// on the [`AsyncWriteExt`] trait. +/// +/// [`UnixStream`]: UnixStream +/// [`split`]: UnixStream::split() +/// [`AsyncWrite`]: trait@crate::io::AsyncWrite +/// [`poll_shutdown`]: fn@crate::io::AsyncWrite::poll_shutdown +/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt +#[derive(Debug)] +pub struct WriteHalf<'a>(&'a UnixStream); + +pub(crate) fn split(stream: &mut UnixStream) -> (ReadHalf<'_>, WriteHalf<'_>) { + (ReadHalf(stream), WriteHalf(stream)) +} + +impl ReadHalf<'_> { + /// Wait for any of the requested ready states. + /// + /// This function is usually paired with `try_read()` or `try_write()`. It + /// can be used to concurrently read / write to the same socket on a single + /// task without splitting the socket. + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to read or write that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { + self.0.ready(interest).await + } + + /// Waits for the socket to become readable. + /// + /// This function is equivalent to `ready(Interest::READABLE)` and is usually + /// paired with `try_read()`. + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to read that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn readable(&self) -> io::Result<()> { + self.0.readable().await + } + + /// Tries to read data from the stream into the provided buffer, returning how + /// many bytes were read. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read()` is non-blocking, the buffer does not have to be stored by + /// the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: Self::readable() + /// [`ready()`]: Self::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> { + self.0.try_read(buf) + } + + cfg_io_util! { + /// Tries to read data from the stream into the provided buffer, advancing the + /// buffer's internal cursor, returning how many bytes were read. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by + /// the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: Self::readable() + /// [`ready()`]: Self::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> { + self.0.try_read_buf(buf) + } + } + + /// Tries to read data from the stream into the provided buffers, returning + /// how many bytes were read. + /// + /// Data is copied to fill each buffer in order, with the final buffer + /// written to possibly being only partially filled. This method behaves + /// equivalently to a single call to [`try_read()`] with concatenated + /// buffers. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read_vectored()` is non-blocking, the buffer does not have to be + /// stored by the async task and can exist entirely on the stack. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// [`try_read()`]: Self::try_read() + /// [`readable()`]: Self::readable() + /// [`ready()`]: Self::ready() + /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(0)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> { + self.0.try_read_vectored(bufs) + } + + /// Returns the socket address of the remote half of this connection. + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + self.0.peer_addr() + } + + /// Returns the socket address of the local half of this connection. + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.0.local_addr() + } +} + +impl WriteHalf<'_> { + /// Waits for any of the requested ready states. + /// + /// This function is usually paired with `try_read()` or `try_write()`. It + /// can be used to concurrently read / write to the same socket on a single + /// task without splitting the socket. + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to read or write that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { + self.0.ready(interest).await + } + + /// Waits for the socket to become writable. + /// + /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually + /// paired with `try_write()`. + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to write that fails with `WouldBlock` or + /// `Poll::Pending`. + pub async fn writable(&self) -> io::Result<()> { + self.0.writable().await + } + + /// Tries to write a buffer to the stream, returning how many bytes were + /// written. + /// + /// The function will attempt to write the entire contents of `buf`, but + /// only part of the buffer may be written. + /// + /// This function is usually paired with `writable()`. + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the stream is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> { + self.0.try_write(buf) + } + + /// Tries to write several buffers to the stream, returning how many bytes + /// were written. + /// + /// Data is written from each buffer in order, with the final buffer read + /// from possible being only partially consumed. This method behaves + /// equivalently to a single call to [`try_write()`] with concatenated + /// buffers. + /// + /// This function is usually paired with `writable()`. + /// + /// [`try_write()`]: Self::try_write() + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the stream is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> { + self.0.try_write_vectored(buf) + } + + /// Returns the socket address of the remote half of this connection. + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + self.0.peer_addr() + } + + /// Returns the socket address of the local half of this connection. + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.0.local_addr() + } +} + +impl AsyncRead for ReadHalf<'_> { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + self.0.poll_read_priv(cx, buf) + } +} + +impl AsyncWrite for WriteHalf<'_> { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + self.0.poll_write_priv(cx, buf) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[io::IoSlice<'_>], + ) -> Poll<io::Result<usize>> { + self.0.poll_write_vectored_priv(cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + self.0.is_write_vectored() + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { + self.0.shutdown_std(Shutdown::Write).into() + } +} + +impl AsRef<UnixStream> for ReadHalf<'_> { + fn as_ref(&self) -> &UnixStream { + self.0 + } +} + +impl AsRef<UnixStream> for WriteHalf<'_> { + fn as_ref(&self) -> &UnixStream { + self.0 + } +} |