summaryrefslogtreecommitdiffstats
path: root/vendor/tokio/src/net/tcp
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/tokio/src/net/tcp')
-rw-r--r--vendor/tokio/src/net/tcp/listener.rs201
-rw-r--r--vendor/tokio/src/net/tcp/mod.rs6
-rw-r--r--vendor/tokio/src/net/tcp/socket.rs364
-rw-r--r--vendor/tokio/src/net/tcp/split.rs237
-rw-r--r--vendor/tokio/src/net/tcp/split_owned.rs239
-rw-r--r--vendor/tokio/src/net/tcp/stream.rs440
6 files changed, 1201 insertions, 286 deletions
diff --git a/vendor/tokio/src/net/tcp/listener.rs b/vendor/tokio/src/net/tcp/listener.rs
index 86f0ec1d2..34e393c89 100644
--- a/vendor/tokio/src/net/tcp/listener.rs
+++ b/vendor/tokio/src/net/tcp/listener.rs
@@ -1,8 +1,10 @@
use crate::io::{Interest, PollEvented};
use crate::net::tcp::TcpStream;
-use crate::net::{to_socket_addrs, ToSocketAddrs};
-use std::convert::TryFrom;
+cfg_not_wasi! {
+ use crate::net::{to_socket_addrs, ToSocketAddrs};
+}
+
use std::fmt;
use std::io;
use std::net::{self, SocketAddr};
@@ -55,68 +57,70 @@ cfg_net! {
}
impl TcpListener {
- /// Creates a new TcpListener, which will be bound to the specified address.
- ///
- /// The returned listener is ready for accepting connections.
- ///
- /// Binding with a port number of 0 will request that the OS assigns a port
- /// to this listener. The port allocated can be queried via the `local_addr`
- /// method.
- ///
- /// The address type can be any implementor of the [`ToSocketAddrs`] trait.
- /// If `addr` yields multiple addresses, bind will be attempted with each of
- /// the addresses until one succeeds and returns the listener. If none of
- /// the addresses succeed in creating a listener, the error returned from
- /// the last attempt (the last address) is returned.
- ///
- /// This function sets the `SO_REUSEADDR` option on the socket.
- ///
- /// To configure the socket before binding, you can use the [`TcpSocket`]
- /// type.
- ///
- /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
- /// [`TcpSocket`]: struct@crate::net::TcpSocket
- ///
- /// # Examples
- ///
- /// ```no_run
- /// use tokio::net::TcpListener;
- ///
- /// use std::io;
- ///
- /// #[tokio::main]
- /// async fn main() -> io::Result<()> {
- /// let listener = TcpListener::bind("127.0.0.1:2345").await?;
- ///
- /// // use the listener
- ///
- /// # let _ = listener;
- /// Ok(())
- /// }
- /// ```
- pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
- let addrs = to_socket_addrs(addr).await?;
+ cfg_not_wasi! {
+ /// Creates a new TcpListener, which will be bound to the specified address.
+ ///
+ /// The returned listener is ready for accepting connections.
+ ///
+ /// Binding with a port number of 0 will request that the OS assigns a port
+ /// to this listener. The port allocated can be queried via the `local_addr`
+ /// method.
+ ///
+ /// The address type can be any implementor of the [`ToSocketAddrs`] trait.
+ /// If `addr` yields multiple addresses, bind will be attempted with each of
+ /// the addresses until one succeeds and returns the listener. If none of
+ /// the addresses succeed in creating a listener, the error returned from
+ /// the last attempt (the last address) is returned.
+ ///
+ /// This function sets the `SO_REUSEADDR` option on the socket.
+ ///
+ /// To configure the socket before binding, you can use the [`TcpSocket`]
+ /// type.
+ ///
+ /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
+ /// [`TcpSocket`]: struct@crate::net::TcpSocket
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpListener;
+ ///
+ /// use std::io;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> io::Result<()> {
+ /// let listener = TcpListener::bind("127.0.0.1:2345").await?;
+ ///
+ /// // use the listener
+ ///
+ /// # let _ = listener;
+ /// Ok(())
+ /// }
+ /// ```
+ pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {
+ let addrs = to_socket_addrs(addr).await?;
- let mut last_err = None;
+ let mut last_err = None;
- for addr in addrs {
- match TcpListener::bind_addr(addr) {
- Ok(listener) => return Ok(listener),
- Err(e) => last_err = Some(e),
+ for addr in addrs {
+ match TcpListener::bind_addr(addr) {
+ Ok(listener) => return Ok(listener),
+ Err(e) => last_err = Some(e),
+ }
}
- }
- Err(last_err.unwrap_or_else(|| {
- io::Error::new(
- io::ErrorKind::InvalidInput,
- "could not resolve to any address",
- )
- }))
- }
+ Err(last_err.unwrap_or_else(|| {
+ io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "could not resolve to any address",
+ )
+ }))
+ }
- fn bind_addr(addr: SocketAddr) -> io::Result<TcpListener> {
- let listener = mio::net::TcpListener::bind(addr)?;
- TcpListener::new(listener)
+ fn bind_addr(addr: SocketAddr) -> io::Result<TcpListener> {
+ let listener = mio::net::TcpListener::bind(addr)?;
+ TcpListener::new(listener)
+ }
}
/// Accepts a new incoming connection from this listener.
@@ -190,15 +194,22 @@ impl TcpListener {
/// Creates new `TcpListener` from a `std::net::TcpListener`.
///
/// This function is intended to be used to wrap a TCP listener from the
- /// standard library in the Tokio equivalent. The conversion assumes nothing
- /// about the underlying listener; it is left up to the user to set it in
- /// non-blocking mode.
+ /// standard library in the Tokio equivalent.
///
/// This API is typically paired with the `socket2` crate and the `Socket`
/// type to build up and customize a listener before it's shipped off to the
/// backing event loop. This allows configuration of options like
/// `SO_REUSEPORT`, binding to multiple addresses, etc.
///
+ /// # Notes
+ ///
+ /// The caller is responsible for ensuring that the listener is in
+ /// non-blocking mode. Otherwise all I/O operations on the listener
+ /// will block the thread, which will cause unexpected behavior.
+ /// Non-blocking mode can be set using [`set_nonblocking`].
+ ///
+ /// [`set_nonblocking`]: std::net::TcpListener::set_nonblocking
+ ///
/// # Examples
///
/// ```rust,no_run
@@ -216,18 +227,20 @@ impl TcpListener {
///
/// # Panics
///
- /// This function panics if thread-local runtime is not set.
+ /// This function panics if it is not called from within a runtime with
+ /// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
+ #[track_caller]
pub fn from_std(listener: net::TcpListener) -> io::Result<TcpListener> {
let io = mio::net::TcpListener::from_std(listener);
let io = PollEvented::new(io)?;
Ok(TcpListener { io })
}
- /// Turn a [`tokio::net::TcpListener`] into a [`std::net::TcpListener`].
+ /// Turns a [`tokio::net::TcpListener`] into a [`std::net::TcpListener`].
///
/// The returned [`std::net::TcpListener`] will have nonblocking mode set as
/// `true`. Use [`set_nonblocking`] to change the blocking mode if needed.
@@ -267,11 +280,22 @@ impl TcpListener {
.map(|io| io.into_raw_socket())
.map(|raw_socket| unsafe { std::net::TcpListener::from_raw_socket(raw_socket) })
}
+
+ #[cfg(tokio_wasi)]
+ {
+ use std::os::wasi::io::{FromRawFd, IntoRawFd};
+ self.io
+ .into_inner()
+ .map(|io| io.into_raw_fd())
+ .map(|raw_fd| unsafe { std::net::TcpListener::from_raw_fd(raw_fd) })
+ }
}
- pub(crate) fn new(listener: mio::net::TcpListener) -> io::Result<TcpListener> {
- let io = PollEvented::new(listener)?;
- Ok(TcpListener { io })
+ cfg_not_wasi! {
+ pub(crate) fn new(listener: mio::net::TcpListener) -> io::Result<TcpListener> {
+ let io = PollEvented::new(listener)?;
+ Ok(TcpListener { io })
+ }
}
/// Returns the local address that this listener is bound to.
@@ -382,16 +406,51 @@ mod sys {
self.io.as_raw_fd()
}
}
+
+ #[cfg(not(tokio_no_as_fd))]
+ impl AsFd for TcpListener {
+ fn as_fd(&self) -> BorrowedFd<'_> {
+ unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
+ }
+ }
}
-#[cfg(windows)]
-mod sys {
- use super::TcpListener;
- use std::os::windows::prelude::*;
+cfg_unstable! {
+ #[cfg(tokio_wasi)]
+ mod sys {
+ use super::TcpListener;
+ use std::os::wasi::prelude::*;
+
+ impl AsRawFd for TcpListener {
+ fn as_raw_fd(&self) -> RawFd {
+ self.io.as_raw_fd()
+ }
+ }
+
+ #[cfg(not(tokio_no_as_fd))]
+ impl AsFd for TcpListener {
+ fn as_fd(&self) -> BorrowedFd<'_> {
+ unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
+ }
+ }
+ }
+}
+
+cfg_windows! {
+ use crate::os::windows::io::{AsRawSocket, RawSocket};
+ #[cfg(not(tokio_no_as_fd))]
+ use crate::os::windows::io::{AsSocket, BorrowedSocket};
impl AsRawSocket for TcpListener {
fn as_raw_socket(&self) -> RawSocket {
self.io.as_raw_socket()
}
}
+
+ #[cfg(not(tokio_no_as_fd))]
+ impl AsSocket for TcpListener {
+ fn as_socket(&self) -> BorrowedSocket<'_> {
+ unsafe { BorrowedSocket::borrow_raw(self.as_raw_socket()) }
+ }
+ }
}
diff --git a/vendor/tokio/src/net/tcp/mod.rs b/vendor/tokio/src/net/tcp/mod.rs
index 7f0f6d914..734eabe6d 100644
--- a/vendor/tokio/src/net/tcp/mod.rs
+++ b/vendor/tokio/src/net/tcp/mod.rs
@@ -1,8 +1,10 @@
-//! TCP utility types
+//! TCP utility types.
pub(crate) mod listener;
-pub(crate) mod socket;
+cfg_not_wasi! {
+ pub(crate) mod socket;
+}
mod split;
pub use split::{ReadHalf, WriteHalf};
diff --git a/vendor/tokio/src/net/tcp/socket.rs b/vendor/tokio/src/net/tcp/socket.rs
index 02cb6377e..df792f9a6 100644
--- a/vendor/tokio/src/net/tcp/socket.rs
+++ b/vendor/tokio/src/net/tcp/socket.rs
@@ -4,10 +4,17 @@ use std::fmt;
use std::io;
use std::net::SocketAddr;
+#[cfg(all(unix, not(tokio_no_as_fd)))]
+use std::os::unix::io::{AsFd, BorrowedFd};
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
-#[cfg(windows)]
-use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket};
+use std::time::Duration;
+
+cfg_windows! {
+ use crate::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket};
+ #[cfg(not(tokio_no_as_fd))]
+ use crate::os::windows::io::{AsSocket, BorrowedSocket};
+}
cfg_net! {
/// A TCP socket that has not yet been converted to a `TcpStream` or
@@ -81,13 +88,14 @@ cfg_net! {
/// [`AsRawFd`]: https://doc.rust-lang.org/std/os/unix/io/trait.AsRawFd.html
/// [`AsRawSocket`]: https://doc.rust-lang.org/std/os/windows/io/trait.AsRawSocket.html
/// [`socket2`]: https://docs.rs/socket2/
+ #[cfg_attr(docsrs, doc(alias = "connect_std"))]
pub struct TcpSocket {
- inner: mio::net::TcpSocket,
+ inner: socket2::Socket,
}
}
impl TcpSocket {
- /// Create a new socket configured for IPv4.
+ /// Creates a new socket configured for IPv4.
///
/// Calls `socket(2)` with `AF_INET` and `SOCK_STREAM`.
///
@@ -117,11 +125,10 @@ impl TcpSocket {
/// }
/// ```
pub fn new_v4() -> io::Result<TcpSocket> {
- let inner = mio::net::TcpSocket::new_v4()?;
- Ok(TcpSocket { inner })
+ TcpSocket::new(socket2::Domain::IPV4)
}
- /// Create a new socket configured for IPv6.
+ /// Creates a new socket configured for IPv6.
///
/// Calls `socket(2)` with `AF_INET6` and `SOCK_STREAM`.
///
@@ -151,11 +158,38 @@ impl TcpSocket {
/// }
/// ```
pub fn new_v6() -> io::Result<TcpSocket> {
- let inner = mio::net::TcpSocket::new_v6()?;
+ TcpSocket::new(socket2::Domain::IPV6)
+ }
+
+ fn new(domain: socket2::Domain) -> io::Result<TcpSocket> {
+ let ty = socket2::Type::STREAM;
+ #[cfg(any(
+ target_os = "android",
+ target_os = "dragonfly",
+ target_os = "freebsd",
+ target_os = "fuchsia",
+ target_os = "illumos",
+ target_os = "linux",
+ target_os = "netbsd",
+ target_os = "openbsd"
+ ))]
+ let ty = ty.nonblocking();
+ let inner = socket2::Socket::new(domain, ty, Some(socket2::Protocol::TCP))?;
+ #[cfg(not(any(
+ target_os = "android",
+ target_os = "dragonfly",
+ target_os = "freebsd",
+ target_os = "fuchsia",
+ target_os = "illumos",
+ target_os = "linux",
+ target_os = "netbsd",
+ target_os = "openbsd"
+ )))]
+ inner.set_nonblocking(true)?;
Ok(TcpSocket { inner })
}
- /// Allow the socket to bind to an in-use address.
+ /// Allows the socket to bind to an in-use address.
///
/// Behavior is platform specific. Refer to the target platform's
/// documentation for more details.
@@ -182,10 +216,10 @@ impl TcpSocket {
/// }
/// ```
pub fn set_reuseaddr(&self, reuseaddr: bool) -> io::Result<()> {
- self.inner.set_reuseaddr(reuseaddr)
+ self.inner.set_reuse_address(reuseaddr)
}
- /// Retrieves the value set for `SO_REUSEADDR` on this socket
+ /// Retrieves the value set for `SO_REUSEADDR` on this socket.
///
/// # Examples
///
@@ -208,10 +242,10 @@ impl TcpSocket {
/// }
/// ```
pub fn reuseaddr(&self) -> io::Result<bool> {
- self.inner.get_reuseaddr()
+ self.inner.reuse_address()
}
- /// Allow the socket to bind to an in-use port. Only available for unix systems
+ /// Allows the socket to bind to an in-use port. Only available for unix systems
/// (excluding Solaris & Illumos).
///
/// Behavior is platform specific. Refer to the target platform's
@@ -242,10 +276,10 @@ impl TcpSocket {
doc(cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos"))))
)]
pub fn set_reuseport(&self, reuseport: bool) -> io::Result<()> {
- self.inner.set_reuseport(reuseport)
+ self.inner.set_reuse_port(reuseport)
}
- /// Allow the socket to bind to an in-use port. Only available for unix systems
+ /// Allows the socket to bind to an in-use port. Only available for unix systems
/// (excluding Solaris & Illumos).
///
/// Behavior is platform specific. Refer to the target platform's
@@ -277,14 +311,14 @@ impl TcpSocket {
doc(cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos"))))
)]
pub fn reuseport(&self) -> io::Result<bool> {
- self.inner.get_reuseport()
+ self.inner.reuse_port()
}
/// Sets the size of the TCP send buffer on this socket.
///
/// On most operating systems, this sets the `SO_SNDBUF` socket option.
pub fn set_send_buffer_size(&self, size: u32) -> io::Result<()> {
- self.inner.set_send_buffer_size(size)
+ self.inner.set_send_buffer_size(size as usize)
}
/// Returns the size of the TCP send buffer for this socket.
@@ -311,14 +345,14 @@ impl TcpSocket {
///
/// [`set_send_buffer_size`]: #method.set_send_buffer_size
pub fn send_buffer_size(&self) -> io::Result<u32> {
- self.inner.get_send_buffer_size()
+ self.inner.send_buffer_size().map(|n| n as u32)
}
/// Sets the size of the TCP receive buffer on this socket.
///
/// On most operating systems, this sets the `SO_RCVBUF` socket option.
pub fn set_recv_buffer_size(&self, size: u32) -> io::Result<()> {
- self.inner.set_recv_buffer_size(size)
+ self.inner.set_recv_buffer_size(size as usize)
}
/// Returns the size of the TCP receive buffer for this socket.
@@ -345,10 +379,160 @@ impl TcpSocket {
///
/// [`set_recv_buffer_size`]: #method.set_recv_buffer_size
pub fn recv_buffer_size(&self) -> io::Result<u32> {
- self.inner.get_recv_buffer_size()
+ self.inner.recv_buffer_size().map(|n| n as u32)
+ }
+
+ /// Sets the linger duration of this socket by setting the SO_LINGER option.
+ ///
+ /// This option controls the action taken when a stream has unsent messages and the stream is
+ /// closed. If SO_LINGER is set, the system shall block the process until it can transmit the
+ /// data or until the time expires.
+ ///
+ /// If SO_LINGER is not specified, and the socket is closed, the system handles the call in a
+ /// way that allows the process to continue as quickly as possible.
+ pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
+ self.inner.set_linger(dur)
+ }
+
+ /// Reads the linger duration for this socket by getting the `SO_LINGER`
+ /// option.
+ ///
+ /// For more information about this option, see [`set_linger`].
+ ///
+ /// [`set_linger`]: TcpSocket::set_linger
+ pub fn linger(&self) -> io::Result<Option<Duration>> {
+ self.inner.linger()
+ }
+
+ /// Sets the value of the `TCP_NODELAY` option on this socket.
+ ///
+ /// If set, this option disables the Nagle algorithm. This means that segments are always
+ /// sent as soon as possible, even if there is only a small amount of data. When not set,
+ /// data is buffered until there is a sufficient amount to send out, thereby avoiding
+ /// the frequent sending of small packets.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpSocket;
+ ///
+ /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+ /// let socket = TcpSocket::new_v4()?;
+ ///
+ /// println!("{:?}", socket.nodelay()?);
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
+ self.inner.set_nodelay(nodelay)
+ }
+
+ /// Gets the value of the `TCP_NODELAY` option on this socket.
+ ///
+ /// For more information about this option, see [`set_nodelay`].
+ ///
+ /// [`set_nodelay`]: TcpSocket::set_nodelay
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpSocket;
+ ///
+ /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+ /// let stream = TcpSocket::new_v4()?;
+ ///
+ /// stream.set_nodelay(true)?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn nodelay(&self) -> io::Result<bool> {
+ self.inner.nodelay()
}
- /// Get the local address of this socket.
+ /// Gets the value of the `IP_TOS` option for this socket.
+ ///
+ /// For more information about this option, see [`set_tos`].
+ ///
+ /// **NOTE:** On Windows, `IP_TOS` is only supported on [Windows 8+ or
+ /// Windows Server 2012+.](https://docs.microsoft.com/en-us/windows/win32/winsock/ipproto-ip-socket-options)
+ ///
+ /// [`set_tos`]: Self::set_tos
+ // https://docs.rs/socket2/0.4.2/src/socket2/socket.rs.html#1178
+ #[cfg(not(any(
+ target_os = "fuchsia",
+ target_os = "redox",
+ target_os = "solaris",
+ target_os = "illumos",
+ )))]
+ #[cfg_attr(
+ docsrs,
+ doc(cfg(not(any(
+ target_os = "fuchsia",
+ target_os = "redox",
+ target_os = "solaris",
+ target_os = "illumos",
+ ))))
+ )]
+ pub fn tos(&self) -> io::Result<u32> {
+ self.inner.tos()
+ }
+
+ /// Sets the value for the `IP_TOS` option on this socket.
+ ///
+ /// This value sets the type-of-service field that is used in every packet
+ /// sent from this socket.
+ ///
+ /// **NOTE:** On Windows, `IP_TOS` is only supported on [Windows 8+ or
+ /// Windows Server 2012+.](https://docs.microsoft.com/en-us/windows/win32/winsock/ipproto-ip-socket-options)
+ // https://docs.rs/socket2/0.4.2/src/socket2/socket.rs.html#1178
+ #[cfg(not(any(
+ target_os = "fuchsia",
+ target_os = "redox",
+ target_os = "solaris",
+ target_os = "illumos",
+ )))]
+ #[cfg_attr(
+ docsrs,
+ doc(cfg(not(any(
+ target_os = "fuchsia",
+ target_os = "redox",
+ target_os = "solaris",
+ target_os = "illumos",
+ ))))
+ )]
+ pub fn set_tos(&self, tos: u32) -> io::Result<()> {
+ self.inner.set_tos(tos)
+ }
+
+ /// Gets the value for the `SO_BINDTODEVICE` option on this socket
+ ///
+ /// This value gets the socket binded device's interface name.
+ #[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux",))]
+ #[cfg_attr(
+ docsrs,
+ doc(cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux",)))
+ )]
+ pub fn device(&self) -> io::Result<Option<Vec<u8>>> {
+ self.inner.device()
+ }
+
+ /// Sets the value for the `SO_BINDTODEVICE` option on this socket
+ ///
+ /// If a socket is bound to an interface, only packets received from that
+ /// particular interface are processed by the socket. Note that this only
+ /// works for some socket types, particularly `AF_INET` sockets.
+ ///
+ /// If `interface` is `None` or an empty string it removes the binding.
+ #[cfg(all(any(target_os = "android", target_os = "fuchsia", target_os = "linux")))]
+ #[cfg_attr(
+ docsrs,
+ doc(cfg(all(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))))
+ )]
+ pub fn bind_device(&self, interface: Option<&[u8]>) -> io::Result<()> {
+ self.inner.bind_device(interface)
+ }
+
+ /// Gets the local address of this socket.
///
/// Will fail on windows if called before `bind`.
///
@@ -371,10 +555,15 @@ impl TcpSocket {
/// }
/// ```
pub fn local_addr(&self) -> io::Result<SocketAddr> {
- self.inner.get_localaddr()
+ self.inner.local_addr().and_then(convert_address)
+ }
+
+ /// Returns the value of the `SO_ERROR` option.
+ pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+ self.inner.take_error()
}
- /// Bind the socket to the given address.
+ /// Binds the socket to the given address.
///
/// This calls the `bind(2)` operating-system function. Behavior is
/// platform specific. Refer to the target platform's documentation for more
@@ -403,10 +592,10 @@ impl TcpSocket {
/// }
/// ```
pub fn bind(&self, addr: SocketAddr) -> io::Result<()> {
- self.inner.bind(addr)
+ self.inner.bind(&addr.into())
}
- /// Establish a TCP connection with a peer at the specified socket address.
+ /// Establishes a TCP connection with a peer at the specified socket address.
///
/// The `TcpSocket` is consumed. Once the connection is established, a
/// connected [`TcpStream`] is returned. If the connection fails, the
@@ -439,11 +628,36 @@ impl TcpSocket {
/// }
/// ```
pub async fn connect(self, addr: SocketAddr) -> io::Result<TcpStream> {
- let mio = self.inner.connect(addr)?;
+ if let Err(err) = self.inner.connect(&addr.into()) {
+ #[cfg(unix)]
+ if err.raw_os_error() != Some(libc::EINPROGRESS) {
+ return Err(err);
+ }
+ #[cfg(windows)]
+ if err.kind() != io::ErrorKind::WouldBlock {
+ return Err(err);
+ }
+ }
+ #[cfg(unix)]
+ let mio = {
+ use std::os::unix::io::{FromRawFd, IntoRawFd};
+
+ let raw_fd = self.inner.into_raw_fd();
+ unsafe { mio::net::TcpStream::from_raw_fd(raw_fd) }
+ };
+
+ #[cfg(windows)]
+ let mio = {
+ use std::os::windows::io::{FromRawSocket, IntoRawSocket};
+
+ let raw_socket = self.inner.into_raw_socket();
+ unsafe { mio::net::TcpStream::from_raw_socket(raw_socket) }
+ };
+
TcpStream::connect_mio(mio).await
}
- /// Convert the socket into a `TcpListener`.
+ /// Converts the socket into a `TcpListener`.
///
/// `backlog` defines the maximum number of pending connections are queued
/// by the operating system at any given time. Connection are removed from
@@ -479,7 +693,23 @@ impl TcpSocket {
/// }
/// ```
pub fn listen(self, backlog: u32) -> io::Result<TcpListener> {
- let mio = self.inner.listen(backlog)?;
+ self.inner.listen(backlog as i32)?;
+ #[cfg(unix)]
+ let mio = {
+ use std::os::unix::io::{FromRawFd, IntoRawFd};
+
+ let raw_fd = self.inner.into_raw_fd();
+ unsafe { mio::net::TcpListener::from_raw_fd(raw_fd) }
+ };
+
+ #[cfg(windows)]
+ let mio = {
+ use std::os::windows::io::{FromRawSocket, IntoRawSocket};
+
+ let raw_socket = self.inner.into_raw_socket();
+ unsafe { mio::net::TcpListener::from_raw_socket(raw_socket) }
+ };
+
TcpListener::new(mio)
}
@@ -491,6 +721,15 @@ impl TcpSocket {
/// [`std::net::TcpStream`]: struct@std::net::TcpStream
/// [`socket2`]: https://docs.rs/socket2/
///
+ /// # Notes
+ ///
+ /// The caller is responsible for ensuring that the socket is in
+ /// non-blocking mode. Otherwise all I/O operations on the socket
+ /// will block the thread, which will cause unexpected behavior.
+ /// Non-blocking mode can be set using [`set_nonblocking`].
+ ///
+ /// [`set_nonblocking`]: std::net::TcpStream::set_nonblocking
+ ///
/// # Examples
///
/// ```
@@ -499,8 +738,8 @@ impl TcpSocket {
///
/// #[tokio::main]
/// async fn main() -> std::io::Result<()> {
- ///
/// let socket2_socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
+ /// socket2_socket.set_nonblocking(true)?;
///
/// let socket = TcpSocket::from_std_stream(socket2_socket.into());
///
@@ -526,6 +765,16 @@ impl TcpSocket {
}
}
+fn convert_address(address: socket2::SockAddr) -> io::Result<SocketAddr> {
+ match address.as_socket() {
+ Some(address) => Ok(address),
+ None => Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "invalid address family (not IPv4 or IPv6)",
+ )),
+ }
+}
+
impl fmt::Debug for TcpSocket {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
self.inner.fmt(fmt)
@@ -539,6 +788,13 @@ impl AsRawFd for TcpSocket {
}
}
+#[cfg(all(unix, not(tokio_no_as_fd)))]
+impl AsFd for TcpSocket {
+ fn as_fd(&self) -> BorrowedFd<'_> {
+ unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
+ }
+}
+
#[cfg(unix)]
impl FromRawFd for TcpSocket {
/// Converts a `RawFd` to a `TcpSocket`.
@@ -548,7 +804,7 @@ impl FromRawFd for TcpSocket {
/// The caller is responsible for ensuring that the socket is in
/// non-blocking mode.
unsafe fn from_raw_fd(fd: RawFd) -> TcpSocket {
- let inner = mio::net::TcpSocket::from_raw_fd(fd);
+ let inner = socket2::Socket::from_raw_fd(fd);
TcpSocket { inner }
}
}
@@ -560,30 +816,36 @@ impl IntoRawFd for TcpSocket {
}
}
-#[cfg(windows)]
-impl IntoRawSocket for TcpSocket {
- fn into_raw_socket(self) -> RawSocket {
- self.inner.into_raw_socket()
+cfg_windows! {
+ impl IntoRawSocket for TcpSocket {
+ fn into_raw_socket(self) -> RawSocket {
+ self.inner.into_raw_socket()
+ }
+ }
+
+ impl AsRawSocket for TcpSocket {
+ fn as_raw_socket(&self) -> RawSocket {
+ self.inner.as_raw_socket()
+ }
}
-}
-#[cfg(windows)]
-impl AsRawSocket for TcpSocket {
- fn as_raw_socket(&self) -> RawSocket {
- self.inner.as_raw_socket()
+ #[cfg(not(tokio_no_as_fd))]
+ impl AsSocket for TcpSocket {
+ fn as_socket(&self) -> BorrowedSocket<'_> {
+ unsafe { BorrowedSocket::borrow_raw(self.as_raw_socket()) }
+ }
}
-}
-#[cfg(windows)]
-impl FromRawSocket for TcpSocket {
- /// Converts a `RawSocket` to a `TcpStream`.
- ///
- /// # Notes
- ///
- /// The caller is responsible for ensuring that the socket is in
- /// non-blocking mode.
- unsafe fn from_raw_socket(socket: RawSocket) -> TcpSocket {
- let inner = mio::net::TcpSocket::from_raw_socket(socket);
- TcpSocket { inner }
+ impl FromRawSocket for TcpSocket {
+ /// Converts a `RawSocket` to a `TcpStream`.
+ ///
+ /// # Notes
+ ///
+ /// The caller is responsible for ensuring that the socket is in
+ /// non-blocking mode.
+ unsafe fn from_raw_socket(socket: RawSocket) -> TcpSocket {
+ let inner = socket2::Socket::from_raw_socket(socket);
+ TcpSocket { inner }
+ }
}
}
diff --git a/vendor/tokio/src/net/tcp/split.rs b/vendor/tokio/src/net/tcp/split.rs
index 8ae70ce13..1a1e22253 100644
--- a/vendor/tokio/src/net/tcp/split.rs
+++ b/vendor/tokio/src/net/tcp/split.rs
@@ -9,14 +9,18 @@
//! level.
use crate::future::poll_fn;
-use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
+use crate::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready};
use crate::net::TcpStream;
use std::io;
-use std::net::Shutdown;
+use std::net::{Shutdown, SocketAddr};
use std::pin::Pin;
use std::task::{Context, Poll};
+cfg_io_util! {
+ use bytes::BufMut;
+}
+
/// Borrowed read half of a [`TcpStream`], created by [`split`].
///
/// Reading from a `ReadHalf` is usually done using the convenience methods found on the
@@ -49,7 +53,7 @@ pub(crate) fn split(stream: &mut TcpStream) -> (ReadHalf<'_>, WriteHalf<'_>) {
}
impl ReadHalf<'_> {
- /// Attempt to receive data on the socket, without removing that data from
+ /// Attempts to receive data on the socket, without removing that data from
/// the queue, registering the current task for wakeup if data is not yet
/// available.
///
@@ -134,6 +138,233 @@ impl ReadHalf<'_> {
let mut buf = ReadBuf::new(buf);
poll_fn(|cx| self.poll_peek(cx, &mut buf)).await
}
+
+ /// Waits for any of the requested ready states.
+ ///
+ /// This function is usually paired with [`try_read()`]. It can be used instead
+ /// of [`readable()`] to check the returned ready set for [`Ready::READABLE`]
+ /// and [`Ready::READ_CLOSED`] events.
+ ///
+ /// The function may complete without the socket being ready. This is a
+ /// false-positive and attempting an operation will return with
+ /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
+ /// [`Ready`] set, so you should always check the returned value and possibly
+ /// wait again if the requested states are not set.
+ ///
+ /// This function is equivalent to [`TcpStream::ready`].
+ ///
+ /// [`try_read()`]: Self::try_read
+ /// [`readable()`]: Self::readable
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read or write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ self.0.ready(interest).await
+ }
+
+ /// Waits for the socket to become readable.
+ ///
+ /// This function is equivalent to `ready(Interest::READABLE)` and is usually
+ /// paired with `try_read()`.
+ ///
+ /// This function is also equivalent to [`TcpStream::ready`].
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn readable(&self) -> io::Result<()> {
+ self.0.readable().await
+ }
+
+ /// Tries to read data from the stream into the provided buffer, returning how
+ /// many bytes were read.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read()` is non-blocking, the buffer does not have to be stored by
+ /// the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
+ ///
+ /// 1. The stream's read half is closed and will no longer yield data.
+ /// 2. The specified buffer was 0 bytes in length.
+ ///
+ /// If the stream is not ready to read data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
+ self.0.try_read(buf)
+ }
+
+ /// Tries to read data from the stream into the provided buffers, returning
+ /// how many bytes were read.
+ ///
+ /// Data is copied to fill each buffer in order, with the final buffer
+ /// written to possibly being only partially filled. This method behaves
+ /// equivalently to a single call to [`try_read()`] with concatenated
+ /// buffers.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read_vectored()` is non-blocking, the buffer does not have to be
+ /// stored by the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`try_read()`]: Self::try_read()
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
+ self.0.try_read_vectored(bufs)
+ }
+
+ cfg_io_util! {
+ /// Tries to read data from the stream into the provided buffer, advancing the
+ /// buffer's internal cursor, returning how many bytes were read.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
+ /// the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
+ self.0.try_read_buf(buf)
+ }
+ }
+
+ /// Returns the remote address that this stream is connected to.
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.0.peer_addr()
+ }
+
+ /// Returns the local address that this stream is bound to.
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.0.local_addr()
+ }
+}
+
+impl WriteHalf<'_> {
+ /// Waits for any of the requested ready states.
+ ///
+ /// This function is usually paired with [`try_write()`]. It can be used instead
+ /// of [`writable()`] to check the returned ready set for [`Ready::WRITABLE`]
+ /// and [`Ready::WRITE_CLOSED`] events.
+ ///
+ /// The function may complete without the socket being ready. This is a
+ /// false-positive and attempting an operation will return with
+ /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
+ /// [`Ready`] set, so you should always check the returned value and possibly
+ /// wait again if the requested states are not set.
+ ///
+ /// This function is equivalent to [`TcpStream::ready`].
+ ///
+ /// [`try_write()`]: Self::try_write
+ /// [`writable()`]: Self::writable
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read or write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ self.0.ready(interest).await
+ }
+
+ /// Waits for the socket to become writable.
+ ///
+ /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
+ /// paired with `try_write()`.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn writable(&self) -> io::Result<()> {
+ self.0.writable().await
+ }
+
+ /// Tries to write a buffer to the stream, returning how many bytes were
+ /// written.
+ ///
+ /// The function will attempt to write the entire contents of `buf`, but
+ /// only part of the buffer may be written.
+ ///
+ /// This function is usually paired with `writable()`.
+ ///
+ /// # Return
+ ///
+ /// If data is successfully written, `Ok(n)` is returned, where `n` is the
+ /// number of bytes written. If the stream is not ready to write data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
+ self.0.try_write(buf)
+ }
+
+ /// Tries to write several buffers to the stream, returning how many bytes
+ /// were written.
+ ///
+ /// Data is written from each buffer in order, with the final buffer read
+ /// from possible being only partially consumed. This method behaves
+ /// equivalently to a single call to [`try_write()`] with concatenated
+ /// buffers.
+ ///
+ /// This function is usually paired with `writable()`.
+ ///
+ /// [`try_write()`]: Self::try_write()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully written, `Ok(n)` is returned, where `n` is the
+ /// number of bytes written. If the stream is not ready to write data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
+ self.0.try_write_vectored(bufs)
+ }
+
+ /// Returns the remote address that this stream is connected to.
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.0.peer_addr()
+ }
+
+ /// Returns the local address that this stream is bound to.
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.0.local_addr()
+ }
}
impl AsyncRead for ReadHalf<'_> {
diff --git a/vendor/tokio/src/net/tcp/split_owned.rs b/vendor/tokio/src/net/tcp/split_owned.rs
index 1bcb4f2ea..6771d6497 100644
--- a/vendor/tokio/src/net/tcp/split_owned.rs
+++ b/vendor/tokio/src/net/tcp/split_owned.rs
@@ -9,16 +9,20 @@
//! level.
use crate::future::poll_fn;
-use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
+use crate::io::{AsyncRead, AsyncWrite, Interest, ReadBuf, Ready};
use crate::net::TcpStream;
use std::error::Error;
-use std::net::Shutdown;
+use std::net::{Shutdown, SocketAddr};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::{fmt, io};
+cfg_io_util! {
+ use bytes::BufMut;
+}
+
/// Owned read half of a [`TcpStream`], created by [`into_split`].
///
/// Reading from an `OwnedReadHalf` is usually done using the convenience methods found
@@ -189,6 +193,141 @@ impl OwnedReadHalf {
let mut buf = ReadBuf::new(buf);
poll_fn(|cx| self.poll_peek(cx, &mut buf)).await
}
+
+ /// Waits for any of the requested ready states.
+ ///
+ /// This function is usually paired with [`try_read()`]. It can be used instead
+ /// of [`readable()`] to check the returned ready set for [`Ready::READABLE`]
+ /// and [`Ready::READ_CLOSED`] events.
+ ///
+ /// The function may complete without the socket being ready. This is a
+ /// false-positive and attempting an operation will return with
+ /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
+ /// [`Ready`] set, so you should always check the returned value and possibly
+ /// wait again if the requested states are not set.
+ ///
+ /// This function is equivalent to [`TcpStream::ready`].
+ ///
+ /// [`try_read()`]: Self::try_read
+ /// [`readable()`]: Self::readable
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read or write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ self.inner.ready(interest).await
+ }
+
+ /// Waits for the socket to become readable.
+ ///
+ /// This function is equivalent to `ready(Interest::READABLE)` and is usually
+ /// paired with `try_read()`.
+ ///
+ /// This function is also equivalent to [`TcpStream::ready`].
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn readable(&self) -> io::Result<()> {
+ self.inner.readable().await
+ }
+
+ /// Tries to read data from the stream into the provided buffer, returning how
+ /// many bytes were read.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read()` is non-blocking, the buffer does not have to be stored by
+ /// the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
+ ///
+ /// 1. The stream's read half is closed and will no longer yield data.
+ /// 2. The specified buffer was 0 bytes in length.
+ ///
+ /// If the stream is not ready to read data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
+ self.inner.try_read(buf)
+ }
+
+ /// Tries to read data from the stream into the provided buffers, returning
+ /// how many bytes were read.
+ ///
+ /// Data is copied to fill each buffer in order, with the final buffer
+ /// written to possibly being only partially filled. This method behaves
+ /// equivalently to a single call to [`try_read()`] with concatenated
+ /// buffers.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read_vectored()` is non-blocking, the buffer does not have to be
+ /// stored by the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`try_read()`]: Self::try_read()
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
+ self.inner.try_read_vectored(bufs)
+ }
+
+ cfg_io_util! {
+ /// Tries to read data from the stream into the provided buffer, advancing the
+ /// buffer's internal cursor, returning how many bytes were read.
+ ///
+ /// Receives any pending data from the socket but does not wait for new data
+ /// to arrive. On success, returns the number of bytes read. Because
+ /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
+ /// the async task and can exist entirely on the stack.
+ ///
+ /// Usually, [`readable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: Self::readable()
+ /// [`ready()`]: Self::ready()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully read, `Ok(n)` is returned, where `n` is the
+ /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
+ /// and will no longer yield data. If the stream is not ready to read data
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
+ self.inner.try_read_buf(buf)
+ }
+ }
+
+ /// Returns the remote address that this stream is connected to.
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.inner.peer_addr()
+ }
+
+ /// Returns the local address that this stream is bound to.
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.inner.local_addr()
+ }
}
impl AsyncRead for OwnedReadHalf {
@@ -211,13 +350,103 @@ impl OwnedWriteHalf {
reunite(other, self)
}
- /// Destroy the write half, but don't close the write half of the stream
+ /// Destroys the write half, but don't close the write half of the stream
/// until the read half is dropped. If the read half has already been
/// dropped, this closes the stream.
pub fn forget(mut self) {
self.shutdown_on_drop = false;
drop(self);
}
+
+ /// Waits for any of the requested ready states.
+ ///
+ /// This function is usually paired with [`try_write()`]. It can be used instead
+ /// of [`writable()`] to check the returned ready set for [`Ready::WRITABLE`]
+ /// and [`Ready::WRITE_CLOSED`] events.
+ ///
+ /// The function may complete without the socket being ready. This is a
+ /// false-positive and attempting an operation will return with
+ /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
+ /// [`Ready`] set, so you should always check the returned value and possibly
+ /// wait again if the requested states are not set.
+ ///
+ /// This function is equivalent to [`TcpStream::ready`].
+ ///
+ /// [`try_write()`]: Self::try_write
+ /// [`writable()`]: Self::writable
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to read or write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
+ self.inner.ready(interest).await
+ }
+
+ /// Waits for the socket to become writable.
+ ///
+ /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
+ /// paired with `try_write()`.
+ ///
+ /// # Cancel safety
+ ///
+ /// This method is cancel safe. Once a readiness event occurs, the method
+ /// will continue to return immediately until the readiness event is
+ /// consumed by an attempt to write that fails with `WouldBlock` or
+ /// `Poll::Pending`.
+ pub async fn writable(&self) -> io::Result<()> {
+ self.inner.writable().await
+ }
+
+ /// Tries to write a buffer to the stream, returning how many bytes were
+ /// written.
+ ///
+ /// The function will attempt to write the entire contents of `buf`, but
+ /// only part of the buffer may be written.
+ ///
+ /// This function is usually paired with `writable()`.
+ ///
+ /// # Return
+ ///
+ /// If data is successfully written, `Ok(n)` is returned, where `n` is the
+ /// number of bytes written. If the stream is not ready to write data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
+ self.inner.try_write(buf)
+ }
+
+ /// Tries to write several buffers to the stream, returning how many bytes
+ /// were written.
+ ///
+ /// Data is written from each buffer in order, with the final buffer read
+ /// from possible being only partially consumed. This method behaves
+ /// equivalently to a single call to [`try_write()`] with concatenated
+ /// buffers.
+ ///
+ /// This function is usually paired with `writable()`.
+ ///
+ /// [`try_write()`]: Self::try_write()
+ ///
+ /// # Return
+ ///
+ /// If data is successfully written, `Ok(n)` is returned, where `n` is the
+ /// number of bytes written. If the stream is not ready to write data,
+ /// `Err(io::ErrorKind::WouldBlock)` is returned.
+ pub fn try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
+ self.inner.try_write_vectored(bufs)
+ }
+
+ /// Returns the remote address that this stream is connected to.
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.inner.peer_addr()
+ }
+
+ /// Returns the local address that this stream is bound to.
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.inner.local_addr()
+ }
}
impl Drop for OwnedWriteHalf {
@@ -267,12 +496,12 @@ impl AsyncWrite for OwnedWriteHalf {
impl AsRef<TcpStream> for OwnedReadHalf {
fn as_ref(&self) -> &TcpStream {
- &*self.inner
+ &self.inner
}
}
impl AsRef<TcpStream> for OwnedWriteHalf {
fn as_ref(&self) -> &TcpStream {
- &*self.inner
+ &self.inner
}
}
diff --git a/vendor/tokio/src/net/tcp/stream.rs b/vendor/tokio/src/net/tcp/stream.rs
index 0277a360d..b7104d78b 100644
--- a/vendor/tokio/src/net/tcp/stream.rs
+++ b/vendor/tokio/src/net/tcp/stream.rs
@@ -1,16 +1,18 @@
-use crate::future::poll_fn;
+cfg_not_wasi! {
+ use crate::future::poll_fn;
+ use crate::net::{to_socket_addrs, ToSocketAddrs};
+ use std::time::Duration;
+}
+
use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
-use crate::net::{to_socket_addrs, ToSocketAddrs};
-use std::convert::TryFrom;
use std::fmt;
use std::io;
use std::net::{Shutdown, SocketAddr};
use std::pin::Pin;
use std::task::{Context, Poll};
-use std::time::Duration;
cfg_io_util! {
use bytes::BufMut;
@@ -70,86 +72,88 @@ cfg_net! {
}
impl TcpStream {
- /// Opens a TCP connection to a remote host.
- ///
- /// `addr` is an address of the remote host. Anything which implements the
- /// [`ToSocketAddrs`] trait can be supplied as the address. If `addr`
- /// yields multiple addresses, connect will be attempted with each of the
- /// addresses until a connection is successful. If none of the addresses
- /// result in a successful connection, the error returned from the last
- /// connection attempt (the last address) is returned.
- ///
- /// To configure the socket before connecting, you can use the [`TcpSocket`]
- /// type.
- ///
- /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
- /// [`TcpSocket`]: struct@crate::net::TcpSocket
- ///
- /// # Examples
- ///
- /// ```no_run
- /// use tokio::net::TcpStream;
- /// use tokio::io::AsyncWriteExt;
- /// use std::error::Error;
- ///
- /// #[tokio::main]
- /// async fn main() -> Result<(), Box<dyn Error>> {
- /// // Connect to a peer
- /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
- ///
- /// // Write some data.
- /// stream.write_all(b"hello world!").await?;
- ///
- /// Ok(())
- /// }
- /// ```
- ///
- /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
- ///
- /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
- /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
- pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
- let addrs = to_socket_addrs(addr).await?;
+ cfg_not_wasi! {
+ /// Opens a TCP connection to a remote host.
+ ///
+ /// `addr` is an address of the remote host. Anything which implements the
+ /// [`ToSocketAddrs`] trait can be supplied as the address. If `addr`
+ /// yields multiple addresses, connect will be attempted with each of the
+ /// addresses until a connection is successful. If none of the addresses
+ /// result in a successful connection, the error returned from the last
+ /// connection attempt (the last address) is returned.
+ ///
+ /// To configure the socket before connecting, you can use the [`TcpSocket`]
+ /// type.
+ ///
+ /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
+ /// [`TcpSocket`]: struct@crate::net::TcpSocket
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpStream;
+ /// use tokio::io::AsyncWriteExt;
+ /// use std::error::Error;
+ ///
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// // Connect to a peer
+ /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
+ ///
+ /// // Write some data.
+ /// stream.write_all(b"hello world!").await?;
+ ///
+ /// Ok(())
+ /// }
+ /// ```
+ ///
+ /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
+ ///
+ /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
+ /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
+ pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
+ let addrs = to_socket_addrs(addr).await?;
- let mut last_err = None;
+ let mut last_err = None;
- for addr in addrs {
- match TcpStream::connect_addr(addr).await {
- Ok(stream) => return Ok(stream),
- Err(e) => last_err = Some(e),
+ for addr in addrs {
+ match TcpStream::connect_addr(addr).await {
+ Ok(stream) => return Ok(stream),
+ Err(e) => last_err = Some(e),
+ }
}
+
+ Err(last_err.unwrap_or_else(|| {
+ io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "could not resolve to any address",
+ )
+ }))
}
- Err(last_err.unwrap_or_else(|| {
- io::Error::new(
- io::ErrorKind::InvalidInput,
- "could not resolve to any address",
- )
- }))
- }
+ /// Establishes a connection to the specified `addr`.
+ async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> {
+ let sys = mio::net::TcpStream::connect(addr)?;
+ TcpStream::connect_mio(sys).await
+ }
- /// Establishes a connection to the specified `addr`.
- async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> {
- let sys = mio::net::TcpStream::connect(addr)?;
- TcpStream::connect_mio(sys).await
- }
+ pub(crate) async fn connect_mio(sys: mio::net::TcpStream) -> io::Result<TcpStream> {
+ let stream = TcpStream::new(sys)?;
- pub(crate) async fn connect_mio(sys: mio::net::TcpStream) -> io::Result<TcpStream> {
- let stream = TcpStream::new(sys)?;
+ // Once we've connected, wait for the stream to be writable as
+ // that's when the actual connection has been initiated. Once we're
+ // writable we check for `take_socket_error` to see if the connect
+ // actually hit an error or not.
+ //
+ // If all that succeeded then we ship everything on up.
+ poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
- // Once we've connected, wait for the stream to be writable as
- // that's when the actual connection has been initiated. Once we're
- // writable we check for `take_socket_error` to see if the connect
- // actually hit an error or not.
- //
- // If all that succeeded then we ship everything on up.
- poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
+ if let Some(e) = stream.io.take_error()? {
+ return Err(e);
+ }
- if let Some(e) = stream.io.take_error()? {
- return Err(e);
+ Ok(stream)
}
-
- Ok(stream)
}
pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result<TcpStream> {
@@ -160,9 +164,16 @@ impl TcpStream {
/// Creates new `TcpStream` from a `std::net::TcpStream`.
///
/// This function is intended to be used to wrap a TCP stream from the
- /// standard library in the Tokio equivalent. The conversion assumes nothing
- /// about the underlying stream; it is left up to the user to set it in
- /// non-blocking mode.
+ /// standard library in the Tokio equivalent.
+ ///
+ /// # Notes
+ ///
+ /// The caller is responsible for ensuring that the stream is in
+ /// non-blocking mode. Otherwise all I/O operations on the stream
+ /// will block the thread, which will cause unexpected behavior.
+ /// Non-blocking mode can be set using [`set_nonblocking`].
+ ///
+ /// [`set_nonblocking`]: std::net::TcpStream::set_nonblocking
///
/// # Examples
///
@@ -181,18 +192,20 @@ impl TcpStream {
///
/// # Panics
///
- /// This function panics if thread-local runtime is not set.
+ /// This function panics if it is not called from within a runtime with
+ /// IO enabled.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
+ #[track_caller]
pub fn from_std(stream: std::net::TcpStream) -> io::Result<TcpStream> {
let io = mio::net::TcpStream::from_std(stream);
let io = PollEvented::new(io)?;
Ok(TcpStream { io })
}
- /// Turn a [`tokio::net::TcpStream`] into a [`std::net::TcpStream`].
+ /// Turns a [`tokio::net::TcpStream`] into a [`std::net::TcpStream`].
///
/// The returned [`std::net::TcpStream`] will have nonblocking mode set as `true`.
/// Use [`set_nonblocking`] to change the blocking mode if needed.
@@ -244,6 +257,15 @@ impl TcpStream {
.map(|io| io.into_raw_socket())
.map(|raw_socket| unsafe { std::net::TcpStream::from_raw_socket(raw_socket) })
}
+
+ #[cfg(tokio_wasi)]
+ {
+ use std::os::wasi::io::{FromRawFd, IntoRawFd};
+ self.io
+ .into_inner()
+ .map(|io| io.into_raw_fd())
+ .map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) })
+ }
}
/// Returns the local address that this stream is bound to.
@@ -264,6 +286,11 @@ impl TcpStream {
self.io.local_addr()
}
+ /// Returns the value of the `SO_ERROR` option.
+ pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+ self.io.take_error()
+ }
+
/// Returns the remote address that this stream is connected to.
///
/// # Examples
@@ -350,12 +377,18 @@ impl TcpStream {
}
}
- /// Wait for any of the requested ready states.
+ /// Waits for any of the requested ready states.
///
/// This function is usually paired with `try_read()` or `try_write()`. It
/// can be used to concurrently read / write to the same socket on a single
/// task without splitting the socket.
///
+ /// The function may complete without the socket being ready. This is a
+ /// false-positive and attempting an operation will return with
+ /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
+ /// [`Ready`] set, so you should always check the returned value and possibly
+ /// wait again if the requested states are not set.
+ ///
/// # Cancel safety
///
/// This method is cancel safe. Once a readiness event occurs, the method
@@ -387,7 +420,7 @@ impl TcpStream {
/// // if the readiness event is a false positive.
/// match stream.try_read(&mut data) {
/// Ok(n) => {
- /// println!("read {} bytes", n);
+ /// println!("read {} bytes", n);
/// }
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// continue;
@@ -422,7 +455,7 @@ impl TcpStream {
Ok(event.ready)
}
- /// Wait for the socket to become readable.
+ /// Waits for the socket to become readable.
///
/// This function is equivalent to `ready(Interest::READABLE)` and is usually
/// paired with `try_read()`.
@@ -510,7 +543,7 @@ impl TcpStream {
self.io.registration().poll_read_ready(cx).map_ok(|_| ())
}
- /// Try to read data from the stream into the provided buffer, returning how
+ /// Tries to read data from the stream into the provided buffer, returning how
/// many bytes were read.
///
/// Receives any pending data from the socket but does not wait for new data
@@ -526,8 +559,12 @@ impl TcpStream {
/// # Return
///
/// If data is successfully read, `Ok(n)` is returned, where `n` is the
- /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
- /// and will no longer yield data. If the stream is not ready to read data
+ /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
+ ///
+ /// 1. The stream's read half is closed and will no longer yield data.
+ /// 2. The specified buffer was 0 bytes in length.
+ ///
+ /// If the stream is not ready to read data,
/// `Err(io::ErrorKind::WouldBlock)` is returned.
///
/// # Examples
@@ -577,7 +614,7 @@ impl TcpStream {
.try_io(Interest::READABLE, || (&*self.io).read(buf))
}
- /// Try to read data from the stream into the provided buffers, returning
+ /// Tries to read data from the stream into the provided buffers, returning
/// how many bytes were read.
///
/// Data is copied to fill each buffer in order, with the final buffer
@@ -656,7 +693,7 @@ impl TcpStream {
}
cfg_io_util! {
- /// Try to read data from the stream into the provided buffer, advancing the
+ /// Tries to read data from the stream into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
///
/// Receives any pending data from the socket but does not wait for new data
@@ -734,7 +771,7 @@ impl TcpStream {
}
}
- /// Wait for the socket to become writable.
+ /// Waits for the socket to become writable.
///
/// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
/// paired with `try_write()`.
@@ -874,7 +911,7 @@ impl TcpStream {
.try_io(Interest::WRITABLE, || (&*self.io).write(buf))
}
- /// Try to write several buffers to the stream, returning how many bytes
+ /// Tries to write several buffers to the stream, returning how many bytes
/// were written.
///
/// Data is written from each buffer in order, with the final buffer read
@@ -936,6 +973,84 @@ impl TcpStream {
.try_io(Interest::WRITABLE, || (&*self.io).write_vectored(bufs))
}
+ /// Tries to read or write from the socket using a user-provided IO operation.
+ ///
+ /// If the socket is ready, the provided closure is called. The closure
+ /// should attempt to perform IO operation on the socket by manually
+ /// calling the appropriate syscall. If the operation fails because the
+ /// socket is not actually ready, then the closure should return a
+ /// `WouldBlock` error and the readiness flag is cleared. The return value
+ /// of the closure is then returned by `try_io`.
+ ///
+ /// If the socket is not ready, then the closure is not called
+ /// and a `WouldBlock` error is returned.
+ ///
+ /// The closure should only return a `WouldBlock` error if it has performed
+ /// an IO operation on the socket that failed due to the socket not being
+ /// ready. Returning a `WouldBlock` error in any other situation will
+ /// incorrectly clear the readiness flag, which can cause the socket to
+ /// behave incorrectly.
+ ///
+ /// The closure should not perform the IO operation using any of the methods
+ /// defined on the Tokio `TcpStream` type, as this will mess with the
+ /// readiness flag and can cause the socket to behave incorrectly.
+ ///
+ /// This method is not intended to be used with combined interests.
+ /// The closure should perform only one type of IO operation, so it should not
+ /// require more than one ready state. This method may panic or sleep forever
+ /// if it is called with a combined interest.
+ ///
+ /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
+ ///
+ /// [`readable()`]: TcpStream::readable()
+ /// [`writable()`]: TcpStream::writable()
+ /// [`ready()`]: TcpStream::ready()
+ pub fn try_io<R>(
+ &self,
+ interest: Interest,
+ f: impl FnOnce() -> io::Result<R>,
+ ) -> io::Result<R> {
+ self.io
+ .registration()
+ .try_io(interest, || self.io.try_io(f))
+ }
+
+ /// Reads or writes from the socket using a user-provided IO operation.
+ ///
+ /// The readiness of the socket is awaited and when the socket is ready,
+ /// the provided closure is called. The closure should attempt to perform
+ /// IO operation on the socket by manually calling the appropriate syscall.
+ /// If the operation fails because the socket is not actually ready,
+ /// then the closure should return a `WouldBlock` error. In such case the
+ /// readiness flag is cleared and the socket readiness is awaited again.
+ /// This loop is repeated until the closure returns an `Ok` or an error
+ /// other than `WouldBlock`.
+ ///
+ /// The closure should only return a `WouldBlock` error if it has performed
+ /// an IO operation on the socket that failed due to the socket not being
+ /// ready. Returning a `WouldBlock` error in any other situation will
+ /// incorrectly clear the readiness flag, which can cause the socket to
+ /// behave incorrectly.
+ ///
+ /// The closure should not perform the IO operation using any of the methods
+ /// defined on the Tokio `TcpStream` type, as this will mess with the
+ /// readiness flag and can cause the socket to behave incorrectly.
+ ///
+ /// This method is not intended to be used with combined interests.
+ /// The closure should perform only one type of IO operation, so it should not
+ /// require more than one ready state. This method may panic or sleep forever
+ /// if it is called with a combined interest.
+ pub async fn async_io<R>(
+ &self,
+ interest: Interest,
+ mut f: impl FnMut() -> io::Result<R>,
+ ) -> io::Result<R> {
+ self.io
+ .registration()
+ .async_io(interest, || self.io.try_io(&mut f))
+ .await
+ }
+
/// Receives data on the socket from the remote address to which it is
/// connected, without removing that data from the queue. On success,
/// returns the number of bytes peeked.
@@ -1035,69 +1150,53 @@ impl TcpStream {
self.io.set_nodelay(nodelay)
}
- /// Reads the linger duration for this socket by getting the `SO_LINGER`
- /// option.
- ///
- /// For more information about this option, see [`set_linger`].
- ///
- /// [`set_linger`]: TcpStream::set_linger
- ///
- /// # Examples
- ///
- /// ```no_run
- /// use tokio::net::TcpStream;
- ///
- /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
- /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
- ///
- /// println!("{:?}", stream.linger()?);
- /// # Ok(())
- /// # }
- /// ```
- pub fn linger(&self) -> io::Result<Option<Duration>> {
- let mio_socket = std::mem::ManuallyDrop::new(self.to_mio());
-
- mio_socket.get_linger()
- }
-
- /// Sets the linger duration of this socket by setting the SO_LINGER option.
- ///
- /// This option controls the action taken when a stream has unsent messages and the stream is
- /// closed. If SO_LINGER is set, the system shall block the process until it can transmit the
- /// data or until the time expires.
- ///
- /// If SO_LINGER is not specified, and the stream is closed, the system handles the call in a
- /// way that allows the process to continue as quickly as possible.
- ///
- /// # Examples
- ///
- /// ```no_run
- /// use tokio::net::TcpStream;
- ///
- /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
- /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
- ///
- /// stream.set_linger(None)?;
- /// # Ok(())
- /// # }
- /// ```
- pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
- let mio_socket = std::mem::ManuallyDrop::new(self.to_mio());
-
- mio_socket.set_linger(dur)
- }
-
- fn to_mio(&self) -> mio::net::TcpSocket {
- #[cfg(windows)]
- {
- use std::os::windows::io::{AsRawSocket, FromRawSocket};
- unsafe { mio::net::TcpSocket::from_raw_socket(self.as_raw_socket()) }
+ cfg_not_wasi! {
+ /// Reads the linger duration for this socket by getting the `SO_LINGER`
+ /// option.
+ ///
+ /// For more information about this option, see [`set_linger`].
+ ///
+ /// [`set_linger`]: TcpStream::set_linger
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpStream;
+ ///
+ /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+ /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
+ ///
+ /// println!("{:?}", stream.linger()?);
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn linger(&self) -> io::Result<Option<Duration>> {
+ socket2::SockRef::from(self).linger()
}
- #[cfg(unix)]
- {
- use std::os::unix::io::{AsRawFd, FromRawFd};
- unsafe { mio::net::TcpSocket::from_raw_fd(self.as_raw_fd()) }
+ /// Sets the linger duration of this socket by setting the SO_LINGER option.
+ ///
+ /// This option controls the action taken when a stream has unsent messages and the stream is
+ /// closed. If SO_LINGER is set, the system shall block the process until it can transmit the
+ /// data or until the time expires.
+ ///
+ /// If SO_LINGER is not specified, and the stream is closed, the system handles the call in a
+ /// way that allows the process to continue as quickly as possible.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use tokio::net::TcpStream;
+ ///
+ /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+ /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
+ ///
+ /// stream.set_linger(None)?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
+ socket2::SockRef::from(self).set_linger(dur)
}
}
@@ -1278,16 +1377,49 @@ mod sys {
self.io.as_raw_fd()
}
}
+
+ #[cfg(not(tokio_no_as_fd))]
+ impl AsFd for TcpStream {
+ fn as_fd(&self) -> BorrowedFd<'_> {
+ unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
+ }
+ }
}
-#[cfg(windows)]
-mod sys {
- use super::TcpStream;
- use std::os::windows::prelude::*;
+cfg_windows! {
+ use crate::os::windows::io::{AsRawSocket, RawSocket};
+ #[cfg(not(tokio_no_as_fd))]
+ use crate::os::windows::io::{AsSocket, BorrowedSocket};
impl AsRawSocket for TcpStream {
fn as_raw_socket(&self) -> RawSocket {
self.io.as_raw_socket()
}
}
+
+ #[cfg(not(tokio_no_as_fd))]
+ impl AsSocket for TcpStream {
+ fn as_socket(&self) -> BorrowedSocket<'_> {
+ unsafe { BorrowedSocket::borrow_raw(self.as_raw_socket()) }
+ }
+ }
+}
+
+#[cfg(all(tokio_unstable, tokio_wasi))]
+mod sys {
+ use super::TcpStream;
+ use std::os::wasi::prelude::*;
+
+ impl AsRawFd for TcpStream {
+ fn as_raw_fd(&self) -> RawFd {
+ self.io.as_raw_fd()
+ }
+ }
+
+ #[cfg(not(tokio_no_as_fd))]
+ impl AsFd for TcpStream {
+ fn as_fd(&self) -> BorrowedFd<'_> {
+ unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
+ }
+ }
}