diff options
Diffstat (limited to 'third_party/rust/tokio-tcp')
-rw-r--r-- | third_party/rust/tokio-tcp/.cargo-checksum.json | 1 | ||||
-rw-r--r-- | third_party/rust/tokio-tcp/CHANGELOG.md | 7 | ||||
-rw-r--r-- | third_party/rust/tokio-tcp/Cargo.toml | 42 | ||||
-rw-r--r-- | third_party/rust/tokio-tcp/LICENSE | 25 | ||||
-rw-r--r-- | third_party/rust/tokio-tcp/README.md | 15 | ||||
-rw-r--r-- | third_party/rust/tokio-tcp/src/incoming.rs | 45 | ||||
-rw-r--r-- | third_party/rust/tokio-tcp/src/lib.rs | 59 | ||||
-rw-r--r-- | third_party/rust/tokio-tcp/src/listener.rs | 261 | ||||
-rw-r--r-- | third_party/rust/tokio-tcp/src/stream.rs | 755 | ||||
-rw-r--r-- | third_party/rust/tokio-tcp/tests/chain.rs | 49 | ||||
-rw-r--r-- | third_party/rust/tokio-tcp/tests/echo.rs | 51 | ||||
-rw-r--r-- | third_party/rust/tokio-tcp/tests/limit.rs | 43 | ||||
-rw-r--r-- | third_party/rust/tokio-tcp/tests/stream-buffered.rs | 54 | ||||
-rw-r--r-- | third_party/rust/tokio-tcp/tests/tcp.rs | 132 |
14 files changed, 1539 insertions, 0 deletions
diff --git a/third_party/rust/tokio-tcp/.cargo-checksum.json b/third_party/rust/tokio-tcp/.cargo-checksum.json new file mode 100644 index 0000000000..55146fc946 --- /dev/null +++ b/third_party/rust/tokio-tcp/.cargo-checksum.json @@ -0,0 +1 @@ +{"files":{"CHANGELOG.md":"3c998bcf9a586c04572939b0b2a13de8c9b209ea1e1f39985a68f93c15eb8ca6","Cargo.toml":"873a8b587ce1997708b10e2a04d3f4ad1547643bbaa03daa7b5dc9a44bcf6693","LICENSE":"4899c290472c872cf8a1904a60e73ec58a1bc1db2e20bc143aa3d1498be49c96","README.md":"2a2758b6ffb7c22a360e759c577ebb057191247dc13aa24acf2bb9c8094c78f4","src/incoming.rs":"aab273caab1ec1db0228e52cb6c69525912b2db8f43ff545ee91658c27fec1a2","src/lib.rs":"874211f0a17e7efc285e0a1108a377a3f9761abb9fccf473f80823dabf6474b3","src/listener.rs":"98ca8d5ef5836273fe29af73e90b884c7755d830c32258086950fcca96bdb28b","src/stream.rs":"af393e6fb88cb94567d59999f563b76d6665883c41d1808c52a21795b00cd3c1","tests/chain.rs":"fd1a8dc4e8d838bb201dcbae99bddf6093d5cedb0c21bf05e574b19924c051b1","tests/echo.rs":"dbe85496609f488257bf21f36253eaff3aaea08432fdfdc4f246edc8ca1586df","tests/limit.rs":"df5f6fbcceae7df613e83f541b8debed5f06ee42900c5462f4322bd0564ae118","tests/stream-buffered.rs":"1996ce7bc2664da310ea1417d0baef7bff5a20dc51dc2efead662b82e71f9d6b","tests/tcp.rs":"32217ca59e86c315af29ddb9e9746de4ba1ac32d32e0bc93ca2f877561bf1331"},"package":"5b4c329b47f071eb8a746040465fa751bd95e4716e98daef6a9b4e434c17d565"}
\ No newline at end of file diff --git a/third_party/rust/tokio-tcp/CHANGELOG.md b/third_party/rust/tokio-tcp/CHANGELOG.md new file mode 100644 index 0000000000..93605645e9 --- /dev/null +++ b/third_party/rust/tokio-tcp/CHANGELOG.md @@ -0,0 +1,7 @@ +# 0.1.1 (August 6, 2018) + +* Add `TcpStream::try_clone` (#448) + +# 0.1.0 (March 23, 2018) + +* Initial release diff --git a/third_party/rust/tokio-tcp/Cargo.toml b/third_party/rust/tokio-tcp/Cargo.toml new file mode 100644 index 0000000000..ee398dee34 --- /dev/null +++ b/third_party/rust/tokio-tcp/Cargo.toml @@ -0,0 +1,42 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g. crates.io) dependencies +# +# If you believe there's an error in this file please file an +# issue against the rust-lang/cargo repository. If you're +# editing this file be aware that the upstream Cargo.toml +# will likely look very different (and much more reasonable) + +[package] +name = "tokio-tcp" +version = "0.1.1" +authors = ["Carl Lerche <me@carllerche.com>"] +description = "TCP bindings for tokio.\n" +homepage = "https://tokio.rs" +documentation = "https://docs.rs/tokio-tcp/0.1" +categories = ["asynchronous"] +license = "MIT" +repository = "https://github.com/tokio-rs/tokio" +[dependencies.bytes] +version = "0.4" + +[dependencies.futures] +version = "0.1.19" + +[dependencies.iovec] +version = "0.1" + +[dependencies.mio] +version = "0.6.14" + +[dependencies.tokio-io] +version = "0.1.6" + +[dependencies.tokio-reactor] +version = "0.1.1" +[dev-dependencies.env_logger] +version = "0.4" +default-features = false diff --git a/third_party/rust/tokio-tcp/LICENSE b/third_party/rust/tokio-tcp/LICENSE new file mode 100644 index 0000000000..38c1e27b8e --- /dev/null +++ b/third_party/rust/tokio-tcp/LICENSE @@ -0,0 +1,25 @@ +Copyright (c) 2018 Tokio Contributors + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/third_party/rust/tokio-tcp/README.md b/third_party/rust/tokio-tcp/README.md new file mode 100644 index 0000000000..9cfc177b80 --- /dev/null +++ b/third_party/rust/tokio-tcp/README.md @@ -0,0 +1,15 @@ +# tokio-tcp + +TCP bindings for `tokio`. + +[Documentation](https://tokio-rs.github.io/tokio/tokio_tcp/) + +## License + +This project is licensed under the [MIT license](./LICENSE). + +### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in Tokio by you, shall be licensed as MIT, without any additional +terms or conditions. diff --git a/third_party/rust/tokio-tcp/src/incoming.rs b/third_party/rust/tokio-tcp/src/incoming.rs new file mode 100644 index 0000000000..6726224b82 --- /dev/null +++ b/third_party/rust/tokio-tcp/src/incoming.rs @@ -0,0 +1,45 @@ +use super::TcpListener; +use super::TcpStream; + +use std::io; +use futures::stream::Stream; +use futures::{Poll, Async}; + +#[cfg(feature = "unstable-futures")] +use futures2; + +/// Stream returned by the `TcpListener::incoming` function representing the +/// stream of sockets received from a listener. +#[must_use = "streams do nothing unless polled"] +#[derive(Debug)] +pub struct Incoming { + inner: TcpListener, +} + +impl Incoming { + pub(crate) fn new(listener: TcpListener) -> Incoming { + Incoming { inner: listener } + } +} + +impl Stream for Incoming { + type Item = TcpStream; + type Error = io::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> { + let (socket, _) = try_ready!(self.inner.poll_accept()); + Ok(Async::Ready(Some(socket))) + } +} + +#[cfg(feature = "unstable-futures")] +impl futures2::Stream for Incoming { + type Item = TcpStream; + type Error = io::Error; + + fn poll_next(&mut self, cx: &mut futures2::task::Context) + -> futures2::Poll<Option<Self::Item>, io::Error> + { + Ok(self.inner.poll_accept2(cx)?.map(|(sock, _)| Some(sock))) + } +} diff --git a/third_party/rust/tokio-tcp/src/lib.rs b/third_party/rust/tokio-tcp/src/lib.rs new file mode 100644 index 0000000000..c7713ee213 --- /dev/null +++ b/third_party/rust/tokio-tcp/src/lib.rs @@ -0,0 +1,59 @@ +//! TCP bindings for `tokio`. +//! +//! This module contains the TCP networking types, similar to the standard +//! library, which can be used to implement networking protocols. +//! +//! Connecting to an address, via TCP, can be done using [`TcpStream`]'s +//! [`connect`] method, which returns [`ConnectFuture`]. `ConnectFuture` +//! implements a future which returns a `TcpStream`. +//! +//! To listen on an address [`TcpListener`] can be used. `TcpListener`'s +//! [`incoming`][incoming_method] method can be used to accept new connections. +//! It return the [`Incoming`] struct, which implements a stream which returns +//! `TcpStream`s. +//! +//! [`TcpStream`]: struct.TcpStream.html +//! [`connect`]: struct.TcpStream.html#method.connect +//! [`ConnectFuture`]: struct.ConnectFuture.html +//! [`TcpListener`]: struct.TcpListener.html +//! [incoming_method]: struct.TcpListener.html#method.incoming +//! [`Incoming`]: struct.Incoming.html + +#![doc(html_root_url = "https://docs.rs/tokio-tcp/0.1.1")] +#![deny(missing_docs, warnings, missing_debug_implementations)] + +extern crate bytes; +#[macro_use] +extern crate futures; +extern crate iovec; +extern crate mio; +extern crate tokio_io; +extern crate tokio_reactor; + +#[cfg(feature = "unstable-futures")] +extern crate futures2; + +mod incoming; +mod listener; +mod stream; + +pub use self::incoming::Incoming; +pub use self::listener::TcpListener; +pub use self::stream::TcpStream; +pub use self::stream::ConnectFuture; + +#[cfg(feature = "unstable-futures")] +fn lift_async<T>(old: futures::Async<T>) -> futures2::Async<T> { + match old { + futures::Async::Ready(x) => futures2::Async::Ready(x), + futures::Async::NotReady => futures2::Async::Pending, + } +} + +#[cfg(feature = "unstable-futures")] +fn lower_async<T>(new: futures2::Async<T>) -> futures::Async<T> { + match new { + futures2::Async::Ready(x) => futures::Async::Ready(x), + futures2::Async::Pending => futures::Async::NotReady, + } +} diff --git a/third_party/rust/tokio-tcp/src/listener.rs b/third_party/rust/tokio-tcp/src/listener.rs new file mode 100644 index 0000000000..1eff355676 --- /dev/null +++ b/third_party/rust/tokio-tcp/src/listener.rs @@ -0,0 +1,261 @@ +use super::Incoming; +use super::TcpStream; + +use std::fmt; +use std::io; +use std::net::{self, SocketAddr}; + +use futures::{Poll, Async}; +use mio; +use tokio_reactor::{Handle, PollEvented}; + +#[cfg(feature = "unstable-futures")] +use futures2; + +/// An I/O object representing a TCP socket listening for incoming connections. +/// +/// This object can be converted into a stream of incoming connections for +/// various forms of processing. +pub struct TcpListener { + io: PollEvented<mio::net::TcpListener>, +} + +impl TcpListener { + /// Create a new TCP listener associated with this event loop. + /// + /// The TCP listener will bind to the provided `addr` address, if available. + /// If the result is `Ok`, the socket has successfully bound. + pub fn bind(addr: &SocketAddr) -> io::Result<TcpListener> { + let l = mio::net::TcpListener::bind(addr)?; + Ok(TcpListener::new(l)) + } + + #[deprecated(since = "0.1.2", note = "use poll_accept instead")] + #[doc(hidden)] + pub fn accept(&mut self) -> io::Result<(TcpStream, SocketAddr)> { + match self.poll_accept()? { + Async::Ready(ret) => Ok(ret), + Async::NotReady => Err(io::ErrorKind::WouldBlock.into()), + } + } + + /// Attempt to accept a connection and create a new connected `TcpStream` if + /// successful. + /// + /// Note that typically for simple usage it's easier to treat incoming + /// connections as a `Stream` of `TcpStream`s with the `incoming` method + /// below. + /// + /// # Return + /// + /// On success, returns `Ok(Async::Ready((socket, addr)))`. + /// + /// If the listener is not ready to accept, the method returns + /// `Ok(Async::NotReady)` and arranges for the current task to receive a + /// notification when the listener becomes ready to accept. + /// + /// # Panics + /// + /// This function will panic if called from outside of a task context. + pub fn poll_accept(&mut self) -> Poll<(TcpStream, SocketAddr), io::Error> { + let (io, addr) = try_ready!(self.poll_accept_std()); + + let io = mio::net::TcpStream::from_stream(io)?; + let io = TcpStream::new(io); + + Ok((io, addr).into()) + } + + /// Like `poll_accept`, but for futures 0.2 + #[cfg(feature = "unstable-futures")] + pub fn poll_accept2(&mut self, cx: &mut futures2::task::Context) + -> futures2::Poll<(TcpStream, SocketAddr), io::Error> + { + let (io, addr) = match self.poll_accept_std2(cx)? { + futures2::Async::Ready(x) => x, + futures2::Async::Pending => return Ok(futures2::Async::Pending), + }; + + let io = mio::net::TcpStream::from_stream(io)?; + let io = TcpStream::new(io); + + Ok((io, addr).into()) + } + + #[deprecated(since = "0.1.2", note = "use poll_accept_std instead")] + #[doc(hidden)] + pub fn accept_std(&mut self) -> io::Result<(net::TcpStream, SocketAddr)> { + match self.poll_accept_std()? { + Async::Ready(ret) => Ok(ret), + Async::NotReady => Err(io::ErrorKind::WouldBlock.into()), + } + } + + /// Attempt to accept a connection and create a new connected `TcpStream` if + /// successful. + /// + /// This function is the same as `accept` above except that it returns a + /// `std::net::TcpStream` instead of a `tokio::net::TcpStream`. This in turn + /// can then allow for the TCP stream to be associated with a different + /// reactor than the one this `TcpListener` is associated with. + /// + /// # Return + /// + /// On success, returns `Ok(Async::Ready((socket, addr)))`. + /// + /// If the listener is not ready to accept, the method returns + /// `Ok(Async::NotReady)` and arranges for the current task to receive a + /// notification when the listener becomes ready to accept. + /// + /// # Panics + /// + /// This function will panic if called from outside of a task context. + pub fn poll_accept_std(&mut self) -> Poll<(net::TcpStream, SocketAddr), io::Error> { + try_ready!(self.io.poll_read_ready(mio::Ready::readable())); + + match self.io.get_ref().accept_std() { + Ok(pair) => Ok(pair.into()), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_read_ready(mio::Ready::readable())?; + Ok(Async::NotReady) + } + Err(e) => Err(e), + } + } + + /// Like `poll_accept_std`, but for futures 0.2. + #[cfg(feature = "unstable-futures")] + pub fn poll_accept_std2(&mut self, cx: &mut futures2::task::Context) + -> futures2::Poll<(net::TcpStream, SocketAddr), io::Error> + { + if let futures2::Async::Pending = self.io.poll_read_ready2(cx, mio::Ready::readable())? { + return Ok(futures2::Async::Pending); + } + + match self.io.get_ref().accept_std() { + Ok(pair) => Ok(pair.into()), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_read_ready2(cx, mio::Ready::readable())?; + Ok(futures2::Async::Pending) + } + Err(e) => Err(e), + } + } + + /// Create a new TCP listener from the standard library's TCP listener. + /// + /// This method can be used when the `Handle::tcp_listen` method isn't + /// sufficient because perhaps some more configuration is needed in terms of + /// before the calls to `bind` and `listen`. + /// + /// This API is typically paired with the `net2` crate and the `TcpBuilder` + /// type to build up and customize a listener before it's shipped off to the + /// backing event loop. This allows configuration of options like + /// `SO_REUSEPORT`, binding to multiple addresses, etc. + /// + /// The `addr` argument here is one of the addresses that `listener` is + /// bound to and the listener will only be guaranteed to accept connections + /// of the same address type currently. + /// + /// Finally, the `handle` argument is the event loop that this listener will + /// be bound to. + /// Use `Handle::default()` to lazily bind to an event loop, just like `bind` does. + /// + /// The platform specific behavior of this function looks like: + /// + /// * On Unix, the socket is placed into nonblocking mode and connections + /// can be accepted as normal + /// + /// * On Windows, the address is stored internally and all future accepts + /// will only be for the same IP version as `addr` specified. That is, if + /// `addr` is an IPv4 address then all sockets accepted will be IPv4 as + /// well (same for IPv6). + pub fn from_std(listener: net::TcpListener, handle: &Handle) + -> io::Result<TcpListener> + { + let io = mio::net::TcpListener::from_std(listener)?; + let io = PollEvented::new_with_handle(io, handle)?; + Ok(TcpListener { io }) + } + + fn new(listener: mio::net::TcpListener) -> TcpListener { + let io = PollEvented::new(listener); + TcpListener { io } + } + + /// Returns the local address that this listener is bound to. + /// + /// This can be useful, for example, when binding to port 0 to figure out + /// which port was actually bound. + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.io.get_ref().local_addr() + } + + /// Consumes this listener, returning a stream of the sockets this listener + /// accepts. + /// + /// This method returns an implementation of the `Stream` trait which + /// resolves to the sockets the are accepted on this listener. + /// + /// # Errors + /// + /// Note that accepting a connection can lead to various errors and not all of them are + /// necessarily fatal ‒ for example having too many open file descriptors or the other side + /// closing the connection while it waits in an accept queue. These would terminate the stream + /// if not handled in any way. + /// + /// If aiming for production, decision what to do about them must be made. The + /// [`tk-listen`](https://crates.io/crates/tk-listen) crate might be of some help. + pub fn incoming(self) -> Incoming { + Incoming::new(self) + } + + /// Gets the value of the `IP_TTL` option for this socket. + /// + /// For more information about this option, see [`set_ttl`]. + /// + /// [`set_ttl`]: #method.set_ttl + pub fn ttl(&self) -> io::Result<u32> { + self.io.get_ref().ttl() + } + + /// Sets the value for the `IP_TTL` option on this socket. + /// + /// This value sets the time-to-live field that is used in every packet sent + /// from this socket. + pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { + self.io.get_ref().set_ttl(ttl) + } +} + +impl fmt::Debug for TcpListener { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.io.get_ref().fmt(f) + } +} + +#[cfg(unix)] +mod sys { + use std::os::unix::prelude::*; + use super::TcpListener; + + impl AsRawFd for TcpListener { + fn as_raw_fd(&self) -> RawFd { + self.io.get_ref().as_raw_fd() + } + } +} + +#[cfg(windows)] +mod sys { + // TODO: let's land these upstream with mio and then we can add them here. + // + // use std::os::windows::prelude::*; + // use super::{TcpListener; + // + // impl AsRawHandle for TcpListener { + // fn as_raw_handle(&self) -> RawHandle { + // self.listener.io().as_raw_handle() + // } + // } +} diff --git a/third_party/rust/tokio-tcp/src/stream.rs b/third_party/rust/tokio-tcp/src/stream.rs new file mode 100644 index 0000000000..8f4eeaeb01 --- /dev/null +++ b/third_party/rust/tokio-tcp/src/stream.rs @@ -0,0 +1,755 @@ +use std::fmt; +use std::io::{self, Read, Write}; +use std::mem; +use std::net::{self, SocketAddr, Shutdown}; +use std::time::Duration; + +use bytes::{Buf, BufMut}; +use futures::{Future, Poll, Async}; +use iovec::IoVec; +use mio; +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_reactor::{Handle, PollEvented}; + +#[cfg(feature = "unstable-futures")] +use futures2; + +/// An I/O object representing a TCP stream connected to a remote endpoint. +/// +/// A TCP stream can either be created by connecting to an endpoint, via the +/// [`connect`] method, or by [accepting] a connection from a [listener]. +/// +/// [`connect`]: struct.TcpStream.html#method.connect +/// [accepting]: struct.TcpListener.html#method.accept +/// [listener]: struct.TcpListener.html +pub struct TcpStream { + io: PollEvented<mio::net::TcpStream>, +} + +/// Future returned by `TcpStream::connect` which will resolve to a `TcpStream` +/// when the stream is connected. +#[must_use = "futures do nothing unless polled"] +#[derive(Debug)] +pub struct ConnectFuture { + inner: ConnectFutureState, +} + +#[must_use = "futures do nothing unless polled"] +#[derive(Debug)] +enum ConnectFutureState { + Waiting(TcpStream), + Error(io::Error), + Empty, +} + +impl TcpStream { + /// Create a new TCP stream connected to the specified address. + /// + /// This function will create a new TCP socket and attempt to connect it to + /// the `addr` provided. The returned future will be resolved once the + /// stream has successfully connected, or it wil return an error if one + /// occurs. + pub fn connect(addr: &SocketAddr) -> ConnectFuture { + use self::ConnectFutureState::*; + + let inner = match mio::net::TcpStream::connect(addr) { + Ok(tcp) => Waiting(TcpStream::new(tcp)), + Err(e) => Error(e), + }; + + ConnectFuture { inner } + } + + pub(crate) fn new(connected: mio::net::TcpStream) -> TcpStream { + let io = PollEvented::new(connected); + TcpStream { io } + } + + /// Create a new `TcpStream` from a `net::TcpStream`. + /// + /// This function will convert a TCP stream created by the standard library + /// to a TCP stream ready to be used with the provided event loop handle. + /// Use `Handle::default()` to lazily bind to an event loop, just like `connect` does. + pub fn from_std(stream: net::TcpStream, handle: &Handle) + -> io::Result<TcpStream> + { + let io = mio::net::TcpStream::from_stream(stream)?; + let io = PollEvented::new_with_handle(io, handle)?; + + Ok(TcpStream { io }) + } + + /// Creates a new `TcpStream` from the pending socket inside the given + /// `std::net::TcpStream`, connecting it to the address specified. + /// + /// This constructor allows configuring the socket before it's actually + /// connected, and this function will transfer ownership to the returned + /// `TcpStream` if successful. An unconnected `TcpStream` can be created + /// with the `net2::TcpBuilder` type (and also configured via that route). + /// + /// The platform specific behavior of this function looks like: + /// + /// * On Unix, the socket is placed into nonblocking mode and then a + /// `connect` call is issued. + /// + /// * On Windows, the address is stored internally and the connect operation + /// is issued when the returned `TcpStream` is registered with an event + /// loop. Note that on Windows you must `bind` a socket before it can be + /// connected, so if a custom `TcpBuilder` is used it should be bound + /// (perhaps to `INADDR_ANY`) before this method is called. + pub fn connect_std(stream: net::TcpStream, + addr: &SocketAddr, + handle: &Handle) + -> ConnectFuture + { + use self::ConnectFutureState::*; + + let io = mio::net::TcpStream::connect_stream(stream, addr) + .and_then(|io| PollEvented::new_with_handle(io, handle)); + + let inner = match io { + Ok(io) => Waiting(TcpStream { io }), + Err(e) => Error(e), + }; + + ConnectFuture { inner: inner } + } + + /// Check the TCP stream's read readiness state. + /// + /// The mask argument allows specifying what readiness to notify on. This + /// can be any value, including platform specific readiness, **except** + /// `writable`. HUP is always implicitly included on platforms that support + /// it. + /// + /// If the resource is not ready for a read then `Async::NotReady` is + /// returned and the current task is notified once a new event is received. + /// + /// The stream will remain in a read-ready state until calls to `poll_read` + /// return `NotReady`. + /// + /// # Panics + /// + /// This function panics if: + /// + /// * `ready` includes writable. + /// * called from outside of a task context. + pub fn poll_read_ready(&self, mask: mio::Ready) -> Poll<mio::Ready, io::Error> { + self.io.poll_read_ready(mask) + } + + /// Like `poll_read_ready`, but compatible with futures 0.2 + #[cfg(feature = "unstable-futures")] + pub fn poll_read_ready2(&self, cx: &mut futures2::task::Context, mask: mio::Ready) + -> futures2::Poll<mio::Ready, io::Error> + { + self.io.poll_read_ready2(cx, mask) + } + + /// Check the TCP stream's write readiness state. + /// + /// This always checks for writable readiness and also checks for HUP + /// readiness on platforms that support it. + /// + /// If the resource is not ready for a write then `Async::NotReady` is + /// returned and the current task is notified once a new event is received. + /// + /// The I/O resource will remain in a write-ready state until calls to + /// `poll_write` return `NotReady`. + /// + /// # Panics + /// + /// This function panics if: + /// + /// * `ready` contains bits besides `writable` and `hup`. + /// * called from outside of a task context. + pub fn poll_write_ready(&self) -> Poll<mio::Ready, io::Error> { + self.io.poll_write_ready() + } + + /// Like `poll_write_ready`, but compatible with futures 0.2. + #[cfg(feature = "unstable-futures")] + pub fn poll_write_ready2(&self, cx: &mut futures2::task::Context) + -> futures2::Poll<mio::Ready, io::Error> + { + self.io.poll_write_ready2(cx) + } + + /// Returns the local address that this stream is bound to. + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.io.get_ref().local_addr() + } + + /// Returns the remote address that this stream is connected to. + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + self.io.get_ref().peer_addr() + } + + #[deprecated(since = "0.1.2", note = "use poll_peek instead")] + #[doc(hidden)] + pub fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> { + match self.poll_peek(buf)? { + Async::Ready(n) => Ok(n), + Async::NotReady => Err(io::ErrorKind::WouldBlock.into()), + } + } + + /// Receives data on the socket from the remote address to which it is + /// connected, without removing that data from the queue. On success, + /// returns the number of bytes peeked. + /// + /// Successive calls return the same data. This is accomplished by passing + /// `MSG_PEEK` as a flag to the underlying recv system call. + /// + /// # Return + /// + /// On success, returns `Ok(Async::Ready(num_bytes_read))`. + /// + /// If no data is available for reading, the method returns + /// `Ok(Async::NotReady)` and arranges for the current task to receive a + /// notification when the socket becomes readable or is closed. + /// + /// # Panics + /// + /// This function will panic if called from outside of a task context. + pub fn poll_peek(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error> { + try_ready!(self.io.poll_read_ready(mio::Ready::readable())); + + match self.io.get_ref().peek(buf) { + Ok(ret) => Ok(ret.into()), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_read_ready(mio::Ready::readable())?; + Ok(Async::NotReady) + } + Err(e) => Err(e), + } + } + + /// Like `poll_peek` but compatible with futures 0.2 + #[cfg(feature = "unstable-futures")] + pub fn poll_peek2(&mut self, cx: &mut futures2::task::Context, buf: &mut [u8]) + -> futures2::Poll<usize, io::Error> + { + if let futures2::Async::Pending = self.io.poll_read_ready2(cx, mio::Ready::readable())? { + return Ok(futures2::Async::Pending); + } + + match self.io.get_ref().peek(buf) { + Ok(ret) => Ok(ret.into()), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_read_ready2(cx, mio::Ready::readable())?; + Ok(futures2::Async::Pending) + } + Err(e) => Err(e), + } + } + + /// Shuts down the read, write, or both halves of this connection. + /// + /// This function will cause all pending and future I/O on the specified + /// portions to return immediately with an appropriate value (see the + /// documentation of `Shutdown`). + pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { + self.io.get_ref().shutdown(how) + } + + /// Gets the value of the `TCP_NODELAY` option on this socket. + /// + /// For more information about this option, see [`set_nodelay`]. + /// + /// [`set_nodelay`]: #method.set_nodelay + pub fn nodelay(&self) -> io::Result<bool> { + self.io.get_ref().nodelay() + } + + /// Sets the value of the `TCP_NODELAY` option on this socket. + /// + /// If set, this option disables the Nagle algorithm. This means that + /// segments are always sent as soon as possible, even if there is only a + /// small amount of data. When not set, data is buffered until there is a + /// sufficient amount to send out, thereby avoiding the frequent sending of + /// small packets. + pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> { + self.io.get_ref().set_nodelay(nodelay) + } + + /// Gets the value of the `SO_RCVBUF` option on this socket. + /// + /// For more information about this option, see [`set_recv_buffer_size`]. + /// + /// [`set_recv_buffer_size`]: #tymethod.set_recv_buffer_size + pub fn recv_buffer_size(&self) -> io::Result<usize> { + self.io.get_ref().recv_buffer_size() + } + + /// Sets the value of the `SO_RCVBUF` option on this socket. + /// + /// Changes the size of the operating system's receive buffer associated + /// with the socket. + pub fn set_recv_buffer_size(&self, size: usize) -> io::Result<()> { + self.io.get_ref().set_recv_buffer_size(size) + } + + /// Gets the value of the `SO_SNDBUF` option on this socket. + /// + /// For more information about this option, see [`set_send_buffer`]. + /// + /// [`set_send_buffer`]: #tymethod.set_send_buffer + pub fn send_buffer_size(&self) -> io::Result<usize> { + self.io.get_ref().send_buffer_size() + } + + /// Sets the value of the `SO_SNDBUF` option on this socket. + /// + /// Changes the size of the operating system's send buffer associated with + /// the socket. + pub fn set_send_buffer_size(&self, size: usize) -> io::Result<()> { + self.io.get_ref().set_send_buffer_size(size) + } + + /// Returns whether keepalive messages are enabled on this socket, and if so + /// the duration of time between them. + /// + /// For more information about this option, see [`set_keepalive`]. + /// + /// [`set_keepalive`]: #tymethod.set_keepalive + pub fn keepalive(&self) -> io::Result<Option<Duration>> { + self.io.get_ref().keepalive() + } + + /// Sets whether keepalive messages are enabled to be sent on this socket. + /// + /// On Unix, this option will set the `SO_KEEPALIVE` as well as the + /// `TCP_KEEPALIVE` or `TCP_KEEPIDLE` option (depending on your platform). + /// On Windows, this will set the `SIO_KEEPALIVE_VALS` option. + /// + /// If `None` is specified then keepalive messages are disabled, otherwise + /// the duration specified will be the time to remain idle before sending a + /// TCP keepalive probe. + /// + /// Some platforms specify this value in seconds, so sub-second + /// specifications may be omitted. + pub fn set_keepalive(&self, keepalive: Option<Duration>) -> io::Result<()> { + self.io.get_ref().set_keepalive(keepalive) + } + + /// Gets the value of the `IP_TTL` option for this socket. + /// + /// For more information about this option, see [`set_ttl`]. + /// + /// [`set_ttl`]: #tymethod.set_ttl + pub fn ttl(&self) -> io::Result<u32> { + self.io.get_ref().ttl() + } + + /// Sets the value for the `IP_TTL` option on this socket. + /// + /// This value sets the time-to-live field that is used in every packet sent + /// from this socket. + pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { + self.io.get_ref().set_ttl(ttl) + } + + /// Reads the linger duration for this socket by getting the `SO_LINGER` + /// option. + /// + /// For more information about this option, see [`set_linger`]. + /// + /// [`set_linger`]: #tymethod.set_linger + pub fn linger(&self) -> io::Result<Option<Duration>> { + self.io.get_ref().linger() + } + + /// Sets the linger duration of this socket by setting the `SO_LINGER` + /// option. + /// + /// This option controls the action taken when a stream has unsent messages + /// and the stream is closed. If `SO_LINGER` is set, the system + /// shall block the process until it can transmit the data or until the + /// time expires. + /// + /// If `SO_LINGER` is not specified, and the stream is closed, the system + /// handles the call in a way that allows the process to continue as quickly + /// as possible. + pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> { + self.io.get_ref().set_linger(dur) + } + + /// Creates a new independently owned handle to the underlying socket. + /// + /// The returned `TcpStream` is a reference to the same stream that this + /// object references. Both handles will read and write the same stream of + /// data, and options set on one stream will be propagated to the other + /// stream. + pub fn try_clone(&self) -> io::Result<TcpStream> { + let io = self.io.get_ref().try_clone()?; + Ok(TcpStream::new(io)) + } +} + +// ===== impl Read / Write ===== + +impl Read for TcpStream { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.io.read(buf) + } +} + +impl Write for TcpStream { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.io.write(buf) + } + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl AsyncRead for TcpStream { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { + false + } + + fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { + <&TcpStream>::read_buf(&mut &*self, buf) + } +} + +#[cfg(feature = "unstable-futures")] +impl futures2::io::AsyncRead for TcpStream { + fn poll_read(&mut self, cx: &mut futures2::task::Context, buf: &mut [u8]) + -> futures2::Poll<usize, io::Error> + { + futures2::io::AsyncRead::poll_read(&mut self.io, cx, buf) + } + + fn poll_vectored_read(&mut self, cx: &mut futures2::task::Context, vec: &mut [&mut IoVec]) + -> futures2::Poll<usize, io::Error> + { + futures2::io::AsyncRead::poll_vectored_read(&mut &*self, cx, vec) + } + + unsafe fn initializer(&self) -> futures2::io::Initializer { + futures2::io::Initializer::nop() + } +} + +impl AsyncWrite for TcpStream { + fn shutdown(&mut self) -> Poll<(), io::Error> { + <&TcpStream>::shutdown(&mut &*self) + } + + fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { + <&TcpStream>::write_buf(&mut &*self, buf) + } +} + +#[cfg(feature = "unstable-futures")] +impl futures2::io::AsyncWrite for TcpStream { + fn poll_write(&mut self, cx: &mut futures2::task::Context, buf: &[u8]) + -> futures2::Poll<usize, io::Error> + { + futures2::io::AsyncWrite::poll_write(&mut self.io, cx, buf) + } + + fn poll_vectored_write(&mut self, cx: &mut futures2::task::Context, vec: &[&IoVec]) + -> futures2::Poll<usize, io::Error> + { + futures2::io::AsyncWrite::poll_vectored_write(&mut &*self, cx, vec) + } + + fn poll_flush(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> { + futures2::io::AsyncWrite::poll_flush(&mut self.io, cx) + } + + fn poll_close(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> { + futures2::io::AsyncWrite::poll_close(&mut self.io, cx) + } +} + +// ===== impl Read / Write for &'a ===== + +impl<'a> Read for &'a TcpStream { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + (&self.io).read(buf) + } +} + +impl<'a> Write for &'a TcpStream { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + (&self.io).write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + (&self.io).flush() + } +} + +impl<'a> AsyncRead for &'a TcpStream { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { + false + } + + fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { + if let Async::NotReady = self.io.poll_read_ready(mio::Ready::readable())? { + return Ok(Async::NotReady) + } + + let r = unsafe { + // The `IoVec` type can't have a 0-length size, so we create a bunch + // of dummy versions on the stack with 1 length which we'll quickly + // overwrite. + let b1: &mut [u8] = &mut [0]; + let b2: &mut [u8] = &mut [0]; + let b3: &mut [u8] = &mut [0]; + let b4: &mut [u8] = &mut [0]; + let b5: &mut [u8] = &mut [0]; + let b6: &mut [u8] = &mut [0]; + let b7: &mut [u8] = &mut [0]; + let b8: &mut [u8] = &mut [0]; + let b9: &mut [u8] = &mut [0]; + let b10: &mut [u8] = &mut [0]; + let b11: &mut [u8] = &mut [0]; + let b12: &mut [u8] = &mut [0]; + let b13: &mut [u8] = &mut [0]; + let b14: &mut [u8] = &mut [0]; + let b15: &mut [u8] = &mut [0]; + let b16: &mut [u8] = &mut [0]; + let mut bufs: [&mut IoVec; 16] = [ + b1.into(), b2.into(), b3.into(), b4.into(), + b5.into(), b6.into(), b7.into(), b8.into(), + b9.into(), b10.into(), b11.into(), b12.into(), + b13.into(), b14.into(), b15.into(), b16.into(), + ]; + let n = buf.bytes_vec_mut(&mut bufs); + self.io.get_ref().read_bufs(&mut bufs[..n]) + }; + + match r { + Ok(n) => { + unsafe { buf.advance_mut(n); } + Ok(Async::Ready(n)) + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_read_ready(mio::Ready::readable())?; + Ok(Async::NotReady) + } + Err(e) => Err(e), + } + } +} + +#[cfg(feature = "unstable-futures")] +impl<'a> futures2::io::AsyncRead for &'a TcpStream { + fn poll_read(&mut self, cx: &mut futures2::task::Context, buf: &mut [u8]) + -> futures2::Poll<usize, io::Error> + { + futures2::io::AsyncRead::poll_read(&mut &self.io, cx, buf) + } + + fn poll_vectored_read(&mut self, cx: &mut futures2::task::Context, vec: &mut [&mut IoVec]) + -> futures2::Poll<usize, io::Error> + { + if let futures2::Async::Pending = self.io.poll_read_ready2(cx, mio::Ready::readable())? { + return Ok(futures2::Async::Pending) + } + + let r = self.io.get_ref().read_bufs(vec); + + match r { + Ok(n) => { + Ok(futures2::Async::Ready(n)) + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_read_ready2(cx, mio::Ready::readable())?; + Ok(futures2::Async::Pending) + } + Err(e) => Err(e), + } + } + + unsafe fn initializer(&self) -> futures2::io::Initializer { + futures2::io::Initializer::nop() + } +} + +impl<'a> AsyncWrite for &'a TcpStream { + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(().into()) + } + + fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { + if let Async::NotReady = self.io.poll_write_ready()? { + return Ok(Async::NotReady) + } + + let r = { + // The `IoVec` type can't have a zero-length size, so create a dummy + // version from a 1-length slice which we'll overwrite with the + // `bytes_vec` method. + static DUMMY: &[u8] = &[0]; + let iovec = <&IoVec>::from(DUMMY); + let mut bufs = [iovec; 64]; + let n = buf.bytes_vec(&mut bufs); + self.io.get_ref().write_bufs(&bufs[..n]) + }; + match r { + Ok(n) => { + buf.advance(n); + Ok(Async::Ready(n)) + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_write_ready()?; + Ok(Async::NotReady) + } + Err(e) => Err(e), + } + } +} + +#[cfg(feature = "unstable-futures")] +impl<'a> futures2::io::AsyncWrite for &'a TcpStream { + fn poll_write(&mut self, cx: &mut futures2::task::Context, buf: &[u8]) + -> futures2::Poll<usize, io::Error> + { + futures2::io::AsyncWrite::poll_write(&mut &self.io, cx, buf) + } + + fn poll_vectored_write(&mut self, cx: &mut futures2::task::Context, vec: &[&IoVec]) + -> futures2::Poll<usize, io::Error> + { + if let futures2::Async::Pending = self.io.poll_write_ready2(cx)? { + return Ok(futures2::Async::Pending) + } + + let r = self.io.get_ref().write_bufs(vec); + + match r { + Ok(n) => { + Ok(futures2::Async::Ready(n)) + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_write_ready2(cx)?; + Ok(futures2::Async::Pending) + } + Err(e) => Err(e), + } + } + + fn poll_flush(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> { + futures2::io::AsyncWrite::poll_flush(&mut &self.io, cx) + } + + fn poll_close(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> { + futures2::io::AsyncWrite::poll_close(&mut &self.io, cx) + } +} + +impl fmt::Debug for TcpStream { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.io.get_ref().fmt(f) + } +} + +impl Future for ConnectFuture { + type Item = TcpStream; + type Error = io::Error; + + fn poll(&mut self) -> Poll<TcpStream, io::Error> { + self.inner.poll() + } +} + +#[cfg(feature = "unstable-futures")] +impl futures2::Future for ConnectFuture { + type Item = TcpStream; + type Error = io::Error; + + fn poll(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<TcpStream, io::Error> { + futures2::Future::poll(&mut self.inner, cx) + } +} + +impl ConnectFutureState { + fn poll_inner<F>(&mut self, f: F) -> Poll<TcpStream, io::Error> + where F: FnOnce(&mut PollEvented<mio::net::TcpStream>) -> Poll<mio::Ready, io::Error> + { + { + let stream = match *self { + ConnectFutureState::Waiting(ref mut s) => s, + ConnectFutureState::Error(_) => { + let e = match mem::replace(self, ConnectFutureState::Empty) { + ConnectFutureState::Error(e) => e, + _ => panic!(), + }; + return Err(e) + } + ConnectFutureState::Empty => panic!("can't poll TCP stream twice"), + }; + + // Once we've connected, wait for the stream to be writable as + // that's when the actual connection has been initiated. Once we're + // writable we check for `take_socket_error` to see if the connect + // actually hit an error or not. + // + // If all that succeeded then we ship everything on up. + if let Async::NotReady = f(&mut stream.io)? { + return Ok(Async::NotReady) + } + + if let Some(e) = try!(stream.io.get_ref().take_error()) { + return Err(e) + } + } + + match mem::replace(self, ConnectFutureState::Empty) { + ConnectFutureState::Waiting(stream) => Ok(Async::Ready(stream)), + _ => panic!(), + } + } +} + +impl Future for ConnectFutureState { + type Item = TcpStream; + type Error = io::Error; + + fn poll(&mut self) -> Poll<TcpStream, io::Error> { + self.poll_inner(|io| io.poll_write_ready()) + } +} + +#[cfg(feature = "unstable-futures")] +impl futures2::Future for ConnectFutureState { + type Item = TcpStream; + type Error = io::Error; + + fn poll(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<TcpStream, io::Error> { + self.poll_inner(|io| io.poll_write_ready2(cx).map(::lower_async)) + .map(::lift_async) + } +} + +#[cfg(unix)] +mod sys { + use std::os::unix::prelude::*; + use super::TcpStream; + + impl AsRawFd for TcpStream { + fn as_raw_fd(&self) -> RawFd { + self.io.get_ref().as_raw_fd() + } + } +} + +#[cfg(windows)] +mod sys { + // TODO: let's land these upstream with mio and then we can add them here. + // + // use std::os::windows::prelude::*; + // use super::TcpStream; + // + // impl AsRawHandle for TcpStream { + // fn as_raw_handle(&self) -> RawHandle { + // self.io.get_ref().as_raw_handle() + // } + // } +} diff --git a/third_party/rust/tokio-tcp/tests/chain.rs b/third_party/rust/tokio-tcp/tests/chain.rs new file mode 100644 index 0000000000..c4e37f1030 --- /dev/null +++ b/third_party/rust/tokio-tcp/tests/chain.rs @@ -0,0 +1,49 @@ +extern crate futures; +extern crate tokio_tcp; +extern crate tokio_io; + +use std::net::TcpStream; +use std::thread; +use std::io::{Write, Read}; + +use futures::Future; +use futures::stream::Stream; +use tokio_io::io::read_to_end; +use tokio_tcp::TcpListener; + +macro_rules! t { + ($e:expr) => (match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + }) +} + +#[test] +fn chain_clients() { + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); + let addr = t!(srv.local_addr()); + + let t = thread::spawn(move || { + let mut s1 = TcpStream::connect(&addr).unwrap(); + s1.write_all(b"foo ").unwrap(); + let mut s2 = TcpStream::connect(&addr).unwrap(); + s2.write_all(b"bar ").unwrap(); + let mut s3 = TcpStream::connect(&addr).unwrap(); + s3.write_all(b"baz").unwrap(); + }); + + let clients = srv.incoming().take(3); + let copied = clients.collect().and_then(|clients| { + let mut clients = clients.into_iter(); + let a = clients.next().unwrap(); + let b = clients.next().unwrap(); + let c = clients.next().unwrap(); + + read_to_end(a.chain(b).chain(c), Vec::new()) + }); + + let (_, data) = t!(copied.wait()); + t.join().unwrap(); + + assert_eq!(data, b"foo bar baz"); +} diff --git a/third_party/rust/tokio-tcp/tests/echo.rs b/third_party/rust/tokio-tcp/tests/echo.rs new file mode 100644 index 0000000000..3c020b193e --- /dev/null +++ b/third_party/rust/tokio-tcp/tests/echo.rs @@ -0,0 +1,51 @@ +extern crate env_logger; +extern crate futures; +extern crate tokio_tcp; +extern crate tokio_io; + +use std::io::{Read, Write}; +use std::net::TcpStream; +use std::thread; + +use futures::Future; +use futures::stream::Stream; +use tokio_tcp::TcpListener; +use tokio_io::AsyncRead; +use tokio_io::io::copy; + +macro_rules! t { + ($e:expr) => (match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + }) +} + +#[test] +fn echo_server() { + drop(env_logger::init()); + + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); + let addr = t!(srv.local_addr()); + + let msg = "foo bar baz"; + let t = thread::spawn(move || { + let mut s = TcpStream::connect(&addr).unwrap(); + + for _i in 0..1024 { + assert_eq!(t!(s.write(msg.as_bytes())), msg.len()); + let mut buf = [0; 1024]; + assert_eq!(t!(s.read(&mut buf)), msg.len()); + assert_eq!(&buf[..msg.len()], msg.as_bytes()); + } + }); + + let clients = srv.incoming(); + let client = clients.into_future().map(|e| e.0.unwrap()).map_err(|e| e.0); + let halves = client.map(|s| s.split()); + let copied = halves.and_then(|(a, b)| copy(a, b)); + + let (amt, _, _) = t!(copied.wait()); + t.join().unwrap(); + + assert_eq!(amt, msg.len() as u64 * 1024); +} diff --git a/third_party/rust/tokio-tcp/tests/limit.rs b/third_party/rust/tokio-tcp/tests/limit.rs new file mode 100644 index 0000000000..8714da9a51 --- /dev/null +++ b/third_party/rust/tokio-tcp/tests/limit.rs @@ -0,0 +1,43 @@ +extern crate futures; +extern crate tokio_tcp; +extern crate tokio_io; + +use std::net::TcpStream; +use std::thread; +use std::io::{Write, Read}; + +use futures::Future; +use futures::stream::Stream; +use tokio_io::io::read_to_end; +use tokio_tcp::TcpListener; + +macro_rules! t { + ($e:expr) => (match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + }) +} + +#[test] +fn limit() { + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); + let addr = t!(srv.local_addr()); + + let t = thread::spawn(move || { + let mut s1 = TcpStream::connect(&addr).unwrap(); + s1.write_all(b"foo bar baz").unwrap(); + }); + + let clients = srv.incoming().take(1); + let copied = clients.collect().and_then(|clients| { + let mut clients = clients.into_iter(); + let a = clients.next().unwrap(); + + read_to_end(a.take(4), Vec::new()) + }); + + let (_, data) = t!(copied.wait()); + t.join().unwrap(); + + assert_eq!(data, b"foo "); +} diff --git a/third_party/rust/tokio-tcp/tests/stream-buffered.rs b/third_party/rust/tokio-tcp/tests/stream-buffered.rs new file mode 100644 index 0000000000..a6d71298dc --- /dev/null +++ b/third_party/rust/tokio-tcp/tests/stream-buffered.rs @@ -0,0 +1,54 @@ +extern crate env_logger; +extern crate futures; +extern crate tokio_tcp; +extern crate tokio_io; + +use std::io::{Read, Write}; +use std::net::TcpStream; +use std::thread; + +use futures::Future; +use futures::stream::Stream; +use tokio_io::io::copy; +use tokio_io::AsyncRead; +use tokio_tcp::TcpListener; + +macro_rules! t { + ($e:expr) => (match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + }) +} + +#[test] +fn echo_server() { + drop(env_logger::init()); + + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); + let addr = t!(srv.local_addr()); + + let t = thread::spawn(move || { + let mut s1 = t!(TcpStream::connect(&addr)); + let mut s2 = t!(TcpStream::connect(&addr)); + + let msg = b"foo"; + assert_eq!(t!(s1.write(msg)), msg.len()); + assert_eq!(t!(s2.write(msg)), msg.len()); + let mut buf = [0; 1024]; + assert_eq!(t!(s1.read(&mut buf)), msg.len()); + assert_eq!(&buf[..msg.len()], msg); + assert_eq!(t!(s2.read(&mut buf)), msg.len()); + assert_eq!(&buf[..msg.len()], msg); + }); + + let future = srv.incoming() + .map(|s| s.split()) + .map(|(a, b)| copy(a, b).map(|_| ())) + .buffered(10) + .take(2) + .collect(); + + t!(future.wait()); + + t.join().unwrap(); +} diff --git a/third_party/rust/tokio-tcp/tests/tcp.rs b/third_party/rust/tokio-tcp/tests/tcp.rs new file mode 100644 index 0000000000..c905711b20 --- /dev/null +++ b/third_party/rust/tokio-tcp/tests/tcp.rs @@ -0,0 +1,132 @@ +extern crate env_logger; +extern crate tokio_io; +extern crate tokio_tcp; +extern crate mio; +extern crate futures; + +use std::{net, thread}; +use std::sync::mpsc::channel; + +use futures::{Future, Stream}; +use tokio_tcp::{TcpListener, TcpStream}; + + +macro_rules! t { + ($e:expr) => (match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + }) +} + +#[test] +fn connect() { + drop(env_logger::init()); + let srv = t!(net::TcpListener::bind("127.0.0.1:0")); + let addr = t!(srv.local_addr()); + let t = thread::spawn(move || { + t!(srv.accept()).0 + }); + + let stream = TcpStream::connect(&addr); + let mine = t!(stream.wait()); + let theirs = t.join().unwrap(); + + assert_eq!(t!(mine.local_addr()), t!(theirs.peer_addr())); + assert_eq!(t!(theirs.local_addr()), t!(mine.peer_addr())); +} + +#[test] +fn accept() { + drop(env_logger::init()); + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); + let addr = t!(srv.local_addr()); + + let (tx, rx) = channel(); + let client = srv.incoming().map(move |t| { + tx.send(()).unwrap(); + t + }).into_future().map_err(|e| e.0); + assert!(rx.try_recv().is_err()); + let t = thread::spawn(move || { + net::TcpStream::connect(&addr).unwrap() + }); + + let (mine, _remaining) = t!(client.wait()); + let mine = mine.unwrap(); + let theirs = t.join().unwrap(); + + assert_eq!(t!(mine.local_addr()), t!(theirs.peer_addr())); + assert_eq!(t!(theirs.local_addr()), t!(mine.peer_addr())); +} + +#[test] +fn accept2() { + drop(env_logger::init()); + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); + let addr = t!(srv.local_addr()); + + let t = thread::spawn(move || { + net::TcpStream::connect(&addr).unwrap() + }); + + let (tx, rx) = channel(); + let client = srv.incoming().map(move |t| { + tx.send(()).unwrap(); + t + }).into_future().map_err(|e| e.0); + assert!(rx.try_recv().is_err()); + + let (mine, _remaining) = t!(client.wait()); + mine.unwrap(); + t.join().unwrap(); +} + +#[cfg(unix)] +mod unix { + use tokio_tcp::TcpStream; + + use env_logger; + use futures::{Future, future}; + use mio::unix::UnixReady; + use tokio_io::AsyncRead; + + use std::io::Write; + use std::{net, thread}; + use std::time::Duration; + + #[test] + fn poll_hup() { + drop(env_logger::init()); + + let srv = t!(net::TcpListener::bind("127.0.0.1:0")); + let addr = t!(srv.local_addr()); + let t = thread::spawn(move || { + let mut client = t!(srv.accept()).0; + client.write(b"hello world").unwrap(); + thread::sleep(Duration::from_millis(200)); + }); + + let mut stream = t!(TcpStream::connect(&addr).wait()); + + // Poll for HUP before reading. + future::poll_fn(|| { + stream.poll_read_ready(UnixReady::hup().into()) + }).wait().unwrap(); + + // Same for write half + future::poll_fn(|| { + stream.poll_write_ready() + }).wait().unwrap(); + + let mut buf = vec![0; 11]; + + // Read the data + future::poll_fn(|| { + stream.poll_read(&mut buf) + }).wait().unwrap(); + + assert_eq!(b"hello world", &buf[..]); + + t.join().unwrap(); + } +} |