diff options
Diffstat (limited to 'third_party/rust/tokio/src/net/tcp/stream.rs')
-rw-r--r-- | third_party/rust/tokio/src/net/tcp/stream.rs | 869 |
1 files changed, 869 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/net/tcp/stream.rs b/third_party/rust/tokio/src/net/tcp/stream.rs new file mode 100644 index 0000000000..732c0ca381 --- /dev/null +++ b/third_party/rust/tokio/src/net/tcp/stream.rs @@ -0,0 +1,869 @@ +use crate::future::poll_fn; +use crate::io::{AsyncRead, AsyncWrite, PollEvented}; +use crate::net::tcp::split::{split, ReadHalf, WriteHalf}; +use crate::net::ToSocketAddrs; + +use bytes::Buf; +use iovec::IoVec; +use std::convert::TryFrom; +use std::fmt; +use std::io::{self, Read, Write}; +use std::mem::MaybeUninit; +use std::net::{self, Shutdown, SocketAddr}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; + +cfg_tcp! { + /// A TCP stream between a local and a remote socket. + /// + /// A TCP stream can either be created by connecting to an endpoint, via the + /// [`connect`] method, or by [accepting] a connection from a [listener]. + /// + /// [`connect`]: method@TcpStream::connect + /// [accepting]: method@super::TcpListener::accept + /// [listener]: struct@super::TcpListener + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// use tokio::prelude::*; + /// use std::error::Error; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn Error>> { + /// // Connect to a peer + /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// // Write some data. + /// stream.write_all(b"hello world!").await?; + /// + /// Ok(()) + /// } + /// ``` + pub struct TcpStream { + io: PollEvented<mio::net::TcpStream>, + } +} + +impl TcpStream { + /// Opens a TCP connection to a remote host. + /// + /// `addr` is an address of the remote host. Anything which implements + /// `ToSocketAddrs` trait can be supplied for the address. + /// + /// If `addr` yields multiple addresses, connect will be attempted with each + /// of the addresses until a connection is successful. If none of the + /// addresses result in a successful connection, the error returned from the + /// last connection attempt (the last address) is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// use tokio::prelude::*; + /// use std::error::Error; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn Error>> { + /// // Connect to a peer + /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// // Write some data. + /// stream.write_all(b"hello world!").await?; + /// + /// Ok(()) + /// } + /// ``` + pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> { + let addrs = addr.to_socket_addrs().await?; + + let mut last_err = None; + + for addr in addrs { + match TcpStream::connect_addr(addr).await { + Ok(stream) => return Ok(stream), + Err(e) => last_err = Some(e), + } + } + + Err(last_err.unwrap_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "could not resolve to any address", + ) + })) + } + + /// Establishes a connection to the specified `addr`. + async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> { + let sys = mio::net::TcpStream::connect(&addr)?; + let stream = TcpStream::new(sys)?; + + // Once we've connected, wait for the stream to be writable as + // that's when the actual connection has been initiated. Once we're + // writable we check for `take_socket_error` to see if the connect + // actually hit an error or not. + // + // If all that succeeded then we ship everything on up. + poll_fn(|cx| stream.io.poll_write_ready(cx)).await?; + + if let Some(e) = stream.io.get_ref().take_error()? { + return Err(e); + } + + Ok(stream) + } + + pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result<TcpStream> { + let io = PollEvented::new(connected)?; + Ok(TcpStream { io }) + } + + /// Creates new `TcpStream` from a `std::net::TcpStream`. + /// + /// This function will convert a TCP stream created by the standard library + /// to a TCP stream ready to be used with the provided event loop handle. + /// + /// # Examples + /// + /// ```rust,no_run + /// use std::error::Error; + /// use tokio::net::TcpStream; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn Error>> { + /// let std_stream = std::net::TcpStream::connect("127.0.0.1:34254")?; + /// let stream = TcpStream::from_std(std_stream)?; + /// Ok(()) + /// } + /// ``` + /// + /// # Panics + /// + /// This function panics if thread-local runtime is not set. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + pub fn from_std(stream: net::TcpStream) -> io::Result<TcpStream> { + let io = mio::net::TcpStream::from_stream(stream)?; + let io = PollEvented::new(io)?; + Ok(TcpStream { io }) + } + + // Connects `TcpStream` asynchronously that may be built with a net2 `TcpBuilder`. + // + // This should be removed in favor of some in-crate TcpSocket builder API. + #[doc(hidden)] + pub async fn connect_std(stream: net::TcpStream, addr: &SocketAddr) -> io::Result<TcpStream> { + let io = mio::net::TcpStream::connect_stream(stream, addr)?; + let io = PollEvented::new(io)?; + let stream = TcpStream { io }; + + // Once we've connected, wait for the stream to be writable as + // that's when the actual connection has been initiated. Once we're + // writable we check for `take_socket_error` to see if the connect + // actually hit an error or not. + // + // If all that succeeded then we ship everything on up. + poll_fn(|cx| stream.io.poll_write_ready(cx)).await?; + + if let Some(e) = stream.io.get_ref().take_error()? { + return Err(e); + } + + Ok(stream) + } + + /// Returns the local address that this stream is bound to. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// println!("{:?}", stream.local_addr()?); + /// # Ok(()) + /// # } + /// ``` + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.io.get_ref().local_addr() + } + + /// Returns the remote address that this stream is connected to. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// println!("{:?}", stream.peer_addr()?); + /// # Ok(()) + /// # } + /// ``` + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + self.io.get_ref().peer_addr() + } + + /// Attempts to receive data on the socket, without removing that data from + /// the queue, registering the current task for wakeup if data is not yet + /// available. + /// + /// # Return value + /// + /// The function returns: + /// + /// * `Poll::Pending` if data is not yet available. + /// * `Poll::Ready(Ok(n))` if data is available. `n` is the number of bytes peeked. + /// * `Poll::Ready(Err(e))` if an error is encountered. + /// + /// # Errors + /// + /// This function may encounter any standard I/O error except `WouldBlock`. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::io; + /// use tokio::net::TcpStream; + /// + /// use futures::future::poll_fn; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let mut stream = TcpStream::connect("127.0.0.1:8000").await?; + /// let mut buf = [0; 10]; + /// + /// poll_fn(|cx| { + /// stream.poll_peek(cx, &mut buf) + /// }).await?; + /// + /// Ok(()) + /// } + /// ``` + pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> { + self.poll_peek2(cx, buf) + } + + pub(super) fn poll_peek2( + &self, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; + + match self.io.get_ref().peek(buf) { + Ok(ret) => Poll::Ready(Ok(ret)), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_read_ready(cx, mio::Ready::readable())?; + Poll::Pending + } + Err(e) => Poll::Ready(Err(e)), + } + } + + /// Receives data on the socket from the remote address to which it is + /// connected, without removing that data from the queue. On success, + /// returns the number of bytes peeked. + /// + /// Successive calls return the same data. This is accomplished by passing + /// `MSG_PEEK` as a flag to the underlying recv system call. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// use tokio::prelude::*; + /// use std::error::Error; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn Error>> { + /// // Connect to a peer + /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// let mut b1 = [0; 10]; + /// let mut b2 = [0; 10]; + /// + /// // Peek at the data + /// let n = stream.peek(&mut b1).await?; + /// + /// // Read the data + /// assert_eq!(n, stream.read(&mut b2[..n]).await?); + /// assert_eq!(&b1[..n], &b2[..n]); + /// + /// Ok(()) + /// } + /// ``` + pub async fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> { + poll_fn(|cx| self.poll_peek(cx, buf)).await + } + + /// Shuts down the read, write, or both halves of this connection. + /// + /// This function will cause all pending and future I/O on the specified + /// portions to return immediately with an appropriate value (see the + /// documentation of `Shutdown`). + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// use std::error::Error; + /// use std::net::Shutdown; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box<dyn Error>> { + /// // Connect to a peer + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// // Shutdown the stream + /// stream.shutdown(Shutdown::Write)?; + /// + /// Ok(()) + /// } + /// ``` + pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { + self.io.get_ref().shutdown(how) + } + + /// Gets the value of the `TCP_NODELAY` option on this socket. + /// + /// For more information about this option, see [`set_nodelay`]. + /// + /// [`set_nodelay`]: TcpStream::set_nodelay + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// println!("{:?}", stream.nodelay()?); + /// # Ok(()) + /// # } + /// ``` + pub fn nodelay(&self) -> io::Result<bool> { + self.io.get_ref().nodelay() + } + + /// Sets the value of the `TCP_NODELAY` option on this socket. + /// + /// If set, this option disables the Nagle algorithm. This means that + /// segments are always sent as soon as possible, even if there is only a + /// small amount of data. When not set, data is buffered until there is a + /// sufficient amount to send out, thereby avoiding the frequent sending of + /// small packets. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// stream.set_nodelay(true)?; + /// # Ok(()) + /// # } + /// ``` + pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> { + self.io.get_ref().set_nodelay(nodelay) + } + + /// Gets the value of the `SO_RCVBUF` option on this socket. + /// + /// For more information about this option, see [`set_recv_buffer_size`]. + /// + /// [`set_recv_buffer_size`]: TcpStream::set_recv_buffer_size + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// println!("{:?}", stream.recv_buffer_size()?); + /// # Ok(()) + /// # } + /// ``` + pub fn recv_buffer_size(&self) -> io::Result<usize> { + self.io.get_ref().recv_buffer_size() + } + + /// Sets the value of the `SO_RCVBUF` option on this socket. + /// + /// Changes the size of the operating system's receive buffer associated + /// with the socket. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// stream.set_recv_buffer_size(100)?; + /// # Ok(()) + /// # } + /// ``` + pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> { + self.io.get_ref().set_recv_buffer_size(size) + } + + /// Gets the value of the `SO_SNDBUF` option on this socket. + /// + /// For more information about this option, see [`set_send_buffer_size`]. + /// + /// [`set_send_buffer_size`]: TcpStream::set_send_buffer_size + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// println!("{:?}", stream.send_buffer_size()?); + /// # Ok(()) + /// # } + /// ``` + pub fn send_buffer_size(&self) -> io::Result<usize> { + self.io.get_ref().send_buffer_size() + } + + /// Sets the value of the `SO_SNDBUF` option on this socket. + /// + /// Changes the size of the operating system's send buffer associated with + /// the socket. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// stream.set_send_buffer_size(100)?; + /// # Ok(()) + /// # } + /// ``` + pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> { + self.io.get_ref().set_send_buffer_size(size) + } + + /// Returns whether keepalive messages are enabled on this socket, and if so + /// the duration of time between them. + /// + /// For more information about this option, see [`set_keepalive`]. + /// + /// [`set_keepalive`]: TcpStream::set_keepalive + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// println!("{:?}", stream.keepalive()?); + /// # Ok(()) + /// # } + /// ``` + pub fn keepalive(&self) -> io::Result<Option<Duration>> { + self.io.get_ref().keepalive() + } + + /// Sets whether keepalive messages are enabled to be sent on this socket. + /// + /// On Unix, this option will set the `SO_KEEPALIVE` as well as the + /// `TCP_KEEPALIVE` or `TCP_KEEPIDLE` option (depending on your platform). + /// On Windows, this will set the `SIO_KEEPALIVE_VALS` option. + /// + /// If `None` is specified then keepalive messages are disabled, otherwise + /// the duration specified will be the time to remain idle before sending a + /// TCP keepalive probe. + /// + /// Some platforms specify this value in seconds, so sub-second + /// specifications may be omitted. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// stream.set_keepalive(None)?; + /// # Ok(()) + /// # } + /// ``` + pub fn set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()> { + self.io.get_ref().set_keepalive(keepalive) + } + + /// Gets the value of the `IP_TTL` option for this socket. + /// + /// For more information about this option, see [`set_ttl`]. + /// + /// [`set_ttl`]: TcpStream::set_ttl + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// println!("{:?}", stream.ttl()?); + /// # Ok(()) + /// # } + /// ``` + pub fn ttl(&self) -> io::Result<u32> { + self.io.get_ref().ttl() + } + + /// Sets the value for the `IP_TTL` option on this socket. + /// + /// This value sets the time-to-live field that is used in every packet sent + /// from this socket. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// stream.set_ttl(123)?; + /// # Ok(()) + /// # } + /// ``` + pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { + self.io.get_ref().set_ttl(ttl) + } + + /// Reads the linger duration for this socket by getting the `SO_LINGER` + /// option. + /// + /// For more information about this option, see [`set_linger`]. + /// + /// [`set_linger`]: TcpStream::set_linger + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// println!("{:?}", stream.linger()?); + /// # Ok(()) + /// # } + /// ``` + pub fn linger(&self) -> io::Result<Option<Duration>> { + self.io.get_ref().linger() + } + + /// Sets the linger duration of this socket by setting the `SO_LINGER` + /// option. + /// + /// This option controls the action taken when a stream has unsent messages + /// and the stream is closed. If `SO_LINGER` is set, the system + /// shall block the process until it can transmit the data or until the + /// time expires. + /// + /// If `SO_LINGER` is not specified, and the stream is closed, the system + /// handles the call in a way that allows the process to continue as quickly + /// as possible. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// + /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// stream.set_linger(None)?; + /// # Ok(()) + /// # } + /// ``` + pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> { + self.io.get_ref().set_linger(dur) + } + + /// Splits a `TcpStream` into a read half and a write half, which can be used + /// to read and write the stream concurrently. + pub fn split(&mut self) -> (ReadHalf<'_>, WriteHalf<'_>) { + split(self) + } + + // == Poll IO functions that takes `&self` == + // + // They are not public because (taken from the doc of `PollEvented`): + // + // While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the + // caller must ensure that there are at most two tasks that use a + // `PollEvented` instance concurrently. One for reading and one for writing. + // While violating this requirement is "safe" from a Rust memory model point + // of view, it will result in unexpected behavior in the form of lost + // notifications and tasks hanging. + + pub(crate) fn poll_read_priv( + &self, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; + + match self.io.get_ref().read(buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_read_ready(cx, mio::Ready::readable())?; + Poll::Pending + } + x => Poll::Ready(x), + } + } + + pub(super) fn poll_write_priv( + &self, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + ready!(self.io.poll_write_ready(cx))?; + + match self.io.get_ref().write(buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_write_ready(cx)?; + Poll::Pending + } + x => Poll::Ready(x), + } + } + + pub(super) fn poll_write_buf_priv<B: Buf>( + &self, + cx: &mut Context<'_>, + buf: &mut B, + ) -> Poll<io::Result<usize>> { + use std::io::IoSlice; + + ready!(self.io.poll_write_ready(cx))?; + + // The `IoVec` (v0.1.x) type can't have a zero-length size, so create + // a dummy version from a 1-length slice which we'll overwrite with + // the `bytes_vectored` method. + static S: &[u8] = &[0]; + const MAX_BUFS: usize = 64; + + // IoSlice isn't Copy, so we must expand this manually ;_; + let mut slices: [IoSlice<'_>; MAX_BUFS] = [ + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + IoSlice::new(S), + ]; + let cnt = buf.bytes_vectored(&mut slices); + + let iovec = <&IoVec>::from(S); + let mut vecs = [iovec; MAX_BUFS]; + for i in 0..cnt { + vecs[i] = (*slices[i]).into(); + } + + match self.io.get_ref().write_bufs(&vecs[..cnt]) { + Ok(n) => { + buf.advance(n); + Poll::Ready(Ok(n)) + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_write_ready(cx)?; + Poll::Pending + } + Err(e) => Poll::Ready(Err(e)), + } + } +} + +impl TryFrom<TcpStream> for mio::net::TcpStream { + type Error = io::Error; + + /// Consumes value, returning the mio I/O object. + /// + /// See [`PollEvented::into_inner`] for more details about + /// resource deregistration that happens during the call. + /// + /// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner + fn try_from(value: TcpStream) -> Result<Self, Self::Error> { + value.io.into_inner() + } +} + +impl TryFrom<net::TcpStream> for TcpStream { + type Error = io::Error; + + /// Consumes stream, returning the tokio I/O object. + /// + /// This is equivalent to + /// [`TcpStream::from_std(stream)`](TcpStream::from_std). + fn try_from(stream: net::TcpStream) -> Result<Self, Self::Error> { + Self::from_std(stream) + } +} + +// ===== impl Read / Write ===== + +impl AsyncRead for TcpStream { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit<u8>]) -> bool { + false + } + + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + self.poll_read_priv(cx, buf) + } +} + +impl AsyncWrite for TcpStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + self.poll_write_priv(cx, buf) + } + + fn poll_write_buf<B: Buf>( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut B, + ) -> Poll<io::Result<usize>> { + self.poll_write_buf_priv(cx, buf) + } + + #[inline] + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { + // tcp flush is a no-op + Poll::Ready(Ok(())) + } + + fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { + self.shutdown(std::net::Shutdown::Write)?; + Poll::Ready(Ok(())) + } +} + +impl fmt::Debug for TcpStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.io.get_ref().fmt(f) + } +} + +#[cfg(unix)] +mod sys { + use super::TcpStream; + use std::os::unix::prelude::*; + + impl AsRawFd for TcpStream { + fn as_raw_fd(&self) -> RawFd { + self.io.get_ref().as_raw_fd() + } + } +} + +#[cfg(windows)] +mod sys { + // TODO: let's land these upstream with mio and then we can add them here. + // + // use std::os::windows::prelude::*; + // use super::TcpStream; + // + // impl AsRawHandle for TcpStream { + // fn as_raw_handle(&self) -> RawHandle { + // self.io.get_ref().as_raw_handle() + // } + // } +} |