use crate::future::poll_fn; use crate::io::{AsyncRead, AsyncWrite, PollEvented}; use crate::net::tcp::split::{split, ReadHalf, WriteHalf}; use crate::net::ToSocketAddrs; use bytes::Buf; use iovec::IoVec; use std::convert::TryFrom; use std::fmt; use std::io::{self, Read, Write}; use std::mem::MaybeUninit; use std::net::{self, Shutdown, SocketAddr}; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; cfg_tcp! { /// A TCP stream between a local and a remote socket. /// /// A TCP stream can either be created by connecting to an endpoint, via the /// [`connect`] method, or by [accepting] a connection from a [listener]. /// /// [`connect`]: method@TcpStream::connect /// [accepting]: method@super::TcpListener::accept /// [listener]: struct@super::TcpListener /// /// # Examples /// /// ```no_run /// use tokio::net::TcpStream; /// use tokio::prelude::*; /// use std::error::Error; /// /// #[tokio::main] /// async fn main() -> Result<(), Box> { /// // Connect to a peer /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// // Write some data. /// stream.write_all(b"hello world!").await?; /// /// Ok(()) /// } /// ``` pub struct TcpStream { io: PollEvented, } } impl TcpStream { /// Opens a TCP connection to a remote host. /// /// `addr` is an address of the remote host. Anything which implements /// `ToSocketAddrs` trait can be supplied for the address. /// /// If `addr` yields multiple addresses, connect will be attempted with each /// of the addresses until a connection is successful. If none of the /// addresses result in a successful connection, the error returned from the /// last connection attempt (the last address) is returned. /// /// # Examples /// /// ```no_run /// use tokio::net::TcpStream; /// use tokio::prelude::*; /// use std::error::Error; /// /// #[tokio::main] /// async fn main() -> Result<(), Box> { /// // Connect to a peer /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// // Write some data. /// stream.write_all(b"hello world!").await?; /// /// Ok(()) /// } /// ``` pub async fn connect(addr: A) -> io::Result { let addrs = addr.to_socket_addrs().await?; let mut last_err = None; for addr in addrs { match TcpStream::connect_addr(addr).await { Ok(stream) => return Ok(stream), Err(e) => last_err = Some(e), } } Err(last_err.unwrap_or_else(|| { io::Error::new( io::ErrorKind::InvalidInput, "could not resolve to any address", ) })) } /// Establishes a connection to the specified `addr`. async fn connect_addr(addr: SocketAddr) -> io::Result { let sys = mio::net::TcpStream::connect(&addr)?; let stream = TcpStream::new(sys)?; // Once we've connected, wait for the stream to be writable as // that's when the actual connection has been initiated. Once we're // writable we check for `take_socket_error` to see if the connect // actually hit an error or not. // // If all that succeeded then we ship everything on up. poll_fn(|cx| stream.io.poll_write_ready(cx)).await?; if let Some(e) = stream.io.get_ref().take_error()? { return Err(e); } Ok(stream) } pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result { let io = PollEvented::new(connected)?; Ok(TcpStream { io }) } /// Creates new `TcpStream` from a `std::net::TcpStream`. /// /// This function will convert a TCP stream created by the standard library /// to a TCP stream ready to be used with the provided event loop handle. /// /// # Examples /// /// ```rust,no_run /// use std::error::Error; /// use tokio::net::TcpStream; /// /// #[tokio::main] /// async fn main() -> Result<(), Box> { /// let std_stream = std::net::TcpStream::connect("127.0.0.1:34254")?; /// let stream = TcpStream::from_std(std_stream)?; /// Ok(()) /// } /// ``` /// /// # Panics /// /// This function panics if thread-local runtime is not set. /// /// The runtime is usually set implicitly when this function is called /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. pub fn from_std(stream: net::TcpStream) -> io::Result { let io = mio::net::TcpStream::from_stream(stream)?; let io = PollEvented::new(io)?; Ok(TcpStream { io }) } // Connects `TcpStream` asynchronously that may be built with a net2 `TcpBuilder`. // // This should be removed in favor of some in-crate TcpSocket builder API. #[doc(hidden)] pub async fn connect_std(stream: net::TcpStream, addr: &SocketAddr) -> io::Result { let io = mio::net::TcpStream::connect_stream(stream, addr)?; let io = PollEvented::new(io)?; let stream = TcpStream { io }; // Once we've connected, wait for the stream to be writable as // that's when the actual connection has been initiated. Once we're // writable we check for `take_socket_error` to see if the connect // actually hit an error or not. // // If all that succeeded then we ship everything on up. poll_fn(|cx| stream.io.poll_write_ready(cx)).await?; if let Some(e) = stream.io.get_ref().take_error()? { return Err(e); } Ok(stream) } /// Returns the local address that this stream is bound to. /// /// # Examples /// /// ```no_run /// use tokio::net::TcpStream; /// /// # async fn dox() -> Result<(), Box> { /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// println!("{:?}", stream.local_addr()?); /// # Ok(()) /// # } /// ``` pub fn local_addr(&self) -> io::Result { self.io.get_ref().local_addr() } /// Returns the remote address that this stream is connected to. /// /// # Examples /// /// ```no_run /// use tokio::net::TcpStream; /// /// # async fn dox() -> Result<(), Box> { /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// println!("{:?}", stream.peer_addr()?); /// # Ok(()) /// # } /// ``` pub fn peer_addr(&self) -> io::Result { self.io.get_ref().peer_addr() } /// Attempts to receive data on the socket, without removing that data from /// the queue, registering the current task for wakeup if data is not yet /// available. /// /// # Return value /// /// The function returns: /// /// * `Poll::Pending` if data is not yet available. /// * `Poll::Ready(Ok(n))` if data is available. `n` is the number of bytes peeked. /// * `Poll::Ready(Err(e))` if an error is encountered. /// /// # Errors /// /// This function may encounter any standard I/O error except `WouldBlock`. /// /// # Examples /// /// ```no_run /// use tokio::io; /// use tokio::net::TcpStream; /// /// use futures::future::poll_fn; /// /// #[tokio::main] /// async fn main() -> io::Result<()> { /// let mut stream = TcpStream::connect("127.0.0.1:8000").await?; /// let mut buf = [0; 10]; /// /// poll_fn(|cx| { /// stream.poll_peek(cx, &mut buf) /// }).await?; /// /// Ok(()) /// } /// ``` pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { self.poll_peek2(cx, buf) } pub(super) fn poll_peek2( &self, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; match self.io.get_ref().peek(buf) { Ok(ret) => Poll::Ready(Ok(ret)), Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { self.io.clear_read_ready(cx, mio::Ready::readable())?; Poll::Pending } Err(e) => Poll::Ready(Err(e)), } } /// Receives data on the socket from the remote address to which it is /// connected, without removing that data from the queue. On success, /// returns the number of bytes peeked. /// /// Successive calls return the same data. This is accomplished by passing /// `MSG_PEEK` as a flag to the underlying recv system call. /// /// # Examples /// /// ```no_run /// use tokio::net::TcpStream; /// use tokio::prelude::*; /// use std::error::Error; /// /// #[tokio::main] /// async fn main() -> Result<(), Box> { /// // Connect to a peer /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// let mut b1 = [0; 10]; /// let mut b2 = [0; 10]; /// /// // Peek at the data /// let n = stream.peek(&mut b1).await?; /// /// // Read the data /// assert_eq!(n, stream.read(&mut b2[..n]).await?); /// assert_eq!(&b1[..n], &b2[..n]); /// /// Ok(()) /// } /// ``` pub async fn peek(&mut self, buf: &mut [u8]) -> io::Result { poll_fn(|cx| self.poll_peek(cx, buf)).await } /// Shuts down the read, write, or both halves of this connection. /// /// This function will cause all pending and future I/O on the specified /// portions to return immediately with an appropriate value (see the /// documentation of `Shutdown`). /// /// # Examples /// /// ```no_run /// use tokio::net::TcpStream; /// use std::error::Error; /// use std::net::Shutdown; /// /// #[tokio::main] /// async fn main() -> Result<(), Box> { /// // Connect to a peer /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// // Shutdown the stream /// stream.shutdown(Shutdown::Write)?; /// /// Ok(()) /// } /// ``` pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { self.io.get_ref().shutdown(how) } /// Gets the value of the `TCP_NODELAY` option on this socket. /// /// For more information about this option, see [`set_nodelay`]. /// /// [`set_nodelay`]: TcpStream::set_nodelay /// /// # Examples /// /// ```no_run /// use tokio::net::TcpStream; /// /// # async fn dox() -> Result<(), Box> { /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// println!("{:?}", stream.nodelay()?); /// # Ok(()) /// # } /// ``` pub fn nodelay(&self) -> io::Result { self.io.get_ref().nodelay() } /// Sets the value of the `TCP_NODELAY` option on this socket. /// /// If set, this option disables the Nagle algorithm. This means that /// segments are always sent as soon as possible, even if there is only a /// small amount of data. When not set, data is buffered until there is a /// sufficient amount to send out, thereby avoiding the frequent sending of /// small packets. /// /// # Examples /// /// ```no_run /// use tokio::net::TcpStream; /// /// # async fn dox() -> Result<(), Box> { /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// stream.set_nodelay(true)?; /// # Ok(()) /// # } /// ``` pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> { self.io.get_ref().set_nodelay(nodelay) } /// Gets the value of the `SO_RCVBUF` option on this socket. /// /// For more information about this option, see [`set_recv_buffer_size`]. /// /// [`set_recv_buffer_size`]: TcpStream::set_recv_buffer_size /// /// # Examples /// /// ```no_run /// use tokio::net::TcpStream; /// /// # async fn dox() -> Result<(), Box> { /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// println!("{:?}", stream.recv_buffer_size()?); /// # Ok(()) /// # } /// ``` pub fn recv_buffer_size(&self) -> io::Result { self.io.get_ref().recv_buffer_size() } /// Sets the value of the `SO_RCVBUF` option on this socket. /// /// Changes the size of the operating system's receive buffer associated /// with the socket. /// /// # Examples /// /// ```no_run /// use tokio::net::TcpStream; /// /// # async fn dox() -> Result<(), Box> { /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// stream.set_recv_buffer_size(100)?; /// # Ok(()) /// # } /// ``` pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> { self.io.get_ref().set_recv_buffer_size(size) } /// Gets the value of the `SO_SNDBUF` option on this socket. /// /// For more information about this option, see [`set_send_buffer_size`]. /// /// [`set_send_buffer_size`]: TcpStream::set_send_buffer_size /// /// # Examples /// /// ```no_run /// use tokio::net::TcpStream; /// /// # async fn dox() -> Result<(), Box> { /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// println!("{:?}", stream.send_buffer_size()?); /// # Ok(()) /// # } /// ``` pub fn send_buffer_size(&self) -> io::Result { self.io.get_ref().send_buffer_size() } /// Sets the value of the `SO_SNDBUF` option on this socket. /// /// Changes the size of the operating system's send buffer associated with /// the socket. /// /// # Examples /// /// ```no_run /// use tokio::net::TcpStream; /// /// # async fn dox() -> Result<(), Box> { /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// stream.set_send_buffer_size(100)?; /// # Ok(()) /// # } /// ``` pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> { self.io.get_ref().set_send_buffer_size(size) } /// Returns whether keepalive messages are enabled on this socket, and if so /// the duration of time between them. /// /// For more information about this option, see [`set_keepalive`]. /// /// [`set_keepalive`]: TcpStream::set_keepalive /// /// # Examples /// /// ```no_run /// use tokio::net::TcpStream; /// /// # async fn dox() -> Result<(), Box> { /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// println!("{:?}", stream.keepalive()?); /// # Ok(()) /// # } /// ``` pub fn keepalive(&self) -> io::Result> { self.io.get_ref().keepalive() } /// Sets whether keepalive messages are enabled to be sent on this socket. /// /// On Unix, this option will set the `SO_KEEPALIVE` as well as the /// `TCP_KEEPALIVE` or `TCP_KEEPIDLE` option (depending on your platform). /// On Windows, this will set the `SIO_KEEPALIVE_VALS` option. /// /// If `None` is specified then keepalive messages are disabled, otherwise /// the duration specified will be the time to remain idle before sending a /// TCP keepalive probe. /// /// Some platforms specify this value in seconds, so sub-second /// specifications may be omitted. /// /// # Examples /// /// ```no_run /// use tokio::net::TcpStream; /// /// # async fn dox() -> Result<(), Box> { /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// stream.set_keepalive(None)?; /// # Ok(()) /// # } /// ``` pub fn set_keepalive(&self, keepalive: Option) -> io::Result<()> { self.io.get_ref().set_keepalive(keepalive) } /// Gets the value of the `IP_TTL` option for this socket. /// /// For more information about this option, see [`set_ttl`]. /// /// [`set_ttl`]: TcpStream::set_ttl /// /// # Examples /// /// ```no_run /// use tokio::net::TcpStream; /// /// # async fn dox() -> Result<(), Box> { /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// println!("{:?}", stream.ttl()?); /// # Ok(()) /// # } /// ``` pub fn ttl(&self) -> io::Result { self.io.get_ref().ttl() } /// Sets the value for the `IP_TTL` option on this socket. /// /// This value sets the time-to-live field that is used in every packet sent /// from this socket. /// /// # Examples /// /// ```no_run /// use tokio::net::TcpStream; /// /// # async fn dox() -> Result<(), Box> { /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// stream.set_ttl(123)?; /// # Ok(()) /// # } /// ``` pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { self.io.get_ref().set_ttl(ttl) } /// Reads the linger duration for this socket by getting the `SO_LINGER` /// option. /// /// For more information about this option, see [`set_linger`]. /// /// [`set_linger`]: TcpStream::set_linger /// /// # Examples /// /// ```no_run /// use tokio::net::TcpStream; /// /// # async fn dox() -> Result<(), Box> { /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// println!("{:?}", stream.linger()?); /// # Ok(()) /// # } /// ``` pub fn linger(&self) -> io::Result> { self.io.get_ref().linger() } /// Sets the linger duration of this socket by setting the `SO_LINGER` /// option. /// /// This option controls the action taken when a stream has unsent messages /// and the stream is closed. If `SO_LINGER` is set, the system /// shall block the process until it can transmit the data or until the /// time expires. /// /// If `SO_LINGER` is not specified, and the stream is closed, the system /// handles the call in a way that allows the process to continue as quickly /// as possible. /// /// # Examples /// /// ```no_run /// use tokio::net::TcpStream; /// /// # async fn dox() -> Result<(), Box> { /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// /// stream.set_linger(None)?; /// # Ok(()) /// # } /// ``` pub fn set_linger(&self, dur: Option) -> io::Result<()> { self.io.get_ref().set_linger(dur) } /// Splits a `TcpStream` into a read half and a write half, which can be used /// to read and write the stream concurrently. pub fn split(&mut self) -> (ReadHalf<'_>, WriteHalf<'_>) { split(self) } // == Poll IO functions that takes `&self` == // // They are not public because (taken from the doc of `PollEvented`): // // While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the // caller must ensure that there are at most two tasks that use a // `PollEvented` instance concurrently. One for reading and one for writing. // While violating this requirement is "safe" from a Rust memory model point // of view, it will result in unexpected behavior in the form of lost // notifications and tasks hanging. pub(crate) fn poll_read_priv( &self, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; match self.io.get_ref().read(buf) { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { self.io.clear_read_ready(cx, mio::Ready::readable())?; Poll::Pending } x => Poll::Ready(x), } } pub(super) fn poll_write_priv( &self, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { ready!(self.io.poll_write_ready(cx))?; match self.io.get_ref().write(buf) { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { self.io.clear_write_ready(cx)?; Poll::Pending } x => Poll::Ready(x), } } pub(super) fn poll_write_buf_priv( &self, cx: &mut Context<'_>, buf: &mut B, ) -> Poll> { use std::io::IoSlice; ready!(self.io.poll_write_ready(cx))?; // The `IoVec` (v0.1.x) type can't have a zero-length size, so create // a dummy version from a 1-length slice which we'll overwrite with // the `bytes_vectored` method. static S: &[u8] = &[0]; const MAX_BUFS: usize = 64; // IoSlice isn't Copy, so we must expand this manually ;_; let mut slices: [IoSlice<'_>; MAX_BUFS] = [ IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), IoSlice::new(S), ]; let cnt = buf.bytes_vectored(&mut slices); let iovec = <&IoVec>::from(S); let mut vecs = [iovec; MAX_BUFS]; for i in 0..cnt { vecs[i] = (*slices[i]).into(); } match self.io.get_ref().write_bufs(&vecs[..cnt]) { Ok(n) => { buf.advance(n); Poll::Ready(Ok(n)) } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { self.io.clear_write_ready(cx)?; Poll::Pending } Err(e) => Poll::Ready(Err(e)), } } } impl TryFrom for mio::net::TcpStream { type Error = io::Error; /// Consumes value, returning the mio I/O object. /// /// See [`PollEvented::into_inner`] for more details about /// resource deregistration that happens during the call. /// /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner fn try_from(value: TcpStream) -> Result { value.io.into_inner() } } impl TryFrom for TcpStream { type Error = io::Error; /// Consumes stream, returning the tokio I/O object. /// /// This is equivalent to /// [`TcpStream::from_std(stream)`](TcpStream::from_std). fn try_from(stream: net::TcpStream) -> Result { Self::from_std(stream) } } // ===== impl Read / Write ===== impl AsyncRead for TcpStream { unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit]) -> bool { false } fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { self.poll_read_priv(cx, buf) } } impl AsyncWrite for TcpStream { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { self.poll_write_priv(cx, buf) } fn poll_write_buf( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut B, ) -> Poll> { self.poll_write_buf_priv(cx, buf) } #[inline] fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { // tcp flush is a no-op Poll::Ready(Ok(())) } fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { self.shutdown(std::net::Shutdown::Write)?; Poll::Ready(Ok(())) } } impl fmt::Debug for TcpStream { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.io.get_ref().fmt(f) } } #[cfg(unix)] mod sys { use super::TcpStream; use std::os::unix::prelude::*; impl AsRawFd for TcpStream { fn as_raw_fd(&self) -> RawFd { self.io.get_ref().as_raw_fd() } } } #[cfg(windows)] mod sys { // TODO: let's land these upstream with mio and then we can add them here. // // use std::os::windows::prelude::*; // use super::TcpStream; // // impl AsRawHandle for TcpStream { // fn as_raw_handle(&self) -> RawHandle { // self.io.get_ref().as_raw_handle() // } // } }