Diffstat (limited to 'third_party/rust/tokio/src/net/tcp')
-rw-r--r-- | third_party/rust/tokio/src/net/tcp/incoming.rs | 42
-rw-r--r-- | third_party/rust/tokio/src/net/tcp/listener.rs | 441
-rw-r--r-- | third_party/rust/tokio/src/net/tcp/mod.rs | 13
-rw-r--r-- | third_party/rust/tokio/src/net/tcp/split.rs | 163
-rw-r--r-- | third_party/rust/tokio/src/net/tcp/stream.rs | 869
5 files changed, 1528 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/net/tcp/incoming.rs b/third_party/rust/tokio/src/net/tcp/incoming.rs new file mode 100644 index 0000000000..062be1e9cf --- /dev/null +++ b/third_party/rust/tokio/src/net/tcp/incoming.rs @@ -0,0 +1,42 @@ +use crate::net::tcp::{TcpListener, TcpStream}; + +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Stream returned by the `TcpListener::incoming` function representing the +/// stream of sockets received from a listener. +#[must_use = "streams do nothing unless polled"] +#[derive(Debug)] +pub struct Incoming<'a> { + inner: &'a mut TcpListener, +} + +impl Incoming<'_> { + pub(crate) fn new(listener: &mut TcpListener) -> Incoming<'_> { + Incoming { inner: listener } + } + + /// Attempts to poll `TcpStream` by polling inner `TcpListener` to accept + /// connection. + /// + /// If `TcpListener` isn't ready yet, `Poll::Pending` is returned and + /// current task will be notified by a waker. + pub fn poll_accept( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<io::Result<TcpStream>> { + let (socket, _) = ready!(self.inner.poll_accept(cx))?; + Poll::Ready(Ok(socket)) + } +} + +#[cfg(feature = "stream")] +impl crate::stream::Stream for Incoming<'_> { + type Item = io::Result<TcpStream>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let (socket, _) = ready!(self.inner.poll_accept(cx))?; + Poll::Ready(Some(Ok(socket))) + } +} diff --git a/third_party/rust/tokio/src/net/tcp/listener.rs b/third_party/rust/tokio/src/net/tcp/listener.rs new file mode 100644 index 0000000000..cde22cb636 --- /dev/null +++ b/third_party/rust/tokio/src/net/tcp/listener.rs @@ -0,0 +1,441 @@ +use crate::future::poll_fn; +use crate::io::PollEvented; +use crate::net::tcp::{Incoming, TcpStream}; +use crate::net::ToSocketAddrs; + +use std::convert::TryFrom; +use std::fmt; +use std::io; +use std::net::{self, SocketAddr}; +use std::task::{Context, Poll}; + +cfg_tcp! { + /// A TCP socket server, listening for connections. + /// + /// You can accept a new connection by using the [`accept`](`TcpListener::accept`) method. Alternatively `TcpListener` + /// implements the [`Stream`](`crate::stream::Stream`) trait, which allows you to use the listener in places that want a + /// stream. The stream will never return `None` and will also not yield the peer's `SocketAddr` structure. Iterating over + /// it is equivalent to calling accept in a loop. + /// + /// # Errors + /// + /// Note that accepting a connection can lead to various errors and not all + /// of them are necessarily fatal ‒ for example having too many open file + /// descriptors or the other side closing the connection while it waits in + /// an accept queue. These would terminate the stream if not handled in any + /// way. 
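The Errors note above matters in practice: propagating every `accept` failure with `?` ends the loop (or the `incoming()` stream) on the first transient error. A minimal sketch of a more forgiving accept loop, assuming the tokio 0.2-era API vendored in this diff (`bind`/`accept` taking `&mut self`) plus the `macros` feature for `#[tokio::main]`; the address is a placeholder:

```rust
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut listener = TcpListener::bind("127.0.0.1:8080").await?;

    loop {
        match listener.accept().await {
            Ok((socket, addr)) => {
                println!("new client: {}", addr);
                // ... hand `socket` off to a task or handler here ...
                drop(socket);
            }
            // Per-connection failures (the peer resetting while queued, too
            // many open file descriptors, ...) are logged and the loop keeps
            // accepting, instead of `?` terminating it on the first error.
            Err(e) => eprintln!("accept error: {}", e),
        }
    }
}
```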
+ /// + /// # Examples + /// + /// Using `accept`: + /// ```no_run + /// use tokio::net::TcpListener; + /// + /// use std::io; + /// + /// async fn process_socket<T>(socket: T) { + /// # drop(socket); + /// // do work with socket here + /// } + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let mut listener = TcpListener::bind("127.0.0.1:8080").await?; + /// + /// loop { + /// let (socket, _) = listener.accept().await?; + /// process_socket(socket).await; + /// } + /// } + /// ``` + /// + /// Using `impl Stream`: + /// ```no_run + /// use tokio::{net::TcpListener, stream::StreamExt}; + /// + /// #[tokio::main] + /// async fn main() { + /// let mut listener = TcpListener::bind("127.0.0.1:8080").await.unwrap(); + /// while let Some(stream) = listener.next().await { + /// match stream { + /// Ok(stream) => { + /// println!("new client!"); + /// } + /// Err(e) => { /* connection failed */ } + /// } + /// } + /// } + /// ``` + pub struct TcpListener { + io: PollEvented<mio::net::TcpListener>, + } +} + +impl TcpListener { + /// Creates a new TcpListener which will be bound to the specified address. + /// + /// The returned listener is ready for accepting connections. + /// + /// Binding with a port number of 0 will request that the OS assigns a port + /// to this listener. The port allocated can be queried via the `local_addr` + /// method. + /// + /// The address type can be any implementor of `ToSocketAddrs` trait. + /// + /// If `addr` yields multiple addresses, bind will be attempted with each of + /// the addresses until one succeeds and returns the listener. If none of + /// the addresses succeed in creating a listener, the error returned from + /// the last attempt (the last address) is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpListener; + /// + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let listener = TcpListener::bind("127.0.0.1:0").await?; + /// + /// // use the listener + /// + /// # let _ = listener; + /// Ok(()) + /// } + /// ``` + pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> { + let addrs = addr.to_socket_addrs().await?; + + let mut last_err = None; + + for addr in addrs { + match TcpListener::bind_addr(addr) { + Ok(listener) => return Ok(listener), + Err(e) => last_err = Some(e), + } + } + + Err(last_err.unwrap_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "could not resolve to any address", + ) + })) + } + + fn bind_addr(addr: SocketAddr) -> io::Result<TcpListener> { + let listener = mio::net::TcpListener::bind(&addr)?; + TcpListener::new(listener) + } + + /// Accepts a new incoming connection from this listener. + /// + /// This function will yield once a new TCP connection is established. When + /// established, the corresponding [`TcpStream`] and the remote peer's + /// address will be returned. 
+ /// + /// [`TcpStream`]: ../struct.TcpStream.html + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpListener; + /// + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let mut listener = TcpListener::bind("127.0.0.1:8080").await?; + /// + /// match listener.accept().await { + /// Ok((_socket, addr)) => println!("new client: {:?}", addr), + /// Err(e) => println!("couldn't get client: {:?}", e), + /// } + /// + /// Ok(()) + /// } + /// ``` + pub async fn accept(&mut self) -> io::Result<(TcpStream, SocketAddr)> { + poll_fn(|cx| self.poll_accept(cx)).await + } + + /// Attempts to poll `SocketAddr` and `TcpStream` bound to this address. + /// + /// In case if I/O resource isn't ready yet, `Poll::Pending` is returned and + /// current task will be notified by a waker. + pub fn poll_accept( + &mut self, + cx: &mut Context<'_>, + ) -> Poll<io::Result<(TcpStream, SocketAddr)>> { + let (io, addr) = ready!(self.poll_accept_std(cx))?; + + let io = mio::net::TcpStream::from_stream(io)?; + let io = TcpStream::new(io)?; + + Poll::Ready(Ok((io, addr))) + } + + fn poll_accept_std( + &mut self, + cx: &mut Context<'_>, + ) -> Poll<io::Result<(net::TcpStream, SocketAddr)>> { + ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; + + match self.io.get_ref().accept_std() { + Ok(pair) => Poll::Ready(Ok(pair)), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_read_ready(cx, mio::Ready::readable())?; + Poll::Pending + } + Err(e) => Poll::Ready(Err(e)), + } + } + + /// Creates a new TCP listener from the standard library's TCP listener. + /// + /// This method can be used when the `Handle::tcp_listen` method isn't + /// sufficient because perhaps some more configuration is needed in terms of + /// before the calls to `bind` and `listen`. + /// + /// This API is typically paired with the `net2` crate and the `TcpBuilder` + /// type to build up and customize a listener before it's shipped off to the + /// backing event loop. This allows configuration of options like + /// `SO_REUSEPORT`, binding to multiple addresses, etc. + /// + /// The `addr` argument here is one of the addresses that `listener` is + /// bound to and the listener will only be guaranteed to accept connections + /// of the same address type currently. + /// + /// The platform specific behavior of this function looks like: + /// + /// * On Unix, the socket is placed into nonblocking mode and connections + /// can be accepted as normal + /// + /// * On Windows, the address is stored internally and all future accepts + /// will only be for the same IP version as `addr` specified. That is, if + /// `addr` is an IPv4 address then all sockets accepted will be IPv4 as + /// well (same for IPv6). + /// + /// # Examples + /// + /// ```rust,no_run + /// use std::error::Error; + /// use tokio::net::TcpListener; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn Error>> { + /// let std_listener = std::net::TcpListener::bind("127.0.0.1:0")?; + /// let listener = TcpListener::from_std(std_listener)?; + /// Ok(()) + /// } + /// ``` + /// + /// # Panics + /// + /// This function panics if thread-local runtime is not set. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. 
+ pub fn from_std(listener: net::TcpListener) -> io::Result<TcpListener> { + let io = mio::net::TcpListener::from_std(listener)?; + let io = PollEvented::new(io)?; + Ok(TcpListener { io }) + } + + fn new(listener: mio::net::TcpListener) -> io::Result<TcpListener> { + let io = PollEvented::new(listener)?; + Ok(TcpListener { io }) + } + + /// Returns the local address that this listener is bound to. + /// + /// This can be useful, for example, when binding to port 0 to figure out + /// which port was actually bound. + /// + /// # Examples + /// + /// ```rust,no_run + /// use tokio::net::TcpListener; + /// + /// use std::io; + /// use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let listener = TcpListener::bind("127.0.0.1:8080").await?; + /// + /// assert_eq!(listener.local_addr()?, + /// SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8080))); + /// + /// Ok(()) + /// } + /// ``` + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.io.get_ref().local_addr() + } + + /// Returns a stream over the connections being received on this listener. + /// + /// Note that `TcpListener` also directly implements `Stream`. + /// + /// The returned stream will never return `None` and will also not yield the + /// peer's `SocketAddr` structure. Iterating over it is equivalent to + /// calling accept in a loop. + /// + /// # Errors + /// + /// Note that accepting a connection can lead to various errors and not all + /// of them are necessarily fatal ‒ for example having too many open file + /// descriptors or the other side closing the connection while it waits in + /// an accept queue. These would terminate the stream if not handled in any + /// way. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::{net::TcpListener, stream::StreamExt}; + /// + /// #[tokio::main] + /// async fn main() { + /// let mut listener = TcpListener::bind("127.0.0.1:8080").await.unwrap(); + /// let mut incoming = listener.incoming(); + /// + /// while let Some(stream) = incoming.next().await { + /// match stream { + /// Ok(stream) => { + /// println!("new client!"); + /// } + /// Err(e) => { /* connection failed */ } + /// } + /// } + /// } + /// ``` + pub fn incoming(&mut self) -> Incoming<'_> { + Incoming::new(self) + } + + /// Gets the value of the `IP_TTL` option for this socket. + /// + /// For more information about this option, see [`set_ttl`]. + /// + /// [`set_ttl`]: #method.set_ttl + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpListener; + /// + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let listener = TcpListener::bind("127.0.0.1:0").await?; + /// + /// listener.set_ttl(100).expect("could not set TTL"); + /// assert_eq!(listener.ttl()?, 100); + /// + /// Ok(()) + /// } + /// ``` + pub fn ttl(&self) -> io::Result<u32> { + self.io.get_ref().ttl() + } + + /// Sets the value for the `IP_TTL` option on this socket. + /// + /// This value sets the time-to-live field that is used in every packet sent + /// from this socket. 
+ /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpListener; + /// + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let listener = TcpListener::bind("127.0.0.1:0").await?; + /// + /// listener.set_ttl(100).expect("could not set TTL"); + /// + /// Ok(()) + /// } + /// ``` + pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { + self.io.get_ref().set_ttl(ttl) + } +} + +#[cfg(feature = "stream")] +impl crate::stream::Stream for TcpListener { + type Item = io::Result<TcpStream>; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<Self::Item>> { + let (socket, _) = ready!(self.poll_accept(cx))?; + Poll::Ready(Some(Ok(socket))) + } +} + +impl TryFrom<TcpListener> for mio::net::TcpListener { + type Error = io::Error; + + /// Consumes value, returning the mio I/O object. + /// + /// See [`PollEvented::into_inner`] for more details about + /// resource deregistration that happens during the call. + /// + /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner + fn try_from(value: TcpListener) -> Result<Self, Self::Error> { + value.io.into_inner() + } +} + +impl TryFrom<net::TcpListener> for TcpListener { + type Error = io::Error; + + /// Consumes stream, returning the tokio I/O object. + /// + /// This is equivalent to + /// [`TcpListener::from_std(stream)`](TcpListener::from_std). + fn try_from(stream: net::TcpListener) -> Result<Self, Self::Error> { + Self::from_std(stream) + } +} + +impl fmt::Debug for TcpListener { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.io.get_ref().fmt(f) + } +} + +#[cfg(unix)] +mod sys { + use super::TcpListener; + use std::os::unix::prelude::*; + + impl AsRawFd for TcpListener { + fn as_raw_fd(&self) -> RawFd { + self.io.get_ref().as_raw_fd() + } + } +} + +#[cfg(windows)] +mod sys { + // TODO: let's land these upstream with mio and then we can add them here. + // + // use std::os::windows::prelude::*; + // use super::{TcpListener; + // + // impl AsRawHandle for TcpListener { + // fn as_raw_handle(&self) -> RawHandle { + // self.listener.io().as_raw_handle() + // } + // } +} diff --git a/third_party/rust/tokio/src/net/tcp/mod.rs b/third_party/rust/tokio/src/net/tcp/mod.rs new file mode 100644 index 0000000000..d5354b38d2 --- /dev/null +++ b/third_party/rust/tokio/src/net/tcp/mod.rs @@ -0,0 +1,13 @@ +//! TCP utility types + +pub(crate) mod listener; +pub(crate) use listener::TcpListener; + +mod incoming; +pub use incoming::Incoming; + +mod split; +pub use split::{ReadHalf, WriteHalf}; + +pub(crate) mod stream; +pub(crate) use stream::TcpStream; diff --git a/third_party/rust/tokio/src/net/tcp/split.rs b/third_party/rust/tokio/src/net/tcp/split.rs new file mode 100644 index 0000000000..cce50f6ab3 --- /dev/null +++ b/third_party/rust/tokio/src/net/tcp/split.rs @@ -0,0 +1,163 @@ +//! `TcpStream` split support. +//! +//! A `TcpStream` can be split into a `ReadHalf` and a +//! `WriteHalf` with the `TcpStream::split` method. `ReadHalf` +//! implements `AsyncRead` while `WriteHalf` implements `AsyncWrite`. +//! +//! Compared to the generic split of `AsyncRead + AsyncWrite`, this specialized +//! split has no associated overhead and enforces all invariants at the type +//! level. 
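The split module doc above carries no example, so here is a minimal sketch of the borrow-based split, assuming the API as vendored here (tokio 0.2-style, with `tokio::prelude::*` supplying the read/write extension traits) and a placeholder peer address:

```rust
use tokio::net::TcpStream;
use tokio::prelude::*;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut stream = TcpStream::connect("127.0.0.1:8080").await?;

    // Both halves borrow `stream` and are used from the same task; that
    // borrow is what lets this split avoid the overhead of the generic
    // `AsyncRead + AsyncWrite` split mentioned above.
    let (mut rd, mut wr) = stream.split();

    wr.write_all(b"ping").await?;

    let mut buf = [0u8; 128];
    let n = rd.read(&mut buf).await?;
    println!("read {} bytes back", n);

    Ok(())
}
```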
+ +use crate::future::poll_fn; +use crate::io::{AsyncRead, AsyncWrite}; +use crate::net::TcpStream; + +use bytes::Buf; +use std::io; +use std::mem::MaybeUninit; +use std::net::Shutdown; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Read half of a `TcpStream`. +#[derive(Debug)] +pub struct ReadHalf<'a>(&'a TcpStream); + +/// Write half of a `TcpStream`. +/// +/// Note that in the `AsyncWrite` implemenation of `TcpStreamWriteHalf`, +/// `poll_shutdown` actually shuts down the TCP stream in the write direction. +#[derive(Debug)] +pub struct WriteHalf<'a>(&'a TcpStream); + +pub(crate) fn split(stream: &mut TcpStream) -> (ReadHalf<'_>, WriteHalf<'_>) { + (ReadHalf(&*stream), WriteHalf(&*stream)) +} + +impl ReadHalf<'_> { + /// Attempt to receive data on the socket, without removing that data from + /// the queue, registering the current task for wakeup if data is not yet + /// available. + /// + /// See the [`TcpStream::poll_peek`] level documenation for more details. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::io; + /// use tokio::net::TcpStream; + /// + /// use futures::future::poll_fn; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let mut stream = TcpStream::connect("127.0.0.1:8000").await?; + /// let (mut read_half, _) = stream.split(); + /// let mut buf = [0; 10]; + /// + /// poll_fn(|cx| { + /// read_half.poll_peek(cx, &mut buf) + /// }).await?; + /// + /// Ok(()) + /// } + /// ``` + /// + /// [`TcpStream::poll_peek`]: TcpStream::poll_peek + pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> { + self.0.poll_peek2(cx, buf) + } + + /// Receives data on the socket from the remote address to which it is + /// connected, without removing that data from the queue. On success, + /// returns the number of bytes peeked. + /// + /// See the [`TcpStream::peek`] level documenation for more details. 
+ /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// use tokio::prelude::*; + /// use std::error::Error; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn Error>> { + /// // Connect to a peer + /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?; + /// let (mut read_half, _) = stream.split(); + /// + /// let mut b1 = [0; 10]; + /// let mut b2 = [0; 10]; + /// + /// // Peek at the data + /// let n = read_half.peek(&mut b1).await?; + /// + /// // Read the data + /// assert_eq!(n, read_half.read(&mut b2[..n]).await?); + /// assert_eq!(&b1[..n], &b2[..n]); + /// + /// Ok(()) + /// } + /// ``` + /// + /// [`TcpStream::peek`]: TcpStream::peek + pub async fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> { + poll_fn(|cx| self.poll_peek(cx, buf)).await + } +} + +impl AsyncRead for ReadHalf<'_> { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool { + false + } + + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + self.0.poll_read_priv(cx, buf) + } +} + +impl AsyncWrite for WriteHalf<'_> { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + self.0.poll_write_priv(cx, buf) + } + + fn poll_write_buf<B: Buf>( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut B, + ) -> Poll<io::Result<usize>> { + self.0.poll_write_buf_priv(cx, buf) + } + + #[inline] + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { + // tcp flush is a no-op + Poll::Ready(Ok(())) + } + + // `poll_shutdown` on a write half shutdowns the stream in the "write" direction. + fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { + self.0.shutdown(Shutdown::Write).into() + } +} + +impl AsRef<TcpStream> for ReadHalf<'_> { + fn as_ref(&self) -> &TcpStream { + self.0 + } +} + +impl AsRef<TcpStream> for WriteHalf<'_> { + fn as_ref(&self) -> &TcpStream { + self.0 + } +} diff --git a/third_party/rust/tokio/src/net/tcp/stream.rs b/third_party/rust/tokio/src/net/tcp/stream.rs new file mode 100644 index 0000000000..732c0ca381 --- /dev/null +++ b/third_party/rust/tokio/src/net/tcp/stream.rs @@ -0,0 +1,869 @@ +use crate::future::poll_fn; +use crate::io::{AsyncRead, AsyncWrite, PollEvented}; +use crate::net::tcp::split::{split, ReadHalf, WriteHalf}; +use crate::net::ToSocketAddrs; + +use bytes::Buf; +use iovec::IoVec; +use std::convert::TryFrom; +use std::fmt; +use std::io::{self, Read, Write}; +use std::mem::MaybeUninit; +use std::net::{self, Shutdown, SocketAddr}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; + +cfg_tcp! { + /// A TCP stream between a local and a remote socket. + /// + /// A TCP stream can either be created by connecting to an endpoint, via the + /// [`connect`] method, or by [accepting] a connection from a [listener]. + /// + /// [`connect`]: method@TcpStream::connect + /// [accepting]: method@super::TcpListener::accept + /// [listener]: struct@super::TcpListener + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// use tokio::prelude::*; + /// use std::error::Error; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn Error>> { + /// // Connect to a peer + /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// // Write some data. 
+ /// stream.write_all(b"hello world!").await?; + /// + /// Ok(()) + /// } + /// ``` + pub struct TcpStream { + io: PollEvented<mio::net::TcpStream>, + } +} + +impl TcpStream { + /// Opens a TCP connection to a remote host. + /// + /// `addr` is an address of the remote host. Anything which implements + /// `ToSocketAddrs` trait can be supplied for the address. + /// + /// If `addr` yields multiple addresses, connect will be attempted with each + /// of the addresses until a connection is successful. If none of the + /// addresses result in a successful connection, the error returned from the + /// last connection attempt (the last address) is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// use tokio::prelude::*; + /// use std::error::Error; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn Error>> { + /// // Connect to a peer + /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// // Write some data. + /// stream.write_all(b"hello world!").await?; + /// + /// Ok(()) + /// } + /// ``` + pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> { + let addrs = addr.to_socket_addrs().await?; + + let mut last_err = None; + + for addr in addrs { + match TcpStream::connect_addr(addr).await { + Ok(stream) => return Ok(stream), + Err(e) => last_err = Some(e), + } + } + + Err(last_err.unwrap_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "could not resolve to any address", + ) + })) + } + + /// Establishes a connection to the specified `addr`. + async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> { + let sys = mio::net::TcpStream::connect(&addr)?; + let stream = TcpStream::new(sys)?; + + // Once we've connected, wait for the stream to be writable as + // that's when the actual connection has been initiated. Once we're + // writable we check for `take_socket_error` to see if the connect + // actually hit an error or not. + // + // If all that succeeded then we ship everything on up. + poll_fn(|cx| stream.io.poll_write_ready(cx)).await?; + + if let Some(e) = stream.io.get_ref().take_error()? { + return Err(e); + } + + Ok(stream) + } + + pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result<TcpStream> { + let io = PollEvented::new(connected)?; + Ok(TcpStream { io }) + } + + /// Creates new `TcpStream` from a `std::net::TcpStream`. + /// + /// This function will convert a TCP stream created by the standard library + /// to a TCP stream ready to be used with the provided event loop handle. + /// + /// # Examples + /// + /// ```rust,no_run + /// use std::error::Error; + /// use tokio::net::TcpStream; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn Error>> { + /// let std_stream = std::net::TcpStream::connect("127.0.0.1:34254")?; + /// let stream = TcpStream::from_std(std_stream)?; + /// Ok(()) + /// } + /// ``` + /// + /// # Panics + /// + /// This function panics if thread-local runtime is not set. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + pub fn from_std(stream: net::TcpStream) -> io::Result<TcpStream> { + let io = mio::net::TcpStream::from_stream(stream)?; + let io = PollEvented::new(io)?; + Ok(TcpStream { io }) + } + + // Connects `TcpStream` asynchronously that may be built with a net2 `TcpBuilder`. 
+ // + // This should be removed in favor of some in-crate TcpSocket builder API. + #[doc(hidden)] + pub async fn connect_std(stream: net::TcpStream, addr: &SocketAddr) -> io::Result<TcpStream> { + let io = mio::net::TcpStream::connect_stream(stream, addr)?; + let io = PollEvented::new(io)?; + let stream = TcpStream { io }; + + // Once we've connected, wait for the stream to be writable as + // that's when the actual connection has been initiated. Once we're + // writable we check for `take_socket_error` to see if the connect + // actually hit an error or not. + // + // If all that succeeded then we ship everything on up. + poll_fn(|cx| stream.io.poll_write_ready(cx)).await?; + + if let Some(e) = stream.io.get_ref().take_error()? { + return Err(e); + } + + Ok(stream) + } + + /// Returns the local address that this stream is bound to. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// println!("{:?}", stream.local_addr()?); + /// # Ok(()) + /// # } + /// ``` + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.io.get_ref().local_addr() + } + + /// Returns the remote address that this stream is connected to. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// println!("{:?}", stream.peer_addr()?); + /// # Ok(()) + /// # } + /// ``` + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + self.io.get_ref().peer_addr() + } + + /// Attempts to receive data on the socket, without removing that data from + /// the queue, registering the current task for wakeup if data is not yet + /// available. + /// + /// # Return value + /// + /// The function returns: + /// + /// * `Poll::Pending` if data is not yet available. + /// * `Poll::Ready(Ok(n))` if data is available. `n` is the number of bytes peeked. + /// * `Poll::Ready(Err(e))` if an error is encountered. + /// + /// # Errors + /// + /// This function may encounter any standard I/O error except `WouldBlock`. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::io; + /// use tokio::net::TcpStream; + /// + /// use futures::future::poll_fn; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let mut stream = TcpStream::connect("127.0.0.1:8000").await?; + /// let mut buf = [0; 10]; + /// + /// poll_fn(|cx| { + /// stream.poll_peek(cx, &mut buf) + /// }).await?; + /// + /// Ok(()) + /// } + /// ``` + pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> { + self.poll_peek2(cx, buf) + } + + pub(super) fn poll_peek2( + &self, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; + + match self.io.get_ref().peek(buf) { + Ok(ret) => Poll::Ready(Ok(ret)), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_read_ready(cx, mio::Ready::readable())?; + Poll::Pending + } + Err(e) => Poll::Ready(Err(e)), + } + } + + /// Receives data on the socket from the remote address to which it is + /// connected, without removing that data from the queue. On success, + /// returns the number of bytes peeked. + /// + /// Successive calls return the same data. 
This is accomplished by passing + /// `MSG_PEEK` as a flag to the underlying recv system call. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// use tokio::prelude::*; + /// use std::error::Error; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn Error>> { + /// // Connect to a peer + /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// let mut b1 = [0; 10]; + /// let mut b2 = [0; 10]; + /// + /// // Peek at the data + /// let n = stream.peek(&mut b1).await?; + /// + /// // Read the data + /// assert_eq!(n, stream.read(&mut b2[..n]).await?); + /// assert_eq!(&b1[..n], &b2[..n]); + /// + /// Ok(()) + /// } + /// ``` + pub async fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> { + poll_fn(|cx| self.poll_peek(cx, buf)).await + } + + /// Shuts down the read, write, or both halves of this connection. + /// + /// This function will cause all pending and future I/O on the specified + /// portions to return immediately with an appropriate value (see the + /// documentation of `Shutdown`). + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// use std::error::Error; + /// use std::net::Shutdown; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn Error>> { + /// // Connect to a peer + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// // Shutdown the stream + /// stream.shutdown(Shutdown::Write)?; + /// + /// Ok(()) + /// } + /// ``` + pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { + self.io.get_ref().shutdown(how) + } + + /// Gets the value of the `TCP_NODELAY` option on this socket. + /// + /// For more information about this option, see [`set_nodelay`]. + /// + /// [`set_nodelay`]: TcpStream::set_nodelay + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// println!("{:?}", stream.nodelay()?); + /// # Ok(()) + /// # } + /// ``` + pub fn nodelay(&self) -> io::Result<bool> { + self.io.get_ref().nodelay() + } + + /// Sets the value of the `TCP_NODELAY` option on this socket. + /// + /// If set, this option disables the Nagle algorithm. This means that + /// segments are always sent as soon as possible, even if there is only a + /// small amount of data. When not set, data is buffered until there is a + /// sufficient amount to send out, thereby avoiding the frequent sending of + /// small packets. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// stream.set_nodelay(true)?; + /// # Ok(()) + /// # } + /// ``` + pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> { + self.io.get_ref().set_nodelay(nodelay) + } + + /// Gets the value of the `SO_RCVBUF` option on this socket. + /// + /// For more information about this option, see [`set_recv_buffer_size`]. 
+ /// + /// [`set_recv_buffer_size`]: TcpStream::set_recv_buffer_size + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// println!("{:?}", stream.recv_buffer_size()?); + /// # Ok(()) + /// # } + /// ``` + pub fn recv_buffer_size(&self) -> io::Result<usize> { + self.io.get_ref().recv_buffer_size() + } + + /// Sets the value of the `SO_RCVBUF` option on this socket. + /// + /// Changes the size of the operating system's receive buffer associated + /// with the socket. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// stream.set_recv_buffer_size(100)?; + /// # Ok(()) + /// # } + /// ``` + pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> { + self.io.get_ref().set_recv_buffer_size(size) + } + + /// Gets the value of the `SO_SNDBUF` option on this socket. + /// + /// For more information about this option, see [`set_send_buffer_size`]. + /// + /// [`set_send_buffer_size`]: TcpStream::set_send_buffer_size + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// println!("{:?}", stream.send_buffer_size()?); + /// # Ok(()) + /// # } + /// ``` + pub fn send_buffer_size(&self) -> io::Result<usize> { + self.io.get_ref().send_buffer_size() + } + + /// Sets the value of the `SO_SNDBUF` option on this socket. + /// + /// Changes the size of the operating system's send buffer associated with + /// the socket. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// stream.set_send_buffer_size(100)?; + /// # Ok(()) + /// # } + /// ``` + pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> { + self.io.get_ref().set_send_buffer_size(size) + } + + /// Returns whether keepalive messages are enabled on this socket, and if so + /// the duration of time between them. + /// + /// For more information about this option, see [`set_keepalive`]. + /// + /// [`set_keepalive`]: TcpStream::set_keepalive + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// println!("{:?}", stream.keepalive()?); + /// # Ok(()) + /// # } + /// ``` + pub fn keepalive(&self) -> io::Result<Option<Duration>> { + self.io.get_ref().keepalive() + } + + /// Sets whether keepalive messages are enabled to be sent on this socket. + /// + /// On Unix, this option will set the `SO_KEEPALIVE` as well as the + /// `TCP_KEEPALIVE` or `TCP_KEEPIDLE` option (depending on your platform). + /// On Windows, this will set the `SIO_KEEPALIVE_VALS` option. + /// + /// If `None` is specified then keepalive messages are disabled, otherwise + /// the duration specified will be the time to remain idle before sending a + /// TCP keepalive probe. + /// + /// Some platforms specify this value in seconds, so sub-second + /// specifications may be omitted. 
+ /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// stream.set_keepalive(None)?; + /// # Ok(()) + /// # } + /// ``` + pub fn set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()> { + self.io.get_ref().set_keepalive(keepalive) + } + + /// Gets the value of the `IP_TTL` option for this socket. + /// + /// For more information about this option, see [`set_ttl`]. + /// + /// [`set_ttl`]: TcpStream::set_ttl + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// println!("{:?}", stream.ttl()?); + /// # Ok(()) + /// # } + /// ``` + pub fn ttl(&self) -> io::Result<u32> { + self.io.get_ref().ttl() + } + + /// Sets the value for the `IP_TTL` option on this socket. + /// + /// This value sets the time-to-live field that is used in every packet sent + /// from this socket. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// stream.set_ttl(123)?; + /// # Ok(()) + /// # } + /// ``` + pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { + self.io.get_ref().set_ttl(ttl) + } + + /// Reads the linger duration for this socket by getting the `SO_LINGER` + /// option. + /// + /// For more information about this option, see [`set_linger`]. + /// + /// [`set_linger`]: TcpStream::set_linger + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// println!("{:?}", stream.linger()?); + /// # Ok(()) + /// # } + /// ``` + pub fn linger(&self) -> io::Result<Option<Duration>> { + self.io.get_ref().linger() + } + + /// Sets the linger duration of this socket by setting the `SO_LINGER` + /// option. + /// + /// This option controls the action taken when a stream has unsent messages + /// and the stream is closed. If `SO_LINGER` is set, the system + /// shall block the process until it can transmit the data or until the + /// time expires. + /// + /// If `SO_LINGER` is not specified, and the stream is closed, the system + /// handles the call in a way that allows the process to continue as quickly + /// as possible. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// stream.set_linger(None)?; + /// # Ok(()) + /// # } + /// ``` + pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> { + self.io.get_ref().set_linger(dur) + } + + /// Splits a `TcpStream` into a read half and a write half, which can be used + /// to read and write the stream concurrently. + pub fn split(&mut self) -> (ReadHalf<'_>, WriteHalf<'_>) { + split(self) + } + + // == Poll IO functions that takes `&self` == + // + // They are not public because (taken from the doc of `PollEvented`): + // + // While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the + // caller must ensure that there are at most two tasks that use a + // `PollEvented` instance concurrently. 
One for reading and one for writing. + // While violating this requirement is "safe" from a Rust memory model point + // of view, it will result in unexpected behavior in the form of lost + // notifications and tasks hanging. + + pub(crate) fn poll_read_priv( + &self, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; + + match self.io.get_ref().read(buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_read_ready(cx, mio::Ready::readable())?; + Poll::Pending + } + x => Poll::Ready(x), + } + } + + pub(super) fn poll_write_priv( + &self, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + ready!(self.io.poll_write_ready(cx))?; + + match self.io.get_ref().write(buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_write_ready(cx)?; + Poll::Pending + } + x => Poll::Ready(x), + } + } + + pub(super) fn poll_write_buf_priv<B: Buf>( + &self, + cx: &mut Context<'_>, + buf: &mut B, + ) -> Poll<io::Result<usize>> { + use std::io::IoSlice; + + ready!(self.io.poll_write_ready(cx))?; + + // The `IoVec` (v0.1.x) type can't have a zero-length size, so create + // a dummy version from a 1-length slice which we'll overwrite with + // the `bytes_vectored` method. + static S: &[u8] = &[0]; + const MAX_BUFS: usize = 64; + + // IoSlice isn't Copy, so we must expand this manually ;_; + let mut slices: [IoSlice<'_>; MAX_BUFS] = [ + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + ]; + let cnt = buf.bytes_vectored(&mut slices); + + let iovec = <&IoVec>::from(S); + let mut vecs = [iovec; MAX_BUFS]; + for i in 0..cnt { + vecs[i] = (*slices[i]).into(); + } + + match self.io.get_ref().write_bufs(&vecs[..cnt]) { + Ok(n) => { + buf.advance(n); + Poll::Ready(Ok(n)) + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_write_ready(cx)?; + Poll::Pending + } + Err(e) => Poll::Ready(Err(e)), + } + } +} + +impl TryFrom<TcpStream> for mio::net::TcpStream { + type Error = io::Error; + + /// Consumes value, returning the mio I/O object. + /// + /// See [`PollEvented::into_inner`] for more details about + /// resource deregistration that happens during the call. 
+ /// + /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner + fn try_from(value: TcpStream) -> Result<Self, Self::Error> { + value.io.into_inner() + } +} + +impl TryFrom<net::TcpStream> for TcpStream { + type Error = io::Error; + + /// Consumes stream, returning the tokio I/O object. + /// + /// This is equivalent to + /// [`TcpStream::from_std(stream)`](TcpStream::from_std). + fn try_from(stream: net::TcpStream) -> Result<Self, Self::Error> { + Self::from_std(stream) + } +} + +// ===== impl Read / Write ===== + +impl AsyncRead for TcpStream { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool { + false + } + + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + self.poll_read_priv(cx, buf) + } +} + +impl AsyncWrite for TcpStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + self.poll_write_priv(cx, buf) + } + + fn poll_write_buf<B: Buf>( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut B, + ) -> Poll<io::Result<usize>> { + self.poll_write_buf_priv(cx, buf) + } + + #[inline] + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { + // tcp flush is a no-op + Poll::Ready(Ok(())) + } + + fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { + self.shutdown(std::net::Shutdown::Write)?; + Poll::Ready(Ok(())) + } +} + +impl fmt::Debug for TcpStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.io.get_ref().fmt(f) + } +} + +#[cfg(unix)] +mod sys { + use super::TcpStream; + use std::os::unix::prelude::*; + + impl AsRawFd for TcpStream { + fn as_raw_fd(&self) -> RawFd { + self.io.get_ref().as_raw_fd() + } + } +} + +#[cfg(windows)] +mod sys { + // TODO: let's land these upstream with mio and then we can add them here. + // + // use std::os::windows::prelude::*; + // use super::TcpStream; + // + // impl AsRawHandle for TcpStream { + // fn as_raw_handle(&self) -> RawHandle { + // self.io.get_ref().as_raw_handle() + // } + // } +} |
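Putting the pieces of this diff together, a hedged end-to-end sketch of an echo server: `TcpListener::accept` from listener.rs, `TcpStream::split` from split.rs, and the `AsyncRead`/`AsyncWrite` impls from stream.rs. It assumes the tokio 0.2-era API vendored above, the `macros` feature, and `tokio::spawn` from the runtime (not part of this diff); the address is a placeholder and error handling is deliberately minimal:

```rust
use tokio::net::TcpListener;
use tokio::prelude::*;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut listener = TcpListener::bind("127.0.0.1:8080").await?;

    loop {
        let (mut socket, addr) = listener.accept().await?;
        println!("connection from {}", addr);

        // One task per connection; the borrowed split halves stay inside it.
        tokio::spawn(async move {
            let (mut rd, mut wr) = socket.split();
            let mut buf = [0u8; 1024];

            loop {
                match rd.read(&mut buf).await {
                    // A zero-byte read means the peer closed its write side.
                    Ok(0) => return,
                    Ok(n) => {
                        if wr.write_all(&buf[..n]).await.is_err() {
                            return;
                        }
                    }
                    Err(_) => return,
                }
            }
        });
    }
}
```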