summaryrefslogtreecommitdiffstats
path: root/vendor/tokio/src/net/tcp/stream.rs
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 03:57:31 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 03:57:31 +0000
commitdc0db358abe19481e475e10c32149b53370f1a1c (patch)
treeab8ce99c4b255ce46f99ef402c27916055b899ee /vendor/tokio/src/net/tcp/stream.rs
parentReleasing progress-linux version 1.71.1+dfsg1-2~progress7.99u1. (diff)
downloadrustc-dc0db358abe19481e475e10c32149b53370f1a1c.tar.xz
rustc-dc0db358abe19481e475e10c32149b53370f1a1c.zip
Merging upstream version 1.72.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/tokio/src/net/tcp/stream.rs')
-rw-r--r--vendor/tokio/src/net/tcp/stream.rs440
1 files changed, 286 insertions, 154 deletions
diff --git a/vendor/tokio/src/net/tcp/stream.rs b/vendor/tokio/src/net/tcp/stream.rs
index 0277a360d..b7104d78b 100644
--- a/vendor/tokio/src/net/tcp/stream.rs
+++ b/vendor/tokio/src/net/tcp/stream.rs
@@ -1,16 +1,18 @@
-use crate::future::poll_fn;
+cfg_not_wasi! {
+ use crate::future::poll_fn;
+ use crate::net::{to_socket_addrs, ToSocketAddrs};
+ use std::time::Duration;
+}
+
use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
-use crate::net::{to_socket_addrs, ToSocketAddrs};
-use std::convert::TryFrom;
use std::fmt;
use std::io;
use std::net::{Shutdown, SocketAddr};
use std::pin::Pin;
use std::task::{Context, Poll};
-use std::time::Duration;
cfg_io_util! {
use bytes::BufMut;
@@ -70,86 +72,88 @@ cfg_net! {
}
impl TcpStream {
- /// Opens a TCP connection to a remote host.
- ///
- /// `addr` is an address of the remote host. Anything which implements the
- /// [`ToSocketAddrs`] trait can be supplied as the address. If `addr`
- /// yields multiple addresses, connect will be attempted with each of the
- /// addresses until a connection is successful. If none of the addresses
- /// result in a successful connection, the error returned from the last
- /// connection attempt (the last address) is returned.
- ///
- /// To configure the socket before connecting, you can use the [`TcpSocket`]
- /// type.
- ///
- /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
- /// [`TcpSocket`]: struct@crate::net::TcpSocket
- ///
- /// # Examples
- ///
- /// ```no_run
- /// use tokio::net::TcpStream;
- /// use tokio::io::AsyncWriteExt;
- /// use std::error::Error;
- ///
- /// #[tokio::main]
- /// async fn main() -> Result<(), Box<dyn Error>> {
- /// // Connect to a peer
- /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
- ///
- /// // Write some data.
- /// stream.write_all(b"hello world!").await?;
- ///
- /// Ok(())
- /// }
- /// ```
- ///
- /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
- ///
- /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
- /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
- pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
- let addrs = to_socket_addrs(addr).await?;
+ cfg_not_wasi! {
+ /// Opens a TCP connection to a remote host.
+ ///
+ /// `addr` is an address of the remote host. Anything which implements the
+ /// [`ToSocketAddrs`] trait can be supplied as the address. If `addr`
+ /// yields multiple addresses, connect will be attempted with each of the
+ /// addresses until a connection is successful. If none of the addresses
+ /// result in a successful connection, the error returned from the last
+ /// connection attempt (the last address) is returned.
+ ///
+ /// To configure the socket before connecting, you can use the [`TcpSocket`]
+ /// type.
+ ///
+ /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
+ /// [`TcpSocket`]: struct@crate::net::TcpSocket
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpStream;
+ /// use tokio::io::AsyncWriteExt;
+ /// use std::error::Error;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// // Connect to a peer
+ /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
+ ///
+ /// // Write some data.
+ /// stream.write_all(b"hello world!").await?;
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ ///
+ /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
+ ///
+ /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
+ /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
+ pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
+ let addrs = to_socket_addrs(addr).await?;
- let mut last_err = None;
+ let mut last_err = None;
- for addr in addrs {
- match TcpStream::connect_addr(addr).await {
- Ok(stream) => return Ok(stream),
- Err(e) => last_err = Some(e),
+ for addr in addrs {
+ match TcpStream::connect_addr(addr).await {
+ Ok(stream) => return Ok(stream),
+ Err(e) => last_err = Some(e),
+ }
}
+
+ Err(last_err.unwrap_or_else(|| {
+ io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "could not resolve to any address",
+ )
+ }))
}
- Err(last_err.unwrap_or_else(|| {
- io::Error::new(
- io::ErrorKind::InvalidInput,
- "could not resolve to any address",
- )
- }))
- }
+ /// Establishes a connection to the specified `addr`.
+ async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> {
+ let sys = mio::net::TcpStream::connect(addr)?;
+ TcpStream::connect_mio(sys).await
+ }
- /// Establishes a connection to the specified `addr`.
- async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> {
- let sys = mio::net::TcpStream::connect(addr)?;
- TcpStream::connect_mio(sys).await
- }
+ pub(crate) async fn connect_mio(sys: mio::net::TcpStream) -> io::Result<TcpStream> {
+ let stream = TcpStream::new(sys)?;
- pub(crate) async fn connect_mio(sys: mio::net::TcpStream) -> io::Result<TcpStream> {
- let stream = TcpStream::new(sys)?;
+ // Once we've connected, wait for the stream to be writable as
+ // that's when the actual connection has been initiated. Once we're
+ // writable we check for `take_socket_error` to see if the connect
+ // actually hit an error or not.
+ //
+ // If all that succeeded then we ship everything on up.
+ poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
- // Once we've connected, wait for the stream to be writable as
- // that's when the actual connection has been initiated. Once we're
- // writable we check for `take_socket_error` to see if the connect
- // actually hit an error or not.
- //
- // If all that succeeded then we ship everything on up.
- poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
+ if let Some(e) = stream.io.take_error()? {
+ return Err(e);
+ }
- if let Some(e) = stream.io.take_error()? {
- return Err(e);
+ Ok(stream)
}
-
- Ok(stream)
}
pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result<TcpStream> {
@@ -160,9 +164,16 @@ impl TcpStream {
/// Creates new `TcpStream` from a `std::net::TcpStream`.
///
/// This function is intended to be used to wrap a TCP stream from the
- /// standard library in the Tokio equivalent. The conversion assumes nothing
- /// about the underlying stream; it is left up to the user to set it in
- /// non-blocking mode.
+ /// standard library in the Tokio equivalent.
+ ///
+ /// # Notes
+ ///
+ /// The caller is responsible for ensuring that the stream is in
+ /// non-blocking mode. Otherwise all I/O operations on the stream
+ /// will block the thread, which will cause unexpected behavior.
+ /// Non-blocking mode can be set using [`set_nonblocking`].
+ ///
+ /// [`set_nonblocking`]: std::net::TcpStream::set_nonblocking
///
/// # Examples
///
@@ -181,18 +192,20 @@ impl TcpStream {
///
/// # Panics
///
- /// This function panics if thread-local runtime is not set.
+ /// This function panics if it is not called from within a runtime with
+ /// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
+ #[track_caller]
pub fn from_std(stream: std::net::TcpStream) -> io::Result<TcpStream> {
let io = mio::net::TcpStream::from_std(stream);
let io = PollEvented::new(io)?;
Ok(TcpStream { io })
}
- /// Turn a [`tokio::net::TcpStream`] into a [`std::net::TcpStream`].
+ /// Turns a [`tokio::net::TcpStream`] into a [`std::net::TcpStream`].
///
/// The returned [`std::net::TcpStream`] will have nonblocking mode set as `true`.
/// Use [`set_nonblocking`] to change the blocking mode if needed.
@@ -244,6 +257,15 @@ impl TcpStream {
.map(|io| io.into_raw_socket())
.map(|raw_socket| unsafe { std::net::TcpStream::from_raw_socket(raw_socket) })
}
+
+ #[cfg(tokio_wasi)]
+ {
+ use std::os::wasi::io::{FromRawFd, IntoRawFd};
+ self.io
+ .into_inner()
+ .map(|io| io.into_raw_fd())
+ .map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) })
+ }
}
/// Returns the local address that this stream is bound to.
@@ -264,6 +286,11 @@ impl TcpStream {
self.io.local_addr()
}
+ /// Returns the value of the `SO_ERROR` option.
+ pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+ self.io.take_error()
+ }
+
/// Returns the remote address that this stream is connected to.
///
/// # Examples
@@ -350,12 +377,18 @@ impl TcpStream {
}
}
- /// Wait for any of the requested ready states.
+ /// Waits for any of the requested ready states.
///
/// This function is usually paired with `try_read()` or `try_write()`. It
/// can be used to concurrently read / write to the same socket on a single
/// task without splitting the socket.
///
+ /// The function may complete without the socket being ready. This is a
+ /// false-positive and attempting an operation will return with
+ /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
+ /// [`Ready`] set, so you should always check the returned value and possibly
+ /// wait again if the requested states are not set.
+ ///
/// # Cancel safety
///
/// This method is cancel safe. Once a readiness event occurs, the method
@@ -387,7 +420,7 @@ impl TcpStream {
/// // if the readiness event is a false positive.
/// match stream.try_read(&mut data) {
/// Ok(n) => {
- /// println!("read {} bytes", n);
+ /// println!("read {} bytes", n);
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
@@ -422,7 +455,7 @@ impl TcpStream {
Ok(event.ready)
}
- /// Wait for the socket to become readable.
+ /// Waits for the socket to become readable.
///
/// This function is equivalent to `ready(Interest::READABLE)` and is usually
/// paired with `try_read()`.
@@ -510,7 +543,7 @@ impl TcpStream {
self.io.registration().poll_read_ready(cx).map_ok(|_| ())
}
- /// Try to read data from the stream into the provided buffer, returning how
+ /// Tries to read data from the stream into the provided buffer, returning how
/// many bytes were read.
///
/// Receives any pending data from the socket but does not wait for new data
@@ -526,8 +559,12 @@ impl TcpStream {
/// # Return
///
/// If data is successfully read, `Ok(n)` is returned, where `n` is the
- /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
- /// and will no longer yield data. If the stream is not ready to read data
+ /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
+ ///
+ /// 1. The stream's read half is closed and will no longer yield data.
+ /// 2. The specified buffer was 0 bytes in length.
+ ///
+ /// If the stream is not ready to read data,
/// `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
@@ -577,7 +614,7 @@ impl TcpStream {
.try_io(Interest::READABLE, || (&*self.io).read(buf))
}
- /// Try to read data from the stream into the provided buffers, returning
+ /// Tries to read data from the stream into the provided buffers, returning
/// how many bytes were read.
///
/// Data is copied to fill each buffer in order, with the final buffer
@@ -656,7 +693,7 @@ impl TcpStream {
}
cfg_io_util! {
- /// Try to read data from the stream into the provided buffer, advancing the
+ /// Tries to read data from the stream into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
///
/// Receives any pending data from the socket but does not wait for new data
@@ -734,7 +771,7 @@ impl TcpStream {
}
}
- /// Wait for the socket to become writable.
+ /// Waits for the socket to become writable.
///
/// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
/// paired with `try_write()`.
@@ -874,7 +911,7 @@ impl TcpStream {
.try_io(Interest::WRITABLE, || (&*self.io).write(buf))
}
- /// Try to write several buffers to the stream, returning how many bytes
+ /// Tries to write several buffers to the stream, returning how many bytes
/// were written.
///
/// Data is written from each buffer in order, with the final buffer read
@@ -936,6 +973,84 @@ impl TcpStream {
.try_io(Interest::WRITABLE, || (&*self.io).write_vectored(bufs))
}
+ /// Tries to read or write from the socket using a user-provided IO operation.
+ ///
+ /// If the socket is ready, the provided closure is called. The closure
+ /// should attempt to perform IO operation on the socket by manually
+ /// calling the appropriate syscall. If the operation fails because the
+ /// socket is not actually ready, then the closure should return a
+ /// `WouldBlock` error and the readiness flag is cleared. The return value
+ /// of the closure is then returned by `try_io`.
+ ///
+ /// If the socket is not ready, then the closure is not called
+ /// and a `WouldBlock` error is returned.
+ ///
+ /// The closure should only return a `WouldBlock` error if it has performed
+ /// an IO operation on the socket that failed due to the socket not being
+ /// ready. Returning a `WouldBlock` error in any other situation will
+ /// incorrectly clear the readiness flag, which can cause the socket to
+ /// behave incorrectly.
+ ///
+ /// The closure should not perform the IO operation using any of the methods
+ /// defined on the Tokio `TcpStream` type, as this will mess with the
+ /// readiness flag and can cause the socket to behave incorrectly.
+ ///
+ /// This method is not intended to be used with combined interests.
+ /// The closure should perform only one type of IO operation, so it should not
+ /// require more than one ready state. This method may panic or sleep forever
+ /// if it is called with a combined interest.
+ ///
+ /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: TcpStream::readable()
+ /// [`writable()`]: TcpStream::writable()
+ /// [`ready()`]: TcpStream::ready()
+ pub fn try_io<R>(
+ &self,
+ interest: Interest,
+ f: impl FnOnce() -> io::Result<R>,
+ ) -> io::Result<R> {
+ self.io
+ .registration()
+ .try_io(interest, || self.io.try_io(f))
+ }
+
+ /// Reads or writes from the socket using a user-provided IO operation.
+ ///
+ /// The readiness of the socket is awaited and when the socket is ready,
+ /// the provided closure is called. The closure should attempt to perform
+ /// IO operation on the socket by manually calling the appropriate syscall.
+ /// If the operation fails because the socket is not actually ready,
+ /// then the closure should return a `WouldBlock` error. In such case the
+ /// readiness flag is cleared and the socket readiness is awaited again.
+ /// This loop is repeated until the closure returns an `Ok` or an error
+ /// other than `WouldBlock`.
+ ///
+ /// The closure should only return a `WouldBlock` error if it has performed
+ /// an IO operation on the socket that failed due to the socket not being
+ /// ready. Returning a `WouldBlock` error in any other situation will
+ /// incorrectly clear the readiness flag, which can cause the socket to
+ /// behave incorrectly.
+ ///
+ /// The closure should not perform the IO operation using any of the methods
+ /// defined on the Tokio `TcpStream` type, as this will mess with the
+ /// readiness flag and can cause the socket to behave incorrectly.
+ ///
+ /// This method is not intended to be used with combined interests.
+ /// The closure should perform only one type of IO operation, so it should not
+ /// require more than one ready state. This method may panic or sleep forever
+ /// if it is called with a combined interest.
+ pub async fn async_io<R>(
+ &self,
+ interest: Interest,
+ mut f: impl FnMut() -> io::Result<R>,
+ ) -> io::Result<R> {
+ self.io
+ .registration()
+ .async_io(interest, || self.io.try_io(&mut f))
+ .await
+ }
+
/// Receives data on the socket from the remote address to which it is
/// connected, without removing that data from the queue. On success,
/// returns the number of bytes peeked.
@@ -1035,69 +1150,53 @@ impl TcpStream {
self.io.set_nodelay(nodelay)
}
- /// Reads the linger duration for this socket by getting the `SO_LINGER`
- /// option.
- ///
- /// For more information about this option, see [`set_linger`].
- ///
- /// [`set_linger`]: TcpStream::set_linger
- ///
- /// # Examples
- ///
- /// ```no_run
- /// use tokio::net::TcpStream;
- ///
- /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
- /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
- ///
- /// println!("{:?}", stream.linger()?);
- /// # Ok(())
- /// # }
- /// ```
- pub fn linger(&self) -> io::Result<Option<Duration>> {
- let mio_socket = std::mem::ManuallyDrop::new(self.to_mio());
-
- mio_socket.get_linger()
- }
-
- /// Sets the linger duration of this socket by setting the SO_LINGER option.
- ///
- /// This option controls the action taken when a stream has unsent messages and the stream is
- /// closed. If SO_LINGER is set, the system shall block the process until it can transmit the
- /// data or until the time expires.
- ///
- /// If SO_LINGER is not specified, and the stream is closed, the system handles the call in a
- /// way that allows the process to continue as quickly as possible.
- ///
- /// # Examples
- ///
- /// ```no_run
- /// use tokio::net::TcpStream;
- ///
- /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
- /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
- ///
- /// stream.set_linger(None)?;
- /// # Ok(())
- /// # }
- /// ```
- pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
- let mio_socket = std::mem::ManuallyDrop::new(self.to_mio());
-
- mio_socket.set_linger(dur)
- }
-
- fn to_mio(&self) -> mio::net::TcpSocket {
- #[cfg(windows)]
- {
- use std::os::windows::io::{AsRawSocket, FromRawSocket};
- unsafe { mio::net::TcpSocket::from_raw_socket(self.as_raw_socket()) }
+ cfg_not_wasi! {
+ /// Reads the linger duration for this socket by getting the `SO_LINGER`
+ /// option.
+ ///
+ /// For more information about this option, see [`set_linger`].
+ ///
+ /// [`set_linger`]: TcpStream::set_linger
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpStream;
+ ///
+ /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+ /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
+ ///
+ /// println!("{:?}", stream.linger()?);
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn linger(&self) -> io::Result<Option<Duration>> {
+ socket2::SockRef::from(self).linger()
}
- #[cfg(unix)]
- {
- use std::os::unix::io::{AsRawFd, FromRawFd};
- unsafe { mio::net::TcpSocket::from_raw_fd(self.as_raw_fd()) }
+ /// Sets the linger duration of this socket by setting the SO_LINGER option.
+ ///
+ /// This option controls the action taken when a stream has unsent messages and the stream is
+ /// closed. If SO_LINGER is set, the system shall block the process until it can transmit the
+ /// data or until the time expires.
+ ///
+ /// If SO_LINGER is not specified, and the stream is closed, the system handles the call in a
+ /// way that allows the process to continue as quickly as possible.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpStream;
+ ///
+ /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+ /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
+ ///
+ /// stream.set_linger(None)?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
+ socket2::SockRef::from(self).set_linger(dur)
}
}
@@ -1278,16 +1377,49 @@ mod sys {
self.io.as_raw_fd()
}
}
+
+ #[cfg(not(tokio_no_as_fd))]
+ impl AsFd for TcpStream {
+ fn as_fd(&self) -> BorrowedFd<'_> {
+ unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
+ }
+ }
}
-#[cfg(windows)]
-mod sys {
- use super::TcpStream;
- use std::os::windows::prelude::*;
+cfg_windows! {
+ use crate::os::windows::io::{AsRawSocket, RawSocket};
+ #[cfg(not(tokio_no_as_fd))]
+ use crate::os::windows::io::{AsSocket, BorrowedSocket};
impl AsRawSocket for TcpStream {
fn as_raw_socket(&self) -> RawSocket {
self.io.as_raw_socket()
}
}
+
+ #[cfg(not(tokio_no_as_fd))]
+ impl AsSocket for TcpStream {
+ fn as_socket(&self) -> BorrowedSocket<'_> {
+ unsafe { BorrowedSocket::borrow_raw(self.as_raw_socket()) }
+ }
+ }
+}
+
+#[cfg(all(tokio_unstable, tokio_wasi))]
+mod sys {
+ use super::TcpStream;
+ use std::os::wasi::prelude::*;
+
+ impl AsRawFd for TcpStream {
+ fn as_raw_fd(&self) -> RawFd {
+ self.io.as_raw_fd()
+ }
+ }
+
+ #[cfg(not(tokio_no_as_fd))]
+ impl AsFd for TcpStream {
+ fn as_fd(&self) -> BorrowedFd<'_> {
+ unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
+ }
+ }
}