diff options
Diffstat (limited to '')
38 files changed, 5550 insertions, 0 deletions
diff --git a/third_party/rust/tokio-io/.cargo-checksum.json b/third_party/rust/tokio-io/.cargo-checksum.json new file mode 100644 index 0000000000..f4c4bd86e1 --- /dev/null +++ b/third_party/rust/tokio-io/.cargo-checksum.json @@ -0,0 +1 @@ +{"files":{"CHANGELOG.md":"387a66496fd62e0871d79d9669bc061df52cd57933194b0d99084045bace9734","Cargo.toml":"7254ab36766d9950efe7d48d3caf046575852ec06f587a0ff577ec5676ffb447","LICENSE":"898b1ae9821e98daf8964c8d6c7f61641f5f5aa78ad500020771c0939ee0dea1","README.md":"14a126b0e0c7dd5a06be664d3f8f66b7215fe9be18ecac9169901f2243697e56","src/_tokio_codec/decoder.rs":"252957cf5e006f20641e596450c18de25c68faf518f2ae0351e49dcac3162629","src/_tokio_codec/encoder.rs":"45373272161b3fe1d9a7f432f582d86bc529bf3b985675be0624e5491be6dec4","src/_tokio_codec/framed.rs":"80ef1f2d1fbcbb7aaeae776b3509d7b398ff34cf3eb2946be8993e48bf8d1fc3","src/_tokio_codec/framed_read.rs":"41f0dec98372c2d0fa2dca46c9d6285612e6931c104ec0f8f647dbec54e2ba84","src/_tokio_codec/framed_write.rs":"22ea8d4e1717e930e928659b28ea158aa2fc3aee05d5065e0f35d5195edb5735","src/_tokio_codec/mod.rs":"6798cf0f1f15e214ee182907769e93c6f902d60e976a034a5b528b849d183dec","src/allow_std.rs":"15408c75c99bfa54b87749afc9b94a1c19ca47ebe6b612718a2f31a6bc0c3bcd","src/async_read.rs":"de31d6e1aa04e90de0223320cb5a7753b9a694847dd4f1280d2379bf08496d5f","src/async_write.rs":"036a0773ea694a0d7ff6f57b084631d4f8977dad8b4d955a5c081ec8d3910c29","src/codec/bytes_codec.rs":"e36062da1fcde8f0a452963fff5c2848fe965c6650992d4ae0977116d86b03aa","src/codec/decoder.rs":"f18d8902d241827b86111dba0aca7a70b375a563747af2517bf1125f1418a1d7","src/codec/encoder.rs":"4c2d7a0ff2a500b6deb0d4fd6dff7f8b181d708947e4a19b7c0066ad4b8eaa76","src/codec/lines_codec.rs":"e21e478f5e5b517fd8c4082a8728d4034d2b76ed2da299b39b2ec0eb9383f79e","src/codec/mod.rs":"a596895c8d8c2767ead7c767c09b65b22d4f4ad4e9cd0681abf402dff8b31807","src/framed.rs":"c1b95325ecff696187e22084530846116a84a9541872d99a8416a82a9bc7e34c","src/framed_read.rs":"5f02fae02e3964be800392a14f3d18c0e7050f5fe0672887f6e18be0f6ec2134","src/framed_write.rs":"704d436941a6b00006a253baf56810bf9d09ae6f13ff41e72739dec81255ce21","src/io/copy.rs":"5f9697b407dabd40264cf4aa8d101fb3632d54fffe7f85121f8b342cdb69d241","src/io/flush.rs":"65fe30a42a41d3b09b6fccd22808a90c1ae687852843e5a5f8515df0ef58469b","src/io/mod.rs":"03e065df1714acf7b5f4aa3226378e86254652cefd3ba263b13f939cd3a6cf52","src/io/read.rs":"ada420d05b641665ecb2f38d8e8eeb9a426d552ec301da324b4577ab5b876cd2","src/io/read_exact.rs":"93a84c62ffff9bdc890c3589e62704aff13caeba16c27008aef4bd197cfba75d","src/io/read_to_end.rs":"36df527df7d814a2272585768c7e255424cf7ec2f79037466ebbd2b1369f8087","src/io/read_until.rs":"2f3dce5953d066fa448f2b61e255426fbf41dd870d45ced15598f7be225dc59d","src/io/shutdown.rs":"eb685003aa5859e1b11beae573797a1af4a2427fad83b4c5b7000d1c280d8b39","src/io/write_all.rs":"12c6b0c56140bbe2affde286fe3c172ede93513235701edf358925d820da4f16","src/length_delimited.rs":"1041a272f328cb52da53d0ea7e272127d8ab2e4816699678519cb69f494009ca","src/lib.rs":"6ea12b37a1a80c444bc15073fc2dedb004ec7c67ce23e4160e8c6973323e67c3","src/lines.rs":"26f0b01a6b4c3b28fa20c1c19d035a59de36f22f6595bafd19cc80e751f93b1b","src/split.rs":"04e174365e2932b158c6c81c25b8a1a8a33fad55b23cb7e63a233b39f122db83","src/window.rs":"e6fca0e2e8d99c76b4fe3ab9956997c6fd8167005d4dec01cae0c7879203e3c5","tests/async_read.rs":"3090821dcfbbd8285d5901ee70e8e66601dc529da1d12b117a10ca5ce7e366a0","tests/length_delimited.rs":"6160dd3d7f0123c6c89ca4c88acb277aac6240da88ab997078c6f5c6d83d5a06"},"package":"57fc868aae093479e3131e3d165c93b1c7474109d13c90ec0dda2a1bbfff0674"}
\ No newline at end of file diff --git a/third_party/rust/tokio-io/CHANGELOG.md b/third_party/rust/tokio-io/CHANGELOG.md new file mode 100644 index 0000000000..894d1e5fa4 --- /dev/null +++ b/third_party/rust/tokio-io/CHANGELOG.md @@ -0,0 +1,67 @@ +# 0.1.13 (February 4, 2020) + +* Add `tokio 0.2.x` deprecation notice. + +# 0.1.12 (March 1, 2019) + +### Added +- Add `unsplit` to join previously split `AsyncRead + AsyncWrite` (#807). + +# 0.1.11 (January 6, 2019) + +* Fix minor error in Decoder::decode API documentation (#797). + +# 0.1.10 (October 23, 2018) + +* Expose inner codec from `Framed` (#686). +* Implement AsyncRead::prepare_uninitialized_buffer for Take and Chain (#678). + +# 0.1.9 (September 27, 2018) + +* Fix bug in `AsyncRead::split()` (#655). +* Fix non-terminating loop in `length_delimited::FramedWrite` (#576). + +# 0.1.8 (August 23, 2018) + +* Documentation improvements + +# 0.1.7 (June 13, 2018) + +* Move `codec::{Encode, Decode, Framed*}` into `tokio-codec` (#353) + +# 0.1.6 (March 09, 2018) + +* Add native endian builder fn to length_delimited (#144) +* Add AsyncRead::poll_read, AsyncWrite::poll_write (#170) + +# 0.1.5 (February 07, 2018) + +* Fix bug in `BytesCodec` and `LinesCodec`. +* Performance improvement to `split`. + +# 0.1.4 (November 10, 2017) + +* Use `FrameTooBig` as length delimited error type (#70). +* Provide `Bytes` and `Lines` codecs (#78). +* Provide `AllowStdIo` wrapper (#76). + +# 0.1.3 (August 14, 2017) + +* Fix bug involving zero sized writes in copy helper (#57). +* Add get / set accessors for length delimited max frame length setting. (#65). +* Add `Framed::into_parts_and_codec` (#59). + +# 0.1.2 (May 23, 2017) + +* Add `from_parts` and `into_parts` to the framing combinators. +* Support passing an initialized buffer to the framing combinators. +* Add `length_adjustment` support to length delimited encoding (#48). + +# 0.1.1 (March 22, 2017) + +* Add some omitted `Self: Sized` bounds. +* Add missing "inner" fns. + +# 0.1.0 (March 15, 2017) + +* Initial release diff --git a/third_party/rust/tokio-io/Cargo.toml b/third_party/rust/tokio-io/Cargo.toml new file mode 100644 index 0000000000..e809144cf5 --- /dev/null +++ b/third_party/rust/tokio-io/Cargo.toml @@ -0,0 +1,32 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g., crates.io) dependencies +# +# If you believe there's an error in this file please file an +# issue against the rust-lang/cargo repository. If you're +# editing this file be aware that the upstream Cargo.toml +# will likely look very different (and much more reasonable) + +[package] +name = "tokio-io" +version = "0.1.13" +authors = ["Carl Lerche <me@carllerche.com>"] +description = "Core I/O primitives for asynchronous I/O in Rust.\n" +homepage = "https://tokio.rs" +documentation = "https://docs.rs/tokio-io/0.1.13/tokio_io" +categories = ["asynchronous"] +license = "MIT" +repository = "https://github.com/tokio-rs/tokio" +[dependencies.bytes] +version = "0.4.7" + +[dependencies.futures] +version = "0.1.18" + +[dependencies.log] +version = "0.4" +[dev-dependencies.tokio-current-thread] +version = "0.1.1" diff --git a/third_party/rust/tokio-io/LICENSE b/third_party/rust/tokio-io/LICENSE new file mode 100644 index 0000000000..cdb28b4b56 --- /dev/null +++ b/third_party/rust/tokio-io/LICENSE @@ -0,0 +1,25 @@ +Copyright (c) 2019 Tokio Contributors + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/third_party/rust/tokio-io/README.md b/third_party/rust/tokio-io/README.md new file mode 100644 index 0000000000..c25cb11f43 --- /dev/null +++ b/third_party/rust/tokio-io/README.md @@ -0,0 +1,42 @@ +# tokio-io + +Core I/O abstractions for the Tokio stack. + +[![Build Status](https://travis-ci.org/tokio-rs/tokio-io.svg?branch=master)](https://travis-ci.org/tokio-rs/tokio-io) + +> **Note:** This crate has been **deprecated in tokio 0.2.x** and has been moved +> into [`tokio::io`]. + +[`tokio::io`]: https://docs.rs/tokio/latest/tokio/io/index.html + +[Documentation](https://docs.rs/tokio-io/0.1.12/tokio_io) + +## Usage + +First, add this to your `Cargo.toml`: + +```toml +[dependencies] +tokio-io = "0.1" +``` + +Next, add this to your crate: + +```rust +extern crate tokio_io; +``` + +You can find extensive documentation and examples about how to use this crate +online at [https://tokio.rs](https://tokio.rs). The [API +documentation](https://docs.rs/tokio-io) is also a great place to get started +for the nitty-gritty. + +## License + +This project is licensed under the [MIT license](LICENSE). + +### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in Tokio by you, shall be licensed as MIT, without any additional +terms or conditions. diff --git a/third_party/rust/tokio-io/src/_tokio_codec/decoder.rs b/third_party/rust/tokio-io/src/_tokio_codec/decoder.rs new file mode 100644 index 0000000000..edc0ecc0d4 --- /dev/null +++ b/third_party/rust/tokio-io/src/_tokio_codec/decoder.rs @@ -0,0 +1,3 @@ +// For now, we need to keep the implementation of Encoder in tokio_io. + +pub use codec::Decoder; diff --git a/third_party/rust/tokio-io/src/_tokio_codec/encoder.rs b/third_party/rust/tokio-io/src/_tokio_codec/encoder.rs new file mode 100644 index 0000000000..20b9e375e0 --- /dev/null +++ b/third_party/rust/tokio-io/src/_tokio_codec/encoder.rs @@ -0,0 +1,3 @@ +// For now, we need to keep the implementation of Encoder in tokio_io. + +pub use codec::Encoder; diff --git a/third_party/rust/tokio-io/src/_tokio_codec/framed.rs b/third_party/rust/tokio-io/src/_tokio_codec/framed.rs new file mode 100644 index 0000000000..d290575e79 --- /dev/null +++ b/third_party/rust/tokio-io/src/_tokio_codec/framed.rs @@ -0,0 +1,283 @@ +#![allow(deprecated)] + +use std::fmt; +use std::io::{self, Read, Write}; + +use super::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2}; +use super::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2}; +use codec::{Decoder, Encoder}; +use {AsyncRead, AsyncWrite}; + +use bytes::BytesMut; +use futures::{Poll, Sink, StartSend, Stream}; + +/// A unified `Stream` and `Sink` interface to an underlying I/O object, using +/// the `Encoder` and `Decoder` traits to encode and decode frames. +/// +/// You can create a `Framed` instance by using the `AsyncRead::framed` adapter. +pub struct Framed<T, U> { + inner: FramedRead2<FramedWrite2<Fuse<T, U>>>, +} + +pub struct Fuse<T, U>(pub T, pub U); + +impl<T, U> Framed<T, U> +where + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, +{ + /// Provides a `Stream` and `Sink` interface for reading and writing to this + /// `Io` object, using `Decode` and `Encode` to read and write the raw data. + /// + /// Raw I/O objects work with byte sequences, but higher-level code usually + /// wants to batch these into meaningful chunks, called "frames". This + /// method layers framing on top of an I/O object, by using the `Codec` + /// traits to handle encoding and decoding of messages frames. Note that + /// the incoming and outgoing frame types may be distinct. + /// + /// This function returns a *single* object that is both `Stream` and + /// `Sink`; grouping this into a single object is often useful for layering + /// things like gzip or TLS, which require both read and write access to the + /// underlying object. + /// + /// If you want to work more directly with the streams and sink, consider + /// calling `split` on the `Framed` returned by this method, which will + /// break them into separate objects, allowing them to interact more easily. + pub fn new(inner: T, codec: U) -> Framed<T, U> { + Framed { + inner: framed_read2(framed_write2(Fuse(inner, codec))), + } + } +} + +impl<T, U> Framed<T, U> { + /// Provides a `Stream` and `Sink` interface for reading and writing to this + /// `Io` object, using `Decode` and `Encode` to read and write the raw data. + /// + /// Raw I/O objects work with byte sequences, but higher-level code usually + /// wants to batch these into meaningful chunks, called "frames". This + /// method layers framing on top of an I/O object, by using the `Codec` + /// traits to handle encoding and decoding of messages frames. Note that + /// the incoming and outgoing frame types may be distinct. + /// + /// This function returns a *single* object that is both `Stream` and + /// `Sink`; grouping this into a single object is often useful for layering + /// things like gzip or TLS, which require both read and write access to the + /// underlying object. + /// + /// This objects takes a stream and a readbuffer and a writebuffer. These field + /// can be obtained from an existing `Framed` with the `into_parts` method. + /// + /// If you want to work more directly with the streams and sink, consider + /// calling `split` on the `Framed` returned by this method, which will + /// break them into separate objects, allowing them to interact more easily. + pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> { + Framed { + inner: framed_read2_with_buffer( + framed_write2_with_buffer(Fuse(parts.io, parts.codec), parts.write_buf), + parts.read_buf, + ), + } + } + + /// Returns a reference to the underlying I/O stream wrapped by + /// `Frame`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_ref(&self) -> &T { + &self.inner.get_ref().get_ref().0 + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `Frame`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner.get_mut().get_mut().0 + } + + /// Returns a reference to the underlying codec wrapped by + /// `Frame`. + /// + /// Note that care should be taken to not tamper with the underlying codec + /// as it may corrupt the stream of frames otherwise being worked with. + pub fn codec(&self) -> &U { + &self.inner.get_ref().get_ref().1 + } + + /// Returns a mutable reference to the underlying codec wrapped by + /// `Frame`. + /// + /// Note that care should be taken to not tamper with the underlying codec + /// as it may corrupt the stream of frames otherwise being worked with. + pub fn codec_mut(&mut self) -> &mut U { + &mut self.inner.get_mut().get_mut().1 + } + + /// Consumes the `Frame`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn into_inner(self) -> T { + self.inner.into_inner().into_inner().0 + } + + /// Consumes the `Frame`, returning its underlying I/O stream, the buffer + /// with unprocessed data, and the codec. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn into_parts(self) -> FramedParts<T, U> { + let (inner, read_buf) = self.inner.into_parts(); + let (inner, write_buf) = inner.into_parts(); + + FramedParts { + io: inner.0, + codec: inner.1, + read_buf: read_buf, + write_buf: write_buf, + _priv: (), + } + } +} + +impl<T, U> Stream for Framed<T, U> +where + T: AsyncRead, + U: Decoder, +{ + type Item = U::Item; + type Error = U::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + self.inner.poll() + } +} + +impl<T, U> Sink for Framed<T, U> +where + T: AsyncWrite, + U: Encoder, + U::Error: From<io::Error>, +{ + type SinkItem = U::Item; + type SinkError = U::Error; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { + self.inner.get_mut().start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + self.inner.get_mut().poll_complete() + } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + self.inner.get_mut().close() + } +} + +impl<T, U> fmt::Debug for Framed<T, U> +where + T: fmt::Debug, + U: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Framed") + .field("io", &self.inner.get_ref().get_ref().0) + .field("codec", &self.inner.get_ref().get_ref().1) + .finish() + } +} + +// ===== impl Fuse ===== + +impl<T: Read, U> Read for Fuse<T, U> { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + self.0.read(dst) + } +} + +impl<T: AsyncRead, U> AsyncRead for Fuse<T, U> { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.0.prepare_uninitialized_buffer(buf) + } +} + +impl<T: Write, U> Write for Fuse<T, U> { + fn write(&mut self, src: &[u8]) -> io::Result<usize> { + self.0.write(src) + } + + fn flush(&mut self) -> io::Result<()> { + self.0.flush() + } +} + +impl<T: AsyncWrite, U> AsyncWrite for Fuse<T, U> { + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.0.shutdown() + } +} + +impl<T, U: Decoder> Decoder for Fuse<T, U> { + type Item = U::Item; + type Error = U::Error; + + fn decode(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { + self.1.decode(buffer) + } + + fn decode_eof(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { + self.1.decode_eof(buffer) + } +} + +impl<T, U: Encoder> Encoder for Fuse<T, U> { + type Item = U::Item; + type Error = U::Error; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + self.1.encode(item, dst) + } +} + +/// `FramedParts` contains an export of the data of a Framed transport. +/// It can be used to construct a new `Framed` with a different codec. +/// It contains all current buffers and the inner transport. +#[derive(Debug)] +pub struct FramedParts<T, U> { + /// The inner transport used to read bytes to and write bytes to + pub io: T, + + /// The codec + pub codec: U, + + /// The buffer with read but unprocessed data. + pub read_buf: BytesMut, + + /// A buffer with unprocessed data which are not written yet. + pub write_buf: BytesMut, + + /// This private field allows us to add additional fields in the future in a + /// backwards compatible way. + _priv: (), +} + +impl<T, U> FramedParts<T, U> { + /// Create a new, default, `FramedParts` + pub fn new(io: T, codec: U) -> FramedParts<T, U> { + FramedParts { + io, + codec, + read_buf: BytesMut::new(), + write_buf: BytesMut::new(), + _priv: (), + } + } +} diff --git a/third_party/rust/tokio-io/src/_tokio_codec/framed_read.rs b/third_party/rust/tokio-io/src/_tokio_codec/framed_read.rs new file mode 100644 index 0000000000..ea7550877c --- /dev/null +++ b/third_party/rust/tokio-io/src/_tokio_codec/framed_read.rs @@ -0,0 +1,216 @@ +#![allow(deprecated)] + +use std::fmt; + +use super::framed::Fuse; +use codec::Decoder; +use AsyncRead; + +use bytes::BytesMut; +use futures::{Async, Poll, Sink, StartSend, Stream}; + +/// A `Stream` of messages decoded from an `AsyncRead`. +pub struct FramedRead<T, D> { + inner: FramedRead2<Fuse<T, D>>, +} + +pub struct FramedRead2<T> { + inner: T, + eof: bool, + is_readable: bool, + buffer: BytesMut, +} + +const INITIAL_CAPACITY: usize = 8 * 1024; + +// ===== impl FramedRead ===== + +impl<T, D> FramedRead<T, D> +where + T: AsyncRead, + D: Decoder, +{ + /// Creates a new `FramedRead` with the given `decoder`. + pub fn new(inner: T, decoder: D) -> FramedRead<T, D> { + FramedRead { + inner: framed_read2(Fuse(inner, decoder)), + } + } +} + +impl<T, D> FramedRead<T, D> { + /// Returns a reference to the underlying I/O stream wrapped by + /// `FramedRead`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_ref(&self) -> &T { + &self.inner.inner.0 + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `FramedRead`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner.inner.0 + } + + /// Consumes the `FramedRead`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn into_inner(self) -> T { + self.inner.inner.0 + } + + /// Returns a reference to the underlying decoder. + pub fn decoder(&self) -> &D { + &self.inner.inner.1 + } + + /// Returns a mutable reference to the underlying decoder. + pub fn decoder_mut(&mut self) -> &mut D { + &mut self.inner.inner.1 + } +} + +impl<T, D> Stream for FramedRead<T, D> +where + T: AsyncRead, + D: Decoder, +{ + type Item = D::Item; + type Error = D::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + self.inner.poll() + } +} + +impl<T, D> Sink for FramedRead<T, D> +where + T: Sink, +{ + type SinkItem = T::SinkItem; + type SinkError = T::SinkError; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { + self.inner.inner.0.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + self.inner.inner.0.poll_complete() + } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + self.inner.inner.0.close() + } +} + +impl<T, D> fmt::Debug for FramedRead<T, D> +where + T: fmt::Debug, + D: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("FramedRead") + .field("inner", &self.inner.inner.0) + .field("decoder", &self.inner.inner.1) + .field("eof", &self.inner.eof) + .field("is_readable", &self.inner.is_readable) + .field("buffer", &self.inner.buffer) + .finish() + } +} + +// ===== impl FramedRead2 ===== + +pub fn framed_read2<T>(inner: T) -> FramedRead2<T> { + FramedRead2 { + inner: inner, + eof: false, + is_readable: false, + buffer: BytesMut::with_capacity(INITIAL_CAPACITY), + } +} + +pub fn framed_read2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedRead2<T> { + if buf.capacity() < INITIAL_CAPACITY { + let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity(); + buf.reserve(bytes_to_reserve); + } + FramedRead2 { + inner: inner, + eof: false, + is_readable: buf.len() > 0, + buffer: buf, + } +} + +impl<T> FramedRead2<T> { + pub fn get_ref(&self) -> &T { + &self.inner + } + + pub fn into_inner(self) -> T { + self.inner + } + + pub fn into_parts(self) -> (T, BytesMut) { + (self.inner, self.buffer) + } + + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } +} + +impl<T> Stream for FramedRead2<T> +where + T: AsyncRead + Decoder, +{ + type Item = T::Item; + type Error = T::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + loop { + // Repeatedly call `decode` or `decode_eof` as long as it is + // "readable". Readable is defined as not having returned `None`. If + // the upstream has returned EOF, and the decoder is no longer + // readable, it can be assumed that the decoder will never become + // readable again, at which point the stream is terminated. + if self.is_readable { + if self.eof { + let frame = self.inner.decode_eof(&mut self.buffer)?; + return Ok(Async::Ready(frame)); + } + + trace!("attempting to decode a frame"); + + if let Some(frame) = self.inner.decode(&mut self.buffer)? { + trace!("frame decoded from buffer"); + return Ok(Async::Ready(Some(frame))); + } + + self.is_readable = false; + } + + assert!(!self.eof); + + // Otherwise, try to read more data and try again. Make sure we've + // got room for at least one byte to read to ensure that we don't + // get a spurious 0 that looks like EOF + self.buffer.reserve(1); + if 0 == try_ready!(self.inner.read_buf(&mut self.buffer)) { + self.eof = true; + } + + self.is_readable = true; + } + } +} diff --git a/third_party/rust/tokio-io/src/_tokio_codec/framed_write.rs b/third_party/rust/tokio-io/src/_tokio_codec/framed_write.rs new file mode 100644 index 0000000000..7541b1730d --- /dev/null +++ b/third_party/rust/tokio-io/src/_tokio_codec/framed_write.rs @@ -0,0 +1,246 @@ +#![allow(deprecated)] + +use std::fmt; +use std::io::{self, Read}; + +use super::framed::Fuse; +use codec::{Decoder, Encoder}; +use {AsyncRead, AsyncWrite}; + +use bytes::BytesMut; +use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream}; + +/// A `Sink` of frames encoded to an `AsyncWrite`. +pub struct FramedWrite<T, E> { + inner: FramedWrite2<Fuse<T, E>>, +} + +pub struct FramedWrite2<T> { + inner: T, + buffer: BytesMut, +} + +const INITIAL_CAPACITY: usize = 8 * 1024; +const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY; + +impl<T, E> FramedWrite<T, E> +where + T: AsyncWrite, + E: Encoder, +{ + /// Creates a new `FramedWrite` with the given `encoder`. + pub fn new(inner: T, encoder: E) -> FramedWrite<T, E> { + FramedWrite { + inner: framed_write2(Fuse(inner, encoder)), + } + } +} + +impl<T, E> FramedWrite<T, E> { + /// Returns a reference to the underlying I/O stream wrapped by + /// `FramedWrite`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_ref(&self) -> &T { + &self.inner.inner.0 + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `FramedWrite`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner.inner.0 + } + + /// Consumes the `FramedWrite`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn into_inner(self) -> T { + self.inner.inner.0 + } + + /// Returns a reference to the underlying decoder. + pub fn encoder(&self) -> &E { + &self.inner.inner.1 + } + + /// Returns a mutable reference to the underlying decoder. + pub fn encoder_mut(&mut self) -> &mut E { + &mut self.inner.inner.1 + } +} + +impl<T, E> Sink for FramedWrite<T, E> +where + T: AsyncWrite, + E: Encoder, +{ + type SinkItem = E::Item; + type SinkError = E::Error; + + fn start_send(&mut self, item: E::Item) -> StartSend<E::Item, E::Error> { + self.inner.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + self.inner.poll_complete() + } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + Ok(self.inner.close()?) + } +} + +impl<T, D> Stream for FramedWrite<T, D> +where + T: Stream, +{ + type Item = T::Item; + type Error = T::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + self.inner.inner.0.poll() + } +} + +impl<T, U> fmt::Debug for FramedWrite<T, U> +where + T: fmt::Debug, + U: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("FramedWrite") + .field("inner", &self.inner.get_ref().0) + .field("encoder", &self.inner.get_ref().1) + .field("buffer", &self.inner.buffer) + .finish() + } +} + +// ===== impl FramedWrite2 ===== + +pub fn framed_write2<T>(inner: T) -> FramedWrite2<T> { + FramedWrite2 { + inner: inner, + buffer: BytesMut::with_capacity(INITIAL_CAPACITY), + } +} + +pub fn framed_write2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedWrite2<T> { + if buf.capacity() < INITIAL_CAPACITY { + let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity(); + buf.reserve(bytes_to_reserve); + } + FramedWrite2 { + inner: inner, + buffer: buf, + } +} + +impl<T> FramedWrite2<T> { + pub fn get_ref(&self) -> &T { + &self.inner + } + + pub fn into_inner(self) -> T { + self.inner + } + + pub fn into_parts(self) -> (T, BytesMut) { + (self.inner, self.buffer) + } + + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } +} + +impl<T> Sink for FramedWrite2<T> +where + T: AsyncWrite + Encoder, +{ + type SinkItem = T::Item; + type SinkError = T::Error; + + fn start_send(&mut self, item: T::Item) -> StartSend<T::Item, T::Error> { + // If the buffer is already over 8KiB, then attempt to flush it. If after flushing it's + // *still* over 8KiB, then apply backpressure (reject the send). + if self.buffer.len() >= BACKPRESSURE_BOUNDARY { + self.poll_complete()?; + + if self.buffer.len() >= BACKPRESSURE_BOUNDARY { + return Ok(AsyncSink::NotReady(item)); + } + } + + self.inner.encode(item, &mut self.buffer)?; + + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + trace!("flushing framed transport"); + + while !self.buffer.is_empty() { + trace!("writing; remaining={}", self.buffer.len()); + + let n = try_ready!(self.inner.poll_write(&self.buffer)); + + if n == 0 { + return Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to \ + write frame to transport", + ) + .into()); + } + + // TODO: Add a way to `bytes` to do this w/o returning the drained + // data. + let _ = self.buffer.split_to(n); + } + + // Try flushing the underlying IO + try_ready!(self.inner.poll_flush()); + + trace!("framed transport flushed"); + return Ok(Async::Ready(())); + } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + try_ready!(self.poll_complete()); + Ok(self.inner.shutdown()?) + } +} + +impl<T: Decoder> Decoder for FramedWrite2<T> { + type Item = T::Item; + type Error = T::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result<Option<T::Item>, T::Error> { + self.inner.decode(src) + } + + fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<T::Item>, T::Error> { + self.inner.decode_eof(src) + } +} + +impl<T: Read> Read for FramedWrite2<T> { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + self.inner.read(dst) + } +} + +impl<T: AsyncRead> AsyncRead for FramedWrite2<T> { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.inner.prepare_uninitialized_buffer(buf) + } +} diff --git a/third_party/rust/tokio-io/src/_tokio_codec/mod.rs b/third_party/rust/tokio-io/src/_tokio_codec/mod.rs new file mode 100644 index 0000000000..5d7eb21890 --- /dev/null +++ b/third_party/rust/tokio-io/src/_tokio_codec/mod.rs @@ -0,0 +1,36 @@ +//! Utilities for encoding and decoding frames. +//! +//! Contains adapters to go from streams of bytes, [`AsyncRead`] and +//! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`]. +//! Framed streams are also known as [transports]. +//! +//! [`AsyncRead`]: # +//! [`AsyncWrite`]: # +//! [`Sink`]: # +//! [`Stream`]: # +//! [transports]: # + +#![deny(missing_docs, missing_debug_implementations)] +#![doc(hidden, html_root_url = "https://docs.rs/tokio-codec/0.1.0")] + +// _tokio_codec are the items that belong in the `tokio_codec` crate. However, because we need to +// maintain backward compatibility until the next major breaking change, they are defined here. +// When the next breaking change comes, they should be moved to the `tokio_codec` crate and become +// independent. +// +// The primary reason we can't move these to `tokio-codec` now is because, again for backward +// compatibility reasons, we need to keep `Decoder` and `Encoder` in tokio_io::codec. And `Decoder` +// and `Encoder` needs to reference `Framed`. So they all still need to still be in the same +// module. + +mod decoder; +mod encoder; +mod framed; +mod framed_read; +mod framed_write; + +pub use self::decoder::Decoder; +pub use self::encoder::Encoder; +pub use self::framed::{Framed, FramedParts}; +pub use self::framed_read::FramedRead; +pub use self::framed_write::FramedWrite; diff --git a/third_party/rust/tokio-io/src/allow_std.rs b/third_party/rust/tokio-io/src/allow_std.rs new file mode 100644 index 0000000000..af39ac2197 --- /dev/null +++ b/third_party/rust/tokio-io/src/allow_std.rs @@ -0,0 +1,93 @@ +use futures::{Async, Poll}; +use std::{fmt, io}; +use {AsyncRead, AsyncWrite}; + +/// A simple wrapper type which allows types that only implement +/// `std::io::Read` or `std::io::Write` to be used in contexts which expect +/// an `AsyncRead` or `AsyncWrite`. +/// +/// If these types issue an error with the kind `io::ErrorKind::WouldBlock`, +/// it is expected that they will notify the current task on readiness. +/// Synchronous `std` types should not issue errors of this kind and +/// are safe to use in this context. However, using these types with +/// `AllowStdIo` will cause the event loop to block, so they should be used +/// with care. +#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub struct AllowStdIo<T>(T); + +impl<T> AllowStdIo<T> { + /// Creates a new `AllowStdIo` from an existing IO object. + pub fn new(io: T) -> Self { + AllowStdIo(io) + } + + /// Returns a reference to the contained IO object. + pub fn get_ref(&self) -> &T { + &self.0 + } + + /// Returns a mutable reference to the contained IO object. + pub fn get_mut(&mut self) -> &mut T { + &mut self.0 + } + + /// Consumes self and returns the contained IO object. + pub fn into_inner(self) -> T { + self.0 + } +} + +impl<T> io::Write for AllowStdIo<T> +where + T: io::Write, +{ + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.0.write(buf) + } + fn flush(&mut self) -> io::Result<()> { + self.0.flush() + } + fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { + self.0.write_all(buf) + } + fn write_fmt(&mut self, fmt: fmt::Arguments) -> io::Result<()> { + self.0.write_fmt(fmt) + } +} + +impl<T> AsyncWrite for AllowStdIo<T> +where + T: io::Write, +{ + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(Async::Ready(())) + } +} + +impl<T> io::Read for AllowStdIo<T> +where + T: io::Read, +{ + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.0.read(buf) + } + // TODO: implement the `initializer` fn when it stabilizes. + // See rust-lang/rust #42788 + fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> { + self.0.read_to_end(buf) + } + fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> { + self.0.read_to_string(buf) + } + fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> { + self.0.read_exact(buf) + } +} + +impl<T> AsyncRead for AllowStdIo<T> +where + T: io::Read, +{ + // TODO: override prepare_uninitialized_buffer once `Read::initializer` is stable. + // See rust-lang/rust #42788 +} diff --git a/third_party/rust/tokio-io/src/async_read.rs b/third_party/rust/tokio-io/src/async_read.rs new file mode 100644 index 0000000000..d82aa71160 --- /dev/null +++ b/third_party/rust/tokio-io/src/async_read.rs @@ -0,0 +1,173 @@ +use bytes::BufMut; +use futures::{Async, Poll}; +use std::io as std_io; + +#[allow(deprecated)] +use codec::{Decoder, Encoder, Framed}; +use split::{ReadHalf, WriteHalf}; +use {framed, split, AsyncWrite}; + +/// Read bytes asynchronously. +/// +/// This trait inherits from `std::io::Read` and indicates that an I/O object is +/// **non-blocking**. All non-blocking I/O objects must return an error when +/// bytes are unavailable instead of blocking the current thread. +/// +/// Specifically, this means that the `poll_read` function will return one of +/// the following: +/// +/// * `Ok(Async::Ready(n))` means that `n` bytes of data was immediately read +/// and placed into the output buffer, where `n` == 0 implies that EOF has +/// been reached. +/// +/// * `Ok(Async::NotReady)` means that no data was read into the buffer +/// provided. The I/O object is not currently readable but may become readable +/// in the future. Most importantly, **the current future's task is scheduled +/// to get unparked when the object is readable**. This means that like +/// `Future::poll` you'll receive a notification when the I/O object is +/// readable again. +/// +/// * `Err(e)` for other errors are standard I/O errors coming from the +/// underlying object. +/// +/// This trait importantly means that the `read` method only works in the +/// context of a future's task. The object may panic if used outside of a task. +pub trait AsyncRead: std_io::Read { + /// Prepares an uninitialized buffer to be safe to pass to `read`. Returns + /// `true` if the supplied buffer was zeroed out. + /// + /// While it would be highly unusual, implementations of [`io::Read`] are + /// able to read data from the buffer passed as an argument. Because of + /// this, the buffer passed to [`io::Read`] must be initialized memory. In + /// situations where large numbers of buffers are used, constantly having to + /// zero out buffers can be expensive. + /// + /// This function does any necessary work to prepare an uninitialized buffer + /// to be safe to pass to `read`. If `read` guarantees to never attempt to + /// read data out of the supplied buffer, then `prepare_uninitialized_buffer` + /// doesn't need to do any work. + /// + /// If this function returns `true`, then the memory has been zeroed out. + /// This allows implementations of `AsyncRead` which are composed of + /// multiple subimplementations to efficiently implement + /// `prepare_uninitialized_buffer`. + /// + /// This function isn't actually `unsafe` to call but `unsafe` to implement. + /// The implementer must ensure that either the whole `buf` has been zeroed + /// or `read_buf()` overwrites the buffer without reading it and returns + /// correct value. + /// + /// This function is called from [`read_buf`]. + /// + /// [`io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html + /// [`read_buf`]: #method.read_buf + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + for i in 0..buf.len() { + buf[i] = 0; + } + + true + } + + /// Attempt to read from the `AsyncRead` into `buf`. + /// + /// On success, returns `Ok(Async::Ready(num_bytes_read))`. + /// + /// If no data is available for reading, the method returns + /// `Ok(Async::NotReady)` and arranges for the current task (via + /// `cx.waker()`) to receive a notification when the object becomes + /// readable or is closed. + fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, std_io::Error> { + match self.read(buf) { + Ok(t) => Ok(Async::Ready(t)), + Err(ref e) if e.kind() == std_io::ErrorKind::WouldBlock => return Ok(Async::NotReady), + Err(e) => return Err(e.into()), + } + } + + /// Pull some bytes from this source into the specified `BufMut`, returning + /// how many bytes were read. + /// + /// The `buf` provided will have bytes read into it and the internal cursor + /// will be advanced if any bytes were read. Note that this method typically + /// will not reallocate the buffer provided. + fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, std_io::Error> + where + Self: Sized, + { + if !buf.has_remaining_mut() { + return Ok(Async::Ready(0)); + } + + unsafe { + let n = { + let b = buf.bytes_mut(); + + self.prepare_uninitialized_buffer(b); + + try_ready!(self.poll_read(b)) + }; + + buf.advance_mut(n); + Ok(Async::Ready(n)) + } + } + + /// Provides a `Stream` and `Sink` interface for reading and writing to this + /// I/O object, using `Decode` and `Encode` to read and write the raw data. + /// + /// Raw I/O objects work with byte sequences, but higher-level code usually + /// wants to batch these into meaningful chunks, called "frames". This + /// method layers framing on top of an I/O object, by using the `Codec` + /// traits to handle encoding and decoding of messages frames. Note that + /// the incoming and outgoing frame types may be distinct. + /// + /// This function returns a *single* object that is both `Stream` and + /// `Sink`; grouping this into a single object is often useful for layering + /// things like gzip or TLS, which require both read and write access to the + /// underlying object. + /// + /// If you want to work more directly with the streams and sink, consider + /// calling `split` on the `Framed` returned by this method, which will + /// break them into separate objects, allowing them to interact more easily. + #[deprecated(since = "0.1.7", note = "Use tokio_codec::Decoder::framed instead")] + #[allow(deprecated)] + fn framed<T: Encoder + Decoder>(self, codec: T) -> Framed<Self, T> + where + Self: AsyncWrite + Sized, + { + framed::framed(self, codec) + } + + /// Helper method for splitting this read/write object into two halves. + /// + /// The two halves returned implement the `Read` and `Write` traits, + /// respectively. + /// + /// To restore this read/write object from its `ReadHalf` and `WriteHalf` + /// use `unsplit`. + fn split(self) -> (ReadHalf<Self>, WriteHalf<Self>) + where + Self: AsyncWrite + Sized, + { + split::split(self) + } +} + +impl<T: ?Sized + AsyncRead> AsyncRead for Box<T> { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + (**self).prepare_uninitialized_buffer(buf) + } +} + +impl<'a, T: ?Sized + AsyncRead> AsyncRead for &'a mut T { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + (**self).prepare_uninitialized_buffer(buf) + } +} + +impl<'a> AsyncRead for &'a [u8] { + unsafe fn prepare_uninitialized_buffer(&self, _buf: &mut [u8]) -> bool { + false + } +} diff --git a/third_party/rust/tokio-io/src/async_write.rs b/third_party/rust/tokio-io/src/async_write.rs new file mode 100644 index 0000000000..0a09480e82 --- /dev/null +++ b/third_party/rust/tokio-io/src/async_write.rs @@ -0,0 +1,222 @@ +use bytes::Buf; +use futures::{Async, Poll}; +use std::io as std_io; + +use AsyncRead; + +/// Writes bytes asynchronously. +/// +/// The trait inherits from `std::io::Write` and indicates that an I/O object is +/// **nonblocking**. All non-blocking I/O objects must return an error when +/// bytes cannot be written instead of blocking the current thread. +/// +/// Specifically, this means that the `poll_write` function will return one of +/// the following: +/// +/// * `Ok(Async::Ready(n))` means that `n` bytes of data was immediately +/// written. +/// +/// * `Ok(Async::NotReady)` means that no data was written from the buffer +/// provided. The I/O object is not currently writable but may become writable +/// in the future. Most importantly, **the current future's task is scheduled +/// to get unparked when the object is writable**. This means that like +/// `Future::poll` you'll receive a notification when the I/O object is +/// writable again. +/// +/// * `Err(e)` for other errors are standard I/O errors coming from the +/// underlying object. +/// +/// This trait importantly means that the `write` method only works in the +/// context of a future's task. The object may panic if used outside of a task. +/// +/// Note that this trait also represents that the `Write::flush` method works +/// very similarly to the `write` method, notably that `Ok(())` means that the +/// writer has successfully been flushed, a "would block" error means that the +/// current task is ready to receive a notification when flushing can make more +/// progress, and otherwise normal errors can happen as well. +pub trait AsyncWrite: std_io::Write { + /// Attempt to write bytes from `buf` into the object. + /// + /// On success, returns `Ok(Async::Ready(num_bytes_written))`. + /// + /// If the object is not ready for writing, the method returns + /// `Ok(Async::NotReady)` and arranges for the current task (via + /// `cx.waker()`) to receive a notification when the object becomes + /// readable or is closed. + fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, std_io::Error> { + match self.write(buf) { + Ok(t) => Ok(Async::Ready(t)), + Err(ref e) if e.kind() == std_io::ErrorKind::WouldBlock => return Ok(Async::NotReady), + Err(e) => return Err(e.into()), + } + } + + /// Attempt to flush the object, ensuring that any buffered data reach + /// their destination. + /// + /// On success, returns `Ok(Async::Ready(()))`. + /// + /// If flushing cannot immediately complete, this method returns + /// `Ok(Async::NotReady)` and arranges for the current task (via + /// `cx.waker()`) to receive a notification when the object can make + /// progress towards flushing. + fn poll_flush(&mut self) -> Poll<(), std_io::Error> { + match self.flush() { + Ok(t) => Ok(Async::Ready(t)), + Err(ref e) if e.kind() == std_io::ErrorKind::WouldBlock => return Ok(Async::NotReady), + Err(e) => return Err(e.into()), + } + } + + /// Initiates or attempts to shut down this writer, returning success when + /// the I/O connection has completely shut down. + /// + /// This method is intended to be used for asynchronous shutdown of I/O + /// connections. For example this is suitable for implementing shutdown of a + /// TLS connection or calling `TcpStream::shutdown` on a proxied connection. + /// Protocols sometimes need to flush out final pieces of data or otherwise + /// perform a graceful shutdown handshake, reading/writing more data as + /// appropriate. This method is the hook for such protocols to implement the + /// graceful shutdown logic. + /// + /// This `shutdown` method is required by implementers of the + /// `AsyncWrite` trait. Wrappers typically just want to proxy this call + /// through to the wrapped type, and base types will typically implement + /// shutdown logic here or just return `Ok(().into())`. Note that if you're + /// wrapping an underlying `AsyncWrite` a call to `shutdown` implies that + /// transitively the entire stream has been shut down. After your wrapper's + /// shutdown logic has been executed you should shut down the underlying + /// stream. + /// + /// Invocation of a `shutdown` implies an invocation of `flush`. Once this + /// method returns `Ready` it implies that a flush successfully happened + /// before the shutdown happened. That is, callers don't need to call + /// `flush` before calling `shutdown`. They can rely that by calling + /// `shutdown` any pending buffered data will be written out. + /// + /// # Return value + /// + /// This function returns a `Poll<(), io::Error>` classified as such: + /// + /// * `Ok(Async::Ready(()))` - indicates that the connection was + /// successfully shut down and is now safe to deallocate/drop/close + /// resources associated with it. This method means that the current task + /// will no longer receive any notifications due to this method and the + /// I/O object itself is likely no longer usable. + /// + /// * `Ok(Async::NotReady)` - indicates that shutdown is initiated but could + /// not complete just yet. This may mean that more I/O needs to happen to + /// continue this shutdown operation. The current task is scheduled to + /// receive a notification when it's otherwise ready to continue the + /// shutdown operation. When woken up this method should be called again. + /// + /// * `Err(e)` - indicates a fatal error has happened with shutdown, + /// indicating that the shutdown operation did not complete successfully. + /// This typically means that the I/O object is no longer usable. + /// + /// # Errors + /// + /// This function can return normal I/O errors through `Err`, described + /// above. Additionally this method may also render the underlying + /// `Write::write` method no longer usable (e.g. will return errors in the + /// future). It's recommended that once `shutdown` is called the + /// `write` method is no longer called. + /// + /// # Panics + /// + /// This function will panic if not called within the context of a future's + /// task. + fn shutdown(&mut self) -> Poll<(), std_io::Error>; + + /// Write a `Buf` into this value, returning how many bytes were written. + /// + /// Note that this method will advance the `buf` provided automatically by + /// the number of bytes written. + fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, std_io::Error> + where + Self: Sized, + { + if !buf.has_remaining() { + return Ok(Async::Ready(0)); + } + + let n = try_ready!(self.poll_write(buf.bytes())); + buf.advance(n); + Ok(Async::Ready(n)) + } +} + +impl<T: ?Sized + AsyncWrite> AsyncWrite for Box<T> { + fn shutdown(&mut self) -> Poll<(), std_io::Error> { + (**self).shutdown() + } +} +impl<'a, T: ?Sized + AsyncWrite> AsyncWrite for &'a mut T { + fn shutdown(&mut self) -> Poll<(), std_io::Error> { + (**self).shutdown() + } +} + +impl AsyncRead for std_io::Repeat { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { + false + } +} + +impl AsyncWrite for std_io::Sink { + fn shutdown(&mut self) -> Poll<(), std_io::Error> { + Ok(().into()) + } +} + +impl<T: AsyncRead> AsyncRead for std_io::Take<T> { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.get_ref().prepare_uninitialized_buffer(buf) + } +} + +impl<T, U> AsyncRead for std_io::Chain<T, U> +where + T: AsyncRead, + U: AsyncRead, +{ + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + let (t, u) = self.get_ref(); + // We don't need to execute the second initializer if the first one + // already zeroed the buffer out. + t.prepare_uninitialized_buffer(buf) || u.prepare_uninitialized_buffer(buf) + } +} + +impl<T: AsyncWrite> AsyncWrite for std_io::BufWriter<T> { + fn shutdown(&mut self) -> Poll<(), std_io::Error> { + try_ready!(self.poll_flush()); + self.get_mut().shutdown() + } +} + +impl<T: AsyncRead> AsyncRead for std_io::BufReader<T> { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.get_ref().prepare_uninitialized_buffer(buf) + } +} + +impl<T: AsRef<[u8]>> AsyncRead for std_io::Cursor<T> {} + +impl<'a> AsyncWrite for std_io::Cursor<&'a mut [u8]> { + fn shutdown(&mut self) -> Poll<(), std_io::Error> { + Ok(().into()) + } +} + +impl AsyncWrite for std_io::Cursor<Vec<u8>> { + fn shutdown(&mut self) -> Poll<(), std_io::Error> { + Ok(().into()) + } +} + +impl AsyncWrite for std_io::Cursor<Box<[u8]>> { + fn shutdown(&mut self) -> Poll<(), std_io::Error> { + Ok(().into()) + } +} diff --git a/third_party/rust/tokio-io/src/codec/bytes_codec.rs b/third_party/rust/tokio-io/src/codec/bytes_codec.rs new file mode 100644 index 0000000000..ecfd15ab99 --- /dev/null +++ b/third_party/rust/tokio-io/src/codec/bytes_codec.rs @@ -0,0 +1,42 @@ +#![allow(deprecated)] + +use bytes::{BufMut, Bytes, BytesMut}; +use codec::{Decoder, Encoder}; +use std::io; + +/// A simple `Codec` implementation that just ships bytes around. +#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] +#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")] +pub struct BytesCodec(()); + +impl BytesCodec { + /// Creates a new `BytesCodec` for shipping around raw bytes. + pub fn new() -> BytesCodec { + BytesCodec(()) + } +} + +impl Decoder for BytesCodec { + type Item = BytesMut; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> { + if buf.len() > 0 { + let len = buf.len(); + Ok(Some(buf.split_to(len))) + } else { + Ok(None) + } + } +} + +impl Encoder for BytesCodec { + type Item = Bytes; + type Error = io::Error; + + fn encode(&mut self, data: Bytes, buf: &mut BytesMut) -> Result<(), io::Error> { + buf.reserve(data.len()); + buf.put(data); + Ok(()) + } +} diff --git a/third_party/rust/tokio-io/src/codec/decoder.rs b/third_party/rust/tokio-io/src/codec/decoder.rs new file mode 100644 index 0000000000..cd5777dea8 --- /dev/null +++ b/third_party/rust/tokio-io/src/codec/decoder.rs @@ -0,0 +1,117 @@ +use bytes::BytesMut; +use std::io; + +use super::encoder::Encoder; +use {AsyncRead, AsyncWrite}; + +use _tokio_codec::Framed; + +/// Decoding of frames via buffers. +/// +/// This trait is used when constructing an instance of `Framed` or +/// `FramedRead`. An implementation of `Decoder` takes a byte stream that has +/// already been buffered in `src` and decodes the data into a stream of +/// `Self::Item` frames. +/// +/// Implementations are able to track state on `self`, which enables +/// implementing stateful streaming parsers. In many cases, though, this type +/// will simply be a unit struct (e.g. `struct HttpDecoder`). + +// Note: We can't deprecate this trait, because the deprecation carries through to tokio-codec, and +// there doesn't seem to be a way to un-deprecate the re-export. +pub trait Decoder { + /// The type of decoded frames. + type Item; + + /// The type of unrecoverable frame decoding errors. + /// + /// If an individual message is ill-formed but can be ignored without + /// interfering with the processing of future messages, it may be more + /// useful to report the failure as an `Item`. + /// + /// `From<io::Error>` is required in the interest of making `Error` suitable + /// for returning directly from a `FramedRead`, and to enable the default + /// implementation of `decode_eof` to yield an `io::Error` when the decoder + /// fails to consume all available data. + /// + /// Note that implementors of this trait can simply indicate `type Error = + /// io::Error` to use I/O errors as this type. + type Error: From<io::Error>; + + /// Attempts to decode a frame from the provided buffer of bytes. + /// + /// This method is called by `FramedRead` whenever bytes are ready to be + /// parsed. The provided buffer of bytes is what's been read so far, and + /// this instance of `Decode` can determine whether an entire frame is in + /// the buffer and is ready to be returned. + /// + /// If an entire frame is available, then this instance will remove those + /// bytes from the buffer provided and return them as a decoded + /// frame. Note that removing bytes from the provided buffer doesn't always + /// necessarily copy the bytes, so this should be an efficient operation in + /// most circumstances. + /// + /// If the bytes look valid, but a frame isn't fully available yet, then + /// `Ok(None)` is returned. This indicates to the `Framed` instance that + /// it needs to read some more bytes before calling this method again. + /// + /// Note that the bytes provided may be empty. If a previous call to + /// `decode` consumed all the bytes in the buffer then `decode` will be + /// called again until it returns `Ok(None)`, indicating that more bytes need to + /// be read. + /// + /// Finally, if the bytes in the buffer are malformed then an error is + /// returned indicating why. This informs `Framed` that the stream is now + /// corrupt and should be terminated. + fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>; + + /// A default method available to be called when there are no more bytes + /// available to be read from the underlying I/O. + /// + /// This method defaults to calling `decode` and returns an error if + /// `Ok(None)` is returned while there is unconsumed data in `buf`. + /// Typically this doesn't need to be implemented unless the framing + /// protocol differs near the end of the stream. + /// + /// Note that the `buf` argument may be empty. If a previous call to + /// `decode_eof` consumed all the bytes in the buffer, `decode_eof` will be + /// called again until it returns `None`, indicating that there are no more + /// frames to yield. This behavior enables returning finalization frames + /// that may not be based on inbound data. + fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { + match self.decode(buf)? { + Some(frame) => Ok(Some(frame)), + None => { + if buf.is_empty() { + Ok(None) + } else { + Err(io::Error::new(io::ErrorKind::Other, "bytes remaining on stream").into()) + } + } + } + } + + /// Provides a `Stream` and `Sink` interface for reading and writing to this + /// `Io` object, using `Decode` and `Encode` to read and write the raw data. + /// + /// Raw I/O objects work with byte sequences, but higher-level code usually + /// wants to batch these into meaningful chunks, called "frames". This + /// method layers framing on top of an I/O object, by using the `Codec` + /// traits to handle encoding and decoding of messages frames. Note that + /// the incoming and outgoing frame types may be distinct. + /// + /// This function returns a *single* object that is both `Stream` and + /// `Sink`; grouping this into a single object is often useful for layering + /// things like gzip or TLS, which require both read and write access to the + /// underlying object. + /// + /// If you want to work more directly with the streams and sink, consider + /// calling `split` on the `Framed` returned by this method, which will + /// break them into separate objects, allowing them to interact more easily. + fn framed<T: AsyncRead + AsyncWrite + Sized>(self, io: T) -> Framed<T, Self> + where + Self: Encoder + Sized, + { + Framed::new(io, self) + } +} diff --git a/third_party/rust/tokio-io/src/codec/encoder.rs b/third_party/rust/tokio-io/src/codec/encoder.rs new file mode 100644 index 0000000000..5065080324 --- /dev/null +++ b/third_party/rust/tokio-io/src/codec/encoder.rs @@ -0,0 +1,25 @@ +use bytes::BytesMut; +use std::io; + +/// Trait of helper objects to write out messages as bytes, for use with +/// `FramedWrite`. + +// Note: We can't deprecate this trait, because the deprecation carries through to tokio-codec, and +// there doesn't seem to be a way to un-deprecate the re-export. +pub trait Encoder { + /// The type of items consumed by the `Encoder` + type Item; + + /// The type of encoding errors. + /// + /// `FramedWrite` requires `Encoder`s errors to implement `From<io::Error>` + /// in the interest letting it return `Error`s directly. + type Error: From<io::Error>; + + /// Encodes a frame into the buffer provided. + /// + /// This method will encode `item` into the byte buffer provided by `dst`. + /// The `dst` provided is an internal buffer of the `Framed` instance and + /// will be written out when possible. + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error>; +} diff --git a/third_party/rust/tokio-io/src/codec/lines_codec.rs b/third_party/rust/tokio-io/src/codec/lines_codec.rs new file mode 100644 index 0000000000..818397fa50 --- /dev/null +++ b/third_party/rust/tokio-io/src/codec/lines_codec.rs @@ -0,0 +1,88 @@ +#![allow(deprecated)] + +use bytes::{BufMut, BytesMut}; +use codec::{Decoder, Encoder}; +use std::{io, str}; + +/// A simple `Codec` implementation that splits up data into lines. +#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] +#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")] +pub struct LinesCodec { + // Stored index of the next index to examine for a `\n` character. + // This is used to optimize searching. + // For example, if `decode` was called with `abc`, it would hold `3`, + // because that is the next index to examine. + // The next time `decode` is called with `abcde\n`, the method will + // only look at `de\n` before returning. + next_index: usize, +} + +impl LinesCodec { + /// Returns a `LinesCodec` for splitting up data into lines. + pub fn new() -> LinesCodec { + LinesCodec { next_index: 0 } + } +} + +fn utf8(buf: &[u8]) -> Result<&str, io::Error> { + str::from_utf8(buf) + .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Unable to decode input as UTF8")) +} + +fn without_carriage_return(s: &[u8]) -> &[u8] { + if let Some(&b'\r') = s.last() { + &s[..s.len() - 1] + } else { + s + } +} + +impl Decoder for LinesCodec { + type Item = String; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> { + if let Some(newline_offset) = buf[self.next_index..].iter().position(|b| *b == b'\n') { + let newline_index = newline_offset + self.next_index; + let line = buf.split_to(newline_index + 1); + let line = &line[..line.len() - 1]; + let line = without_carriage_return(line); + let line = utf8(line)?; + self.next_index = 0; + Ok(Some(line.to_string())) + } else { + self.next_index = buf.len(); + Ok(None) + } + } + + fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> { + Ok(match self.decode(buf)? { + Some(frame) => Some(frame), + None => { + // No terminating newline - return remaining data, if any + if buf.is_empty() || buf == &b"\r"[..] { + None + } else { + let line = buf.take(); + let line = without_carriage_return(&line); + let line = utf8(line)?; + self.next_index = 0; + Some(line.to_string()) + } + } + }) + } +} + +impl Encoder for LinesCodec { + type Item = String; + type Error = io::Error; + + fn encode(&mut self, line: String, buf: &mut BytesMut) -> Result<(), io::Error> { + buf.reserve(line.len() + 1); + buf.put(line); + buf.put_u8(b'\n'); + Ok(()) + } +} diff --git a/third_party/rust/tokio-io/src/codec/mod.rs b/third_party/rust/tokio-io/src/codec/mod.rs new file mode 100644 index 0000000000..6636821750 --- /dev/null +++ b/third_party/rust/tokio-io/src/codec/mod.rs @@ -0,0 +1,378 @@ +//! Utilities for encoding and decoding frames. +//! +//! Contains adapters to go from streams of bytes, [`AsyncRead`] and +//! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`]. +//! Framed streams are also known as [transports]. +//! +//! [`AsyncRead`]: # +//! [`AsyncWrite`]: # +//! [`Sink`]: # +//! [`Stream`]: # +//! [transports]: # + +// tokio_io::codec originally held all codec-related helpers. This is now intended to be in +// tokio_codec instead. However, for backward compatibility, this remains here. When the next major +// breaking change comes, `Encoder` and `Decoder` need to be moved to `tokio_codec`, and the rest +// of this module should be removed. + +#![doc(hidden)] +#![allow(deprecated)] + +mod bytes_codec; +mod decoder; +mod encoder; +mod lines_codec; + +pub use self::bytes_codec::BytesCodec; +pub use self::decoder::Decoder; +pub use self::encoder::Encoder; +pub use self::lines_codec::LinesCodec; + +pub use framed::{Framed, FramedParts}; +pub use framed_read::FramedRead; +pub use framed_write::FramedWrite; + +#[deprecated(since = "0.1.8", note = "Moved to tokio-codec")] +#[doc(hidden)] +pub mod length_delimited { + //! Frame a stream of bytes based on a length prefix + //! + //! Many protocols delimit their frames by prefacing frame data with a + //! frame head that specifies the length of the frame. The + //! `length_delimited` module provides utilities for handling the length + //! based framing. This allows the consumer to work with entire frames + //! without having to worry about buffering or other framing logic. + //! + //! # Getting started + //! + //! If implementing a protocol from scratch, using length delimited framing + //! is an easy way to get started. [`Framed::new()`](length_delimited::Framed::new) will adapt a + //! full-duplex byte stream with a length delimited framer using default + //! configuration values. + //! + //! ``` + //! use tokio_io::{AsyncRead, AsyncWrite}; + //! use tokio_io::codec::length_delimited; + //! + //! fn bind_transport<T: AsyncRead + AsyncWrite>(io: T) + //! -> length_delimited::Framed<T> + //! { + //! length_delimited::Framed::new(io) + //! } + //! ``` + //! + //! The returned transport implements `Sink + Stream` for `BytesMut`. It + //! encodes the frame with a big-endian `u32` header denoting the frame + //! payload length: + //! + //! ```text + //! +----------+--------------------------------+ + //! | len: u32 | frame payload | + //! +----------+--------------------------------+ + //! ``` + //! + //! Specifically, given the following: + //! + //! ``` + //! # extern crate tokio_io; + //! # extern crate bytes; + //! # extern crate futures; + //! # + //! use tokio_io::{AsyncRead, AsyncWrite}; + //! use tokio_io::codec::length_delimited; + //! use bytes::BytesMut; + //! use futures::{Sink, Future}; + //! + //! fn write_frame<T: AsyncRead + AsyncWrite>(io: T) { + //! let mut transport = length_delimited::Framed::new(io); + //! let frame = BytesMut::from("hello world"); + //! + //! transport.send(frame).wait().unwrap(); + //! } + //! # + //! # pub fn main() {} + //! ``` + //! + //! The encoded frame will look like this: + //! + //! ```text + //! +---- len: u32 ----+---- data ----+ + //! | \x00\x00\x00\x0b | hello world | + //! +------------------+--------------+ + //! ``` + //! + //! # Decoding + //! + //! [`FramedRead`] adapts an [`AsyncRead`] into a `Stream` of [`BytesMut`], + //! such that each yielded [`BytesMut`] value contains the contents of an + //! entire frame. There are many configuration parameters enabling + //! [`FramedRead`] to handle a wide range of protocols. Here are some + //! examples that will cover the various options at a high level. + //! + //! ## Example 1 + //! + //! The following will parse a `u16` length field at offset 0, including the + //! frame head in the yielded `BytesMut`. + //! + //! ``` + //! # use tokio_io::AsyncRead; + //! # use tokio_io::codec::length_delimited; + //! # fn bind_read<T: AsyncRead>(io: T) { + //! length_delimited::Builder::new() + //! .length_field_offset(0) // default value + //! .length_field_length(2) + //! .length_adjustment(0) // default value + //! .num_skip(0) // Do not strip frame header + //! .new_read(io); + //! # } + //! ``` + //! + //! The following frame will be decoded as such: + //! + //! ```text + //! INPUT DECODED + //! +-- len ---+--- Payload ---+ +-- len ---+--- Payload ---+ + //! | \x00\x0B | Hello world | --> | \x00\x0B | Hello world | + //! +----------+---------------+ +----------+---------------+ + //! ``` + //! + //! The value of the length field is 11 (`\x0B`) which represents the length + //! of the payload, `hello world`. By default, [`FramedRead`] assumes that + //! the length field represents the number of bytes that **follows** the + //! length field. Thus, the entire frame has a length of 13: 2 bytes for the + //! frame head + 11 bytes for the payload. + //! + //! ## Example 2 + //! + //! The following will parse a `u16` length field at offset 0, omitting the + //! frame head in the yielded `BytesMut`. + //! + //! ``` + //! # use tokio_io::AsyncRead; + //! # use tokio_io::codec::length_delimited; + //! # fn bind_read<T: AsyncRead>(io: T) { + //! length_delimited::Builder::new() + //! .length_field_offset(0) // default value + //! .length_field_length(2) + //! .length_adjustment(0) // default value + //! // `num_skip` is not needed, the default is to skip + //! .new_read(io); + //! # } + //! ``` + //! + //! The following frame will be decoded as such: + //! + //! ```text + //! INPUT DECODED + //! +-- len ---+--- Payload ---+ +--- Payload ---+ + //! | \x00\x0B | Hello world | --> | Hello world | + //! +----------+---------------+ +---------------+ + //! ``` + //! + //! This is similar to the first example, the only difference is that the + //! frame head is **not** included in the yielded `BytesMut` value. + //! + //! ## Example 3 + //! + //! The following will parse a `u16` length field at offset 0, including the + //! frame head in the yielded `BytesMut`. In this case, the length field + //! **includes** the frame head length. + //! + //! ``` + //! # use tokio_io::AsyncRead; + //! # use tokio_io::codec::length_delimited; + //! # fn bind_read<T: AsyncRead>(io: T) { + //! length_delimited::Builder::new() + //! .length_field_offset(0) // default value + //! .length_field_length(2) + //! .length_adjustment(-2) // size of head + //! .num_skip(0) + //! .new_read(io); + //! # } + //! ``` + //! + //! The following frame will be decoded as such: + //! + //! ```text + //! INPUT DECODED + //! +-- len ---+--- Payload ---+ +-- len ---+--- Payload ---+ + //! | \x00\x0D | Hello world | --> | \x00\x0D | Hello world | + //! +----------+---------------+ +----------+---------------+ + //! ``` + //! + //! In most cases, the length field represents the length of the payload + //! only, as shown in the previous examples. However, in some protocols the + //! length field represents the length of the whole frame, including the + //! head. In such cases, we specify a negative `length_adjustment` to adjust + //! the value provided in the frame head to represent the payload length. + //! + //! ## Example 4 + //! + //! The following will parse a 3 byte length field at offset 0 in a 5 byte + //! frame head, including the frame head in the yielded `BytesMut`. + //! + //! ``` + //! # use tokio_io::AsyncRead; + //! # use tokio_io::codec::length_delimited; + //! # fn bind_read<T: AsyncRead>(io: T) { + //! length_delimited::Builder::new() + //! .length_field_offset(0) // default value + //! .length_field_length(3) + //! .length_adjustment(2) // remaining head + //! .num_skip(0) + //! .new_read(io); + //! # } + //! ``` + //! + //! The following frame will be decoded as such: + //! + //! ```text + //! INPUT + //! +---- len -----+- head -+--- Payload ---+ + //! | \x00\x00\x0B | \xCAFE | Hello world | + //! +--------------+--------+---------------+ + //! + //! DECODED + //! +---- len -----+- head -+--- Payload ---+ + //! | \x00\x00\x0B | \xCAFE | Hello world | + //! +--------------+--------+---------------+ + //! ``` + //! + //! A more advanced example that shows a case where there is extra frame + //! head data between the length field and the payload. In such cases, it is + //! usually desirable to include the frame head as part of the yielded + //! `BytesMut`. This lets consumers of the length delimited framer to + //! process the frame head as needed. + //! + //! The positive `length_adjustment` value lets `FramedRead` factor in the + //! additional head into the frame length calculation. + //! + //! ## Example 5 + //! + //! The following will parse a `u16` length field at offset 1 of a 4 byte + //! frame head. The first byte and the length field will be omitted from the + //! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be + //! included. + //! + //! ``` + //! # use tokio_io::AsyncRead; + //! # use tokio_io::codec::length_delimited; + //! # fn bind_read<T: AsyncRead>(io: T) { + //! length_delimited::Builder::new() + //! .length_field_offset(1) // length of hdr1 + //! .length_field_length(2) + //! .length_adjustment(1) // length of hdr2 + //! .num_skip(3) // length of hdr1 + LEN + //! .new_read(io); + //! # } + //! ``` + //! + //! The following frame will be decoded as such: + //! + //! ```text + //! INPUT + //! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+ + //! | \xCA | \x00\x0B | \xFE | Hello world | + //! +--------+----------+--------+---------------+ + //! + //! DECODED + //! +- hdr2 -+--- Payload ---+ + //! | \xFE | Hello world | + //! +--------+---------------+ + //! ``` + //! + //! The length field is situated in the middle of the frame head. In this + //! case, the first byte in the frame head could be a version or some other + //! identifier that is not needed for processing. On the other hand, the + //! second half of the head is needed. + //! + //! `length_field_offset` indicates how many bytes to skip before starting + //! to read the length field. `length_adjustment` is the number of bytes to + //! skip starting at the end of the length field. In this case, it is the + //! second half of the head. + //! + //! ## Example 6 + //! + //! The following will parse a `u16` length field at offset 1 of a 4 byte + //! frame head. The first byte and the length field will be omitted from the + //! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be + //! included. In this case, the length field **includes** the frame head + //! length. + //! + //! ``` + //! # use tokio_io::AsyncRead; + //! # use tokio_io::codec::length_delimited; + //! # fn bind_read<T: AsyncRead>(io: T) { + //! length_delimited::Builder::new() + //! .length_field_offset(1) // length of hdr1 + //! .length_field_length(2) + //! .length_adjustment(-3) // length of hdr1 + LEN, negative + //! .num_skip(3) + //! .new_read(io); + //! # } + //! ``` + //! + //! The following frame will be decoded as such: + //! + //! ```text + //! INPUT + //! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+ + //! | \xCA | \x00\x0F | \xFE | Hello world | + //! +--------+----------+--------+---------------+ + //! + //! DECODED + //! +- hdr2 -+--- Payload ---+ + //! | \xFE | Hello world | + //! +--------+---------------+ + //! ``` + //! + //! Similar to the example above, the difference is that the length field + //! represents the length of the entire frame instead of just the payload. + //! The length of `hdr1` and `len` must be counted in `length_adjustment`. + //! Note that the length of `hdr2` does **not** need to be explicitly set + //! anywhere because it already is factored into the total frame length that + //! is read from the byte stream. + //! + //! # Encoding + //! + //! [`FramedWrite`] adapts an [`AsyncWrite`] into a `Sink` of [`BytesMut`], + //! such that each submitted [`BytesMut`] is prefaced by a length field. + //! There are fewer configuration options than [`FramedRead`]. Given + //! protocols that have more complex frame heads, an encoder should probably + //! be written by hand using [`Encoder`]. + //! + //! Here is a simple example, given a `FramedWrite` with the following + //! configuration: + //! + //! ``` + //! # extern crate tokio_io; + //! # extern crate bytes; + //! # use tokio_io::AsyncWrite; + //! # use tokio_io::codec::length_delimited; + //! # use bytes::BytesMut; + //! # fn write_frame<T: AsyncWrite>(io: T) { + //! # let _: length_delimited::FramedWrite<T, BytesMut> = + //! length_delimited::Builder::new() + //! .length_field_length(2) + //! .new_write(io); + //! # } + //! # pub fn main() {} + //! ``` + //! + //! A payload of `hello world` will be encoded as: + //! + //! ```text + //! +- len: u16 -+---- data ----+ + //! | \x00\x0b | hello world | + //! +------------+--------------+ + //! ``` + //! + //! [`FramedRead`]: struct.FramedRead.html + //! [`FramedWrite`]: struct.FramedWrite.html + //! [`AsyncRead`]: ../../trait.AsyncRead.html + //! [`AsyncWrite`]: ../../trait.AsyncWrite.html + //! [`Encoder`]: ../trait.Encoder.html + //! [`BytesMut`]: https://docs.rs/bytes/0.4/bytes/struct.BytesMut.html + + pub use length_delimited::*; +} diff --git a/third_party/rust/tokio-io/src/framed.rs b/third_party/rust/tokio-io/src/framed.rs new file mode 100644 index 0000000000..aea7b5468c --- /dev/null +++ b/third_party/rust/tokio-io/src/framed.rs @@ -0,0 +1,248 @@ +#![allow(deprecated)] + +use std::fmt; +use std::io::{self, Read, Write}; + +use codec::{Decoder, Encoder}; +use framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2}; +use framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2}; +use {AsyncRead, AsyncWrite}; + +use bytes::BytesMut; +use futures::{Poll, Sink, StartSend, Stream}; + +/// A unified `Stream` and `Sink` interface to an underlying I/O object, using +/// the `Encoder` and `Decoder` traits to encode and decode frames. +/// +/// You can create a `Framed` instance by using the `AsyncRead::framed` adapter. +#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")] +#[doc(hidden)] +pub struct Framed<T, U> { + inner: FramedRead2<FramedWrite2<Fuse<T, U>>>, +} + +#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")] +#[doc(hidden)] +pub struct Fuse<T, U>(pub T, pub U); + +pub fn framed<T, U>(inner: T, codec: U) -> Framed<T, U> +where + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, +{ + Framed { + inner: framed_read2(framed_write2(Fuse(inner, codec))), + } +} + +impl<T, U> Framed<T, U> { + /// Provides a `Stream` and `Sink` interface for reading and writing to this + /// `Io` object, using `Decode` and `Encode` to read and write the raw data. + /// + /// Raw I/O objects work with byte sequences, but higher-level code usually + /// wants to batch these into meaningful chunks, called "frames". This + /// method layers framing on top of an I/O object, by using the `Codec` + /// traits to handle encoding and decoding of messages frames. Note that + /// the incoming and outgoing frame types may be distinct. + /// + /// This function returns a *single* object that is both `Stream` and + /// `Sink`; grouping this into a single object is often useful for layering + /// things like gzip or TLS, which require both read and write access to the + /// underlying object. + /// + /// This objects takes a stream and a readbuffer and a writebuffer. These field + /// can be obtained from an existing `Framed` with the `into_parts` method. + /// + /// If you want to work more directly with the streams and sink, consider + /// calling `split` on the `Framed` returned by this method, which will + /// break them into separate objects, allowing them to interact more easily. + pub fn from_parts(parts: FramedParts<T>, codec: U) -> Framed<T, U> { + Framed { + inner: framed_read2_with_buffer( + framed_write2_with_buffer(Fuse(parts.inner, codec), parts.writebuf), + parts.readbuf, + ), + } + } + + /// Returns a reference to the underlying I/O stream wrapped by + /// `Frame`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_ref(&self) -> &T { + &self.inner.get_ref().get_ref().0 + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `Frame`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner.get_mut().get_mut().0 + } + + /// Consumes the `Frame`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn into_inner(self) -> T { + self.inner.into_inner().into_inner().0 + } + + /// Consumes the `Frame`, returning its underlying I/O stream and the buffer + /// with unprocessed data. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn into_parts(self) -> FramedParts<T> { + let (inner, readbuf) = self.inner.into_parts(); + let (inner, writebuf) = inner.into_parts(); + FramedParts { + inner: inner.0, + readbuf: readbuf, + writebuf: writebuf, + } + } + + /// Consumes the `Frame`, returning its underlying I/O stream and the buffer + /// with unprocessed data, and also the current codec state. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + /// + /// Note that this function will be removed once the codec has been + /// integrated into `FramedParts` in a new version (see + /// [#53](https://github.com/tokio-rs/tokio-io/pull/53)). + pub fn into_parts_and_codec(self) -> (FramedParts<T>, U) { + let (inner, readbuf) = self.inner.into_parts(); + let (inner, writebuf) = inner.into_parts(); + ( + FramedParts { + inner: inner.0, + readbuf: readbuf, + writebuf: writebuf, + }, + inner.1, + ) + } +} + +impl<T, U> Stream for Framed<T, U> +where + T: AsyncRead, + U: Decoder, +{ + type Item = U::Item; + type Error = U::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + self.inner.poll() + } +} + +impl<T, U> Sink for Framed<T, U> +where + T: AsyncWrite, + U: Encoder, + U::Error: From<io::Error>, +{ + type SinkItem = U::Item; + type SinkError = U::Error; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { + self.inner.get_mut().start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + self.inner.get_mut().poll_complete() + } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + self.inner.get_mut().close() + } +} + +impl<T, U> fmt::Debug for Framed<T, U> +where + T: fmt::Debug, + U: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Framed") + .field("io", &self.inner.get_ref().get_ref().0) + .field("codec", &self.inner.get_ref().get_ref().1) + .finish() + } +} + +// ===== impl Fuse ===== + +impl<T: Read, U> Read for Fuse<T, U> { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + self.0.read(dst) + } +} + +impl<T: AsyncRead, U> AsyncRead for Fuse<T, U> { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.0.prepare_uninitialized_buffer(buf) + } +} + +impl<T: Write, U> Write for Fuse<T, U> { + fn write(&mut self, src: &[u8]) -> io::Result<usize> { + self.0.write(src) + } + + fn flush(&mut self) -> io::Result<()> { + self.0.flush() + } +} + +impl<T: AsyncWrite, U> AsyncWrite for Fuse<T, U> { + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.0.shutdown() + } +} + +impl<T, U: Decoder> Decoder for Fuse<T, U> { + type Item = U::Item; + type Error = U::Error; + + fn decode(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { + self.1.decode(buffer) + } + + fn decode_eof(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { + self.1.decode_eof(buffer) + } +} + +impl<T, U: Encoder> Encoder for Fuse<T, U> { + type Item = U::Item; + type Error = U::Error; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + self.1.encode(item, dst) + } +} + +/// `FramedParts` contains an export of the data of a Framed transport. +/// It can be used to construct a new `Framed` with a different codec. +/// It contains all current buffers and the inner transport. +#[derive(Debug)] +pub struct FramedParts<T> { + /// The inner transport used to read bytes to and write bytes to + pub inner: T, + /// The buffer with read but unprocessed data. + pub readbuf: BytesMut, + /// A buffer with unprocessed data which are not written yet. + pub writebuf: BytesMut, +} diff --git a/third_party/rust/tokio-io/src/framed_read.rs b/third_party/rust/tokio-io/src/framed_read.rs new file mode 100644 index 0000000000..d675084b01 --- /dev/null +++ b/third_party/rust/tokio-io/src/framed_read.rs @@ -0,0 +1,220 @@ +#![allow(deprecated)] + +use std::fmt; + +use codec::Decoder; +use framed::Fuse; +use AsyncRead; + +use bytes::BytesMut; +use futures::{Async, Poll, Sink, StartSend, Stream}; + +/// A `Stream` of messages decoded from an `AsyncRead`. +#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")] +#[doc(hidden)] +pub struct FramedRead<T, D> { + inner: FramedRead2<Fuse<T, D>>, +} + +#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")] +#[doc(hidden)] +pub struct FramedRead2<T> { + inner: T, + eof: bool, + is_readable: bool, + buffer: BytesMut, +} + +const INITIAL_CAPACITY: usize = 8 * 1024; + +// ===== impl FramedRead ===== + +impl<T, D> FramedRead<T, D> +where + T: AsyncRead, + D: Decoder, +{ + /// Creates a new `FramedRead` with the given `decoder`. + pub fn new(inner: T, decoder: D) -> FramedRead<T, D> { + FramedRead { + inner: framed_read2(Fuse(inner, decoder)), + } + } +} + +impl<T, D> FramedRead<T, D> { + /// Returns a reference to the underlying I/O stream wrapped by + /// `FramedRead`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_ref(&self) -> &T { + &self.inner.inner.0 + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `FramedRead`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner.inner.0 + } + + /// Consumes the `FramedRead`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn into_inner(self) -> T { + self.inner.inner.0 + } + + /// Returns a reference to the underlying decoder. + pub fn decoder(&self) -> &D { + &self.inner.inner.1 + } + + /// Returns a mutable reference to the underlying decoder. + pub fn decoder_mut(&mut self) -> &mut D { + &mut self.inner.inner.1 + } +} + +impl<T, D> Stream for FramedRead<T, D> +where + T: AsyncRead, + D: Decoder, +{ + type Item = D::Item; + type Error = D::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + self.inner.poll() + } +} + +impl<T, D> Sink for FramedRead<T, D> +where + T: Sink, +{ + type SinkItem = T::SinkItem; + type SinkError = T::SinkError; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { + self.inner.inner.0.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + self.inner.inner.0.poll_complete() + } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + self.inner.inner.0.close() + } +} + +impl<T, D> fmt::Debug for FramedRead<T, D> +where + T: fmt::Debug, + D: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("FramedRead") + .field("inner", &self.inner.inner.0) + .field("decoder", &self.inner.inner.1) + .field("eof", &self.inner.eof) + .field("is_readable", &self.inner.is_readable) + .field("buffer", &self.inner.buffer) + .finish() + } +} + +// ===== impl FramedRead2 ===== + +pub fn framed_read2<T>(inner: T) -> FramedRead2<T> { + FramedRead2 { + inner: inner, + eof: false, + is_readable: false, + buffer: BytesMut::with_capacity(INITIAL_CAPACITY), + } +} + +pub fn framed_read2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedRead2<T> { + if buf.capacity() < INITIAL_CAPACITY { + let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity(); + buf.reserve(bytes_to_reserve); + } + FramedRead2 { + inner: inner, + eof: false, + is_readable: buf.len() > 0, + buffer: buf, + } +} + +impl<T> FramedRead2<T> { + pub fn get_ref(&self) -> &T { + &self.inner + } + + pub fn into_inner(self) -> T { + self.inner + } + + pub fn into_parts(self) -> (T, BytesMut) { + (self.inner, self.buffer) + } + + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } +} + +impl<T> Stream for FramedRead2<T> +where + T: AsyncRead + Decoder, +{ + type Item = T::Item; + type Error = T::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + loop { + // Repeatedly call `decode` or `decode_eof` as long as it is + // "readable". Readable is defined as not having returned `None`. If + // the upstream has returned EOF, and the decoder is no longer + // readable, it can be assumed that the decoder will never become + // readable again, at which point the stream is terminated. + if self.is_readable { + if self.eof { + let frame = self.inner.decode_eof(&mut self.buffer)?; + return Ok(Async::Ready(frame)); + } + + trace!("attempting to decode a frame"); + + if let Some(frame) = self.inner.decode(&mut self.buffer)? { + trace!("frame decoded from buffer"); + return Ok(Async::Ready(Some(frame))); + } + + self.is_readable = false; + } + + assert!(!self.eof); + + // Otherwise, try to read more data and try again. Make sure we've + // got room for at least one byte to read to ensure that we don't + // get a spurious 0 that looks like EOF + self.buffer.reserve(1); + if 0 == try_ready!(self.inner.read_buf(&mut self.buffer)) { + self.eof = true; + } + + self.is_readable = true; + } + } +} diff --git a/third_party/rust/tokio-io/src/framed_write.rs b/third_party/rust/tokio-io/src/framed_write.rs new file mode 100644 index 0000000000..483b9fac8d --- /dev/null +++ b/third_party/rust/tokio-io/src/framed_write.rs @@ -0,0 +1,250 @@ +#![allow(deprecated)] + +use std::fmt; +use std::io::{self, Read}; + +use codec::{Decoder, Encoder}; +use framed::Fuse; +use {AsyncRead, AsyncWrite}; + +use bytes::BytesMut; +use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream}; + +/// A `Sink` of frames encoded to an `AsyncWrite`. +#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")] +#[doc(hidden)] +pub struct FramedWrite<T, E> { + inner: FramedWrite2<Fuse<T, E>>, +} + +#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")] +#[doc(hidden)] +pub struct FramedWrite2<T> { + inner: T, + buffer: BytesMut, +} + +const INITIAL_CAPACITY: usize = 8 * 1024; +const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY; + +impl<T, E> FramedWrite<T, E> +where + T: AsyncWrite, + E: Encoder, +{ + /// Creates a new `FramedWrite` with the given `encoder`. + pub fn new(inner: T, encoder: E) -> FramedWrite<T, E> { + FramedWrite { + inner: framed_write2(Fuse(inner, encoder)), + } + } +} + +impl<T, E> FramedWrite<T, E> { + /// Returns a reference to the underlying I/O stream wrapped by + /// `FramedWrite`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_ref(&self) -> &T { + &self.inner.inner.0 + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `FramedWrite`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner.inner.0 + } + + /// Consumes the `FramedWrite`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn into_inner(self) -> T { + self.inner.inner.0 + } + + /// Returns a reference to the underlying decoder. + pub fn encoder(&self) -> &E { + &self.inner.inner.1 + } + + /// Returns a mutable reference to the underlying decoder. + pub fn encoder_mut(&mut self) -> &mut E { + &mut self.inner.inner.1 + } +} + +impl<T, E> Sink for FramedWrite<T, E> +where + T: AsyncWrite, + E: Encoder, +{ + type SinkItem = E::Item; + type SinkError = E::Error; + + fn start_send(&mut self, item: E::Item) -> StartSend<E::Item, E::Error> { + self.inner.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + self.inner.poll_complete() + } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + Ok(self.inner.close()?) + } +} + +impl<T, D> Stream for FramedWrite<T, D> +where + T: Stream, +{ + type Item = T::Item; + type Error = T::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + self.inner.inner.0.poll() + } +} + +impl<T, U> fmt::Debug for FramedWrite<T, U> +where + T: fmt::Debug, + U: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("FramedWrite") + .field("inner", &self.inner.get_ref().0) + .field("encoder", &self.inner.get_ref().1) + .field("buffer", &self.inner.buffer) + .finish() + } +} + +// ===== impl FramedWrite2 ===== + +pub fn framed_write2<T>(inner: T) -> FramedWrite2<T> { + FramedWrite2 { + inner: inner, + buffer: BytesMut::with_capacity(INITIAL_CAPACITY), + } +} + +pub fn framed_write2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedWrite2<T> { + if buf.capacity() < INITIAL_CAPACITY { + let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity(); + buf.reserve(bytes_to_reserve); + } + FramedWrite2 { + inner: inner, + buffer: buf, + } +} + +impl<T> FramedWrite2<T> { + pub fn get_ref(&self) -> &T { + &self.inner + } + + pub fn into_inner(self) -> T { + self.inner + } + + pub fn into_parts(self) -> (T, BytesMut) { + (self.inner, self.buffer) + } + + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } +} + +impl<T> Sink for FramedWrite2<T> +where + T: AsyncWrite + Encoder, +{ + type SinkItem = T::Item; + type SinkError = T::Error; + + fn start_send(&mut self, item: T::Item) -> StartSend<T::Item, T::Error> { + // If the buffer is already over 8KiB, then attempt to flush it. If after flushing it's + // *still* over 8KiB, then apply backpressure (reject the send). + if self.buffer.len() >= BACKPRESSURE_BOUNDARY { + self.poll_complete()?; + + if self.buffer.len() >= BACKPRESSURE_BOUNDARY { + return Ok(AsyncSink::NotReady(item)); + } + } + + self.inner.encode(item, &mut self.buffer)?; + + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + trace!("flushing framed transport"); + + while !self.buffer.is_empty() { + trace!("writing; remaining={}", self.buffer.len()); + + let n = try_ready!(self.inner.poll_write(&self.buffer)); + + if n == 0 { + return Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to + write frame to transport", + ) + .into()); + } + + // TODO: Add a way to `bytes` to do this w/o returning the drained + // data. + let _ = self.buffer.split_to(n); + } + + // Try flushing the underlying IO + try_ready!(self.inner.poll_flush()); + + trace!("framed transport flushed"); + return Ok(Async::Ready(())); + } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + try_ready!(self.poll_complete()); + Ok(self.inner.shutdown()?) + } +} + +impl<T: Decoder> Decoder for FramedWrite2<T> { + type Item = T::Item; + type Error = T::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result<Option<T::Item>, T::Error> { + self.inner.decode(src) + } + + fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<T::Item>, T::Error> { + self.inner.decode_eof(src) + } +} + +impl<T: Read> Read for FramedWrite2<T> { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + self.inner.read(dst) + } +} + +impl<T: AsyncRead> AsyncRead for FramedWrite2<T> { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.inner.prepare_uninitialized_buffer(buf) + } +} diff --git a/third_party/rust/tokio-io/src/io/copy.rs b/third_party/rust/tokio-io/src/io/copy.rs new file mode 100644 index 0000000000..e8a1dac95d --- /dev/null +++ b/third_party/rust/tokio-io/src/io/copy.rs @@ -0,0 +1,100 @@ +use std::io; + +use futures::{Future, Poll}; + +use {AsyncRead, AsyncWrite}; + +/// A future which will copy all data from a reader into a writer. +/// +/// Created by the [`copy`] function, this future will resolve to the number of +/// bytes copied or an error if one happens. +/// +/// [`copy`]: fn.copy.html +#[derive(Debug)] +pub struct Copy<R, W> { + reader: Option<R>, + read_done: bool, + writer: Option<W>, + pos: usize, + cap: usize, + amt: u64, + buf: Box<[u8]>, +} + +/// Creates a future which represents copying all the bytes from one object to +/// another. +/// +/// The returned future will copy all the bytes read from `reader` into the +/// `writer` specified. This future will only complete once the `reader` has hit +/// EOF and all bytes have been written to and flushed from the `writer` +/// provided. +/// +/// On success the number of bytes is returned and the `reader` and `writer` are +/// consumed. On error the error is returned and the I/O objects are consumed as +/// well. +pub fn copy<R, W>(reader: R, writer: W) -> Copy<R, W> +where + R: AsyncRead, + W: AsyncWrite, +{ + Copy { + reader: Some(reader), + read_done: false, + writer: Some(writer), + amt: 0, + pos: 0, + cap: 0, + buf: Box::new([0; 2048]), + } +} + +impl<R, W> Future for Copy<R, W> +where + R: AsyncRead, + W: AsyncWrite, +{ + type Item = (u64, R, W); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(u64, R, W), io::Error> { + loop { + // If our buffer is empty, then we need to read some data to + // continue. + if self.pos == self.cap && !self.read_done { + let reader = self.reader.as_mut().unwrap(); + let n = try_ready!(reader.poll_read(&mut self.buf)); + if n == 0 { + self.read_done = true; + } else { + self.pos = 0; + self.cap = n; + } + } + + // If our buffer has some data, let's write it out! + while self.pos < self.cap { + let writer = self.writer.as_mut().unwrap(); + let i = try_ready!(writer.poll_write(&self.buf[self.pos..self.cap])); + if i == 0 { + return Err(io::Error::new( + io::ErrorKind::WriteZero, + "write zero byte into writer", + )); + } else { + self.pos += i; + self.amt += i as u64; + } + } + + // If we've written al the data and we've seen EOF, flush out the + // data and finish the transfer. + // done with the entire transfer. + if self.pos == self.cap && self.read_done { + try_ready!(self.writer.as_mut().unwrap().poll_flush()); + let reader = self.reader.take().unwrap(); + let writer = self.writer.take().unwrap(); + return Ok((self.amt, reader, writer).into()); + } + } + } +} diff --git a/third_party/rust/tokio-io/src/io/flush.rs b/third_party/rust/tokio-io/src/io/flush.rs new file mode 100644 index 0000000000..febc7ee1b1 --- /dev/null +++ b/third_party/rust/tokio-io/src/io/flush.rs @@ -0,0 +1,43 @@ +use std::io; + +use futures::{Async, Future, Poll}; + +use AsyncWrite; + +/// A future used to fully flush an I/O object. +/// +/// Resolves to the underlying I/O object once the flush operation is complete. +/// +/// Created by the [`flush`] function. +/// +/// [`flush`]: fn.flush.html +#[derive(Debug)] +pub struct Flush<A> { + a: Option<A>, +} + +/// Creates a future which will entirely flush an I/O object and then yield the +/// object itself. +/// +/// This function will consume the object provided if an error happens, and +/// otherwise it will repeatedly call `flush` until it sees `Ok(())`, scheduling +/// a retry if `WouldBlock` is seen along the way. +pub fn flush<A>(a: A) -> Flush<A> +where + A: AsyncWrite, +{ + Flush { a: Some(a) } +} + +impl<A> Future for Flush<A> +where + A: AsyncWrite, +{ + type Item = A; + type Error = io::Error; + + fn poll(&mut self) -> Poll<A, io::Error> { + try_ready!(self.a.as_mut().unwrap().poll_flush()); + Ok(Async::Ready(self.a.take().unwrap())) + } +} diff --git a/third_party/rust/tokio-io/src/io/mod.rs b/third_party/rust/tokio-io/src/io/mod.rs new file mode 100644 index 0000000000..763cfaee7f --- /dev/null +++ b/third_party/rust/tokio-io/src/io/mod.rs @@ -0,0 +1,32 @@ +//! I/O conveniences when working with primitives in `tokio-core` +//! +//! Contains various combinators to work with I/O objects and type definitions +//! as well. +//! +//! A description of the high-level I/O combinators can be [found online] in +//! addition to a description of the [low level details]. +//! +//! [found online]: https://tokio.rs/docs/getting-started/core/ +//! [low level details]: https://tokio.rs/docs/going-deeper-tokio/core-low-level/ + +mod copy; +mod flush; +mod read; +mod read_exact; +mod read_to_end; +mod read_until; +mod shutdown; +mod write_all; + +pub use self::copy::{copy, Copy}; +pub use self::flush::{flush, Flush}; +pub use self::read::{read, Read}; +pub use self::read_exact::{read_exact, ReadExact}; +pub use self::read_to_end::{read_to_end, ReadToEnd}; +pub use self::read_until::{read_until, ReadUntil}; +pub use self::shutdown::{shutdown, Shutdown}; +pub use self::write_all::{write_all, WriteAll}; +pub use allow_std::AllowStdIo; +pub use lines::{lines, Lines}; +pub use split::{ReadHalf, WriteHalf}; +pub use window::Window; diff --git a/third_party/rust/tokio-io/src/io/read.rs b/third_party/rust/tokio-io/src/io/read.rs new file mode 100644 index 0000000000..632cef4d28 --- /dev/null +++ b/third_party/rust/tokio-io/src/io/read.rs @@ -0,0 +1,60 @@ +use std::io; +use std::mem; + +use futures::{Future, Poll}; + +use AsyncRead; + +#[derive(Debug)] +enum State<R, T> { + Pending { rd: R, buf: T }, + Empty, +} + +/// Tries to read some bytes directly into the given `buf` in asynchronous +/// manner, returning a future type. +/// +/// The returned future will resolve to both the I/O stream and the buffer +/// as well as the number of bytes read once the read operation is completed. +pub fn read<R, T>(rd: R, buf: T) -> Read<R, T> +where + R: AsyncRead, + T: AsMut<[u8]>, +{ + Read { + state: State::Pending { rd: rd, buf: buf }, + } +} + +/// A future which can be used to easily read available number of bytes to fill +/// a buffer. +/// +/// Created by the [`read`] function. +#[derive(Debug)] +pub struct Read<R, T> { + state: State<R, T>, +} + +impl<R, T> Future for Read<R, T> +where + R: AsyncRead, + T: AsMut<[u8]>, +{ + type Item = (R, T, usize); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(R, T, usize), io::Error> { + let nread = match self.state { + State::Pending { + ref mut rd, + ref mut buf, + } => try_ready!(rd.poll_read(&mut buf.as_mut()[..])), + State::Empty => panic!("poll a Read after it's done"), + }; + + match mem::replace(&mut self.state, State::Empty) { + State::Pending { rd, buf } => Ok((rd, buf, nread).into()), + State::Empty => panic!("invalid internal state"), + } + } +} diff --git a/third_party/rust/tokio-io/src/io/read_exact.rs b/third_party/rust/tokio-io/src/io/read_exact.rs new file mode 100644 index 0000000000..3b98621ae4 --- /dev/null +++ b/third_party/rust/tokio-io/src/io/read_exact.rs @@ -0,0 +1,85 @@ +use std::io; +use std::mem; + +use futures::{Future, Poll}; + +use AsyncRead; + +/// A future which can be used to easily read exactly enough bytes to fill +/// a buffer. +/// +/// Created by the [`read_exact`] function. +/// +/// [`read_exact`]: fn.read_exact.html +#[derive(Debug)] +pub struct ReadExact<A, T> { + state: State<A, T>, +} + +#[derive(Debug)] +enum State<A, T> { + Reading { a: A, buf: T, pos: usize }, + Empty, +} + +/// Creates a future which will read exactly enough bytes to fill `buf`, +/// returning an error if EOF is hit sooner. +/// +/// The returned future will resolve to both the I/O stream as well as the +/// buffer once the read operation is completed. +/// +/// In the case of an error the buffer and the object will be discarded, with +/// the error yielded. In the case of success the object will be destroyed and +/// the buffer will be returned, with all data read from the stream appended to +/// the buffer. +pub fn read_exact<A, T>(a: A, buf: T) -> ReadExact<A, T> +where + A: AsyncRead, + T: AsMut<[u8]>, +{ + ReadExact { + state: State::Reading { + a: a, + buf: buf, + pos: 0, + }, + } +} + +fn eof() -> io::Error { + io::Error::new(io::ErrorKind::UnexpectedEof, "early eof") +} + +impl<A, T> Future for ReadExact<A, T> +where + A: AsyncRead, + T: AsMut<[u8]>, +{ + type Item = (A, T); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(A, T), io::Error> { + match self.state { + State::Reading { + ref mut a, + ref mut buf, + ref mut pos, + } => { + let buf = buf.as_mut(); + while *pos < buf.len() { + let n = try_ready!(a.poll_read(&mut buf[*pos..])); + *pos += n; + if n == 0 { + return Err(eof()); + } + } + } + State::Empty => panic!("poll a ReadExact after it's done"), + } + + match mem::replace(&mut self.state, State::Empty) { + State::Reading { a, buf, .. } => Ok((a, buf).into()), + State::Empty => panic!(), + } + } +} diff --git a/third_party/rust/tokio-io/src/io/read_to_end.rs b/third_party/rust/tokio-io/src/io/read_to_end.rs new file mode 100644 index 0000000000..296af6e304 --- /dev/null +++ b/third_party/rust/tokio-io/src/io/read_to_end.rs @@ -0,0 +1,66 @@ +use std::io; +use std::mem; + +use futures::{Future, Poll}; + +use AsyncRead; + +/// A future which can be used to easily read the entire contents of a stream +/// into a vector. +/// +/// Created by the [`read_to_end`] function. +/// +/// [`read_to_end`]: fn.read_to_end.html +#[derive(Debug)] +pub struct ReadToEnd<A> { + state: State<A>, +} + +#[derive(Debug)] +enum State<A> { + Reading { a: A, buf: Vec<u8> }, + Empty, +} + +/// Creates a future which will read all the bytes associated with the I/O +/// object `A` into the buffer provided. +/// +/// In the case of an error the buffer and the object will be discarded, with +/// the error yielded. In the case of success both the object and the buffer +/// will be returned, with all data read from the stream appended to the buffer. +pub fn read_to_end<A>(a: A, buf: Vec<u8>) -> ReadToEnd<A> +where + A: AsyncRead, +{ + ReadToEnd { + state: State::Reading { a: a, buf: buf }, + } +} + +impl<A> Future for ReadToEnd<A> +where + A: AsyncRead, +{ + type Item = (A, Vec<u8>); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(A, Vec<u8>), io::Error> { + match self.state { + State::Reading { + ref mut a, + ref mut buf, + } => { + // If we get `Ok`, then we know the stream hit EOF and we're done. If we + // hit "would block" then all the read data so far is in our buffer, and + // otherwise we propagate errors + try_nb!(a.read_to_end(buf)); + } + State::Empty => panic!("poll ReadToEnd after it's done"), + } + + match mem::replace(&mut self.state, State::Empty) { + State::Reading { a, buf } => Ok((a, buf).into()), + State::Empty => unreachable!(), + } + } +} diff --git a/third_party/rust/tokio-io/src/io/read_until.rs b/third_party/rust/tokio-io/src/io/read_until.rs new file mode 100644 index 0000000000..d0be4c9448 --- /dev/null +++ b/third_party/rust/tokio-io/src/io/read_until.rs @@ -0,0 +1,76 @@ +use std::io::{self, BufRead}; +use std::mem; + +use futures::{Future, Poll}; + +use AsyncRead; + +/// A future which can be used to easily read the contents of a stream into a +/// vector until the delimiter is reached. +/// +/// Created by the [`read_until`] function. +/// +/// [`read_until`]: fn.read_until.html +#[derive(Debug)] +pub struct ReadUntil<A> { + state: State<A>, +} + +#[derive(Debug)] +enum State<A> { + Reading { a: A, byte: u8, buf: Vec<u8> }, + Empty, +} + +/// Creates a future which will read all the bytes associated with the I/O +/// object `A` into the buffer provided until the delimiter `byte` is reached. +/// This method is the async equivalent to [`BufRead::read_until`]. +/// +/// In case of an error the buffer and the object will be discarded, with +/// the error yielded. In the case of success the object will be destroyed and +/// the buffer will be returned, with all bytes up to, and including, the delimiter +/// (if found). +/// +/// [`BufRead::read_until`]: https://doc.rust-lang.org/std/io/trait.BufRead.html#method.read_until +pub fn read_until<A>(a: A, byte: u8, buf: Vec<u8>) -> ReadUntil<A> +where + A: AsyncRead + BufRead, +{ + ReadUntil { + state: State::Reading { + a: a, + byte: byte, + buf: buf, + }, + } +} + +impl<A> Future for ReadUntil<A> +where + A: AsyncRead + BufRead, +{ + type Item = (A, Vec<u8>); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(A, Vec<u8>), io::Error> { + match self.state { + State::Reading { + ref mut a, + byte, + ref mut buf, + } => { + // If we get `Ok(n)`, then we know the stream hit EOF or the delimiter. + // and just return it, as we are finished. + // If we hit "would block" then all the read data so far + // is in our buffer, and otherwise we propagate errors. + try_nb!(a.read_until(byte, buf)); + } + State::Empty => panic!("poll ReadUntil after it's done"), + } + + match mem::replace(&mut self.state, State::Empty) { + State::Reading { a, byte: _, buf } => Ok((a, buf).into()), + State::Empty => unreachable!(), + } + } +} diff --git a/third_party/rust/tokio-io/src/io/shutdown.rs b/third_party/rust/tokio-io/src/io/shutdown.rs new file mode 100644 index 0000000000..d963a813ad --- /dev/null +++ b/third_party/rust/tokio-io/src/io/shutdown.rs @@ -0,0 +1,44 @@ +use std::io; + +use futures::{Async, Future, Poll}; + +use AsyncWrite; + +/// A future used to fully shutdown an I/O object. +/// +/// Resolves to the underlying I/O object once the shutdown operation is +/// complete. +/// +/// Created by the [`shutdown`] function. +/// +/// [`shutdown`]: fn.shutdown.html +#[derive(Debug)] +pub struct Shutdown<A> { + a: Option<A>, +} + +/// Creates a future which will entirely shutdown an I/O object and then yield +/// the object itself. +/// +/// This function will consume the object provided if an error happens, and +/// otherwise it will repeatedly call `shutdown` until it sees `Ok(())`, +/// scheduling a retry if `WouldBlock` is seen along the way. +pub fn shutdown<A>(a: A) -> Shutdown<A> +where + A: AsyncWrite, +{ + Shutdown { a: Some(a) } +} + +impl<A> Future for Shutdown<A> +where + A: AsyncWrite, +{ + type Item = A; + type Error = io::Error; + + fn poll(&mut self) -> Poll<A, io::Error> { + try_ready!(self.a.as_mut().unwrap().shutdown()); + Ok(Async::Ready(self.a.take().unwrap())) + } +} diff --git a/third_party/rust/tokio-io/src/io/write_all.rs b/third_party/rust/tokio-io/src/io/write_all.rs new file mode 100644 index 0000000000..ba8af4a222 --- /dev/null +++ b/third_party/rust/tokio-io/src/io/write_all.rs @@ -0,0 +1,88 @@ +use std::io; +use std::mem; + +use futures::{Future, Poll}; + +use AsyncWrite; + +/// A future used to write the entire contents of some data to a stream. +/// +/// This is created by the [`write_all`] top-level method. +/// +/// [`write_all`]: fn.write_all.html +#[derive(Debug)] +pub struct WriteAll<A, T> { + state: State<A, T>, +} + +#[derive(Debug)] +enum State<A, T> { + Writing { a: A, buf: T, pos: usize }, + Empty, +} + +/// Creates a future that will write the entire contents of the buffer `buf` to +/// the stream `a` provided. +/// +/// The returned future will not return until all the data has been written, and +/// the future will resolve to the stream as well as the buffer (for reuse if +/// needed). +/// +/// Any error which happens during writing will cause both the stream and the +/// buffer to get destroyed. +/// +/// The `buf` parameter here only requires the `AsRef<[u8]>` trait, which should +/// be broadly applicable to accepting data which can be converted to a slice. +/// The `Window` struct is also available in this crate to provide a different +/// window into a slice if necessary. +pub fn write_all<A, T>(a: A, buf: T) -> WriteAll<A, T> +where + A: AsyncWrite, + T: AsRef<[u8]>, +{ + WriteAll { + state: State::Writing { + a: a, + buf: buf, + pos: 0, + }, + } +} + +fn zero_write() -> io::Error { + io::Error::new(io::ErrorKind::WriteZero, "zero-length write") +} + +impl<A, T> Future for WriteAll<A, T> +where + A: AsyncWrite, + T: AsRef<[u8]>, +{ + type Item = (A, T); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(A, T), io::Error> { + match self.state { + State::Writing { + ref mut a, + ref buf, + ref mut pos, + } => { + let buf = buf.as_ref(); + while *pos < buf.len() { + let n = try_ready!(a.poll_write(&buf[*pos..])); + *pos += n; + if n == 0 { + return Err(zero_write()); + } + } + } + State::Empty => panic!("poll a WriteAll after it's done"), + } + + match mem::replace(&mut self.state, State::Empty) { + State::Writing { a, buf, .. } => Ok((a, buf).into()), + State::Empty => panic!(), + } + } +} diff --git a/third_party/rust/tokio-io/src/length_delimited.rs b/third_party/rust/tokio-io/src/length_delimited.rs new file mode 100644 index 0000000000..c211c95356 --- /dev/null +++ b/third_party/rust/tokio-io/src/length_delimited.rs @@ -0,0 +1,943 @@ +#![allow(deprecated)] + +use {codec, AsyncRead, AsyncWrite}; + +use bytes::buf::Chain; +use bytes::{Buf, BufMut, BytesMut, IntoBuf}; + +use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream}; + +use std::error::Error as StdError; +use std::io::{self, Cursor}; +use std::{cmp, fmt}; + +/// Configure length delimited `FramedRead`, `FramedWrite`, and `Framed` values. +/// +/// `Builder` enables constructing configured length delimited framers. Note +/// that not all configuration settings apply to both encoding and decoding. See +/// the documentation for specific methods for more detail. +#[deprecated(since = "0.1.8", note = "Moved to tokio-codec")] +#[doc(hidden)] +#[derive(Debug, Clone, Copy)] +pub struct Builder { + // Maximum frame length + max_frame_len: usize, + + // Number of bytes representing the field length + length_field_len: usize, + + // Number of bytes in the header before the length field + length_field_offset: usize, + + // Adjust the length specified in the header field by this amount + length_adjustment: isize, + + // Total number of bytes to skip before reading the payload, if not set, + // `length_field_len + length_field_offset` + num_skip: Option<usize>, + + // Length field byte order (little or big endian) + length_field_is_big_endian: bool, +} + +/// Adapts a byte stream into a unified `Stream` and `Sink` that works over +/// entire frame values. +/// +/// See [module level] documentation for more detail. +/// +/// [module level]: index.html +#[deprecated(since = "0.1.8", note = "Moved to tokio-codec")] +#[doc(hidden)] +pub struct Framed<T, B: IntoBuf = BytesMut> { + inner: FramedRead<FramedWrite<T, B>>, +} + +/// Adapts a byte stream to a `Stream` yielding entire frame values. +/// +/// See [module level] documentation for more detail. +/// +/// [module level]: index.html +#[deprecated(since = "0.1.8", note = "Moved to tokio-codec")] +#[doc(hidden)] +#[derive(Debug)] +pub struct FramedRead<T> { + inner: codec::FramedRead<T, Decoder>, +} + +/// An error when the number of bytes read is more than max frame length. +#[deprecated(since = "0.1.8", note = "Moved to tokio-codec")] +#[doc(hidden)] +pub struct FrameTooBig { + _priv: (), +} + +#[derive(Debug)] +struct Decoder { + // Configuration values + builder: Builder, + + // Read state + state: DecodeState, +} + +#[derive(Debug, Clone, Copy)] +enum DecodeState { + Head, + Data(usize), +} + +/// Adapts a byte stream to a `Sink` accepting entire frame values. +/// +/// See [module level] documentation for more detail. +/// +/// [module level]: index.html +#[deprecated(since = "0.1.8", note = "Moved to tokio-codec")] +#[doc(hidden)] +pub struct FramedWrite<T, B: IntoBuf = BytesMut> { + // I/O type + inner: T, + + // Configuration values + builder: Builder, + + // Current frame being written + frame: Option<Chain<Cursor<BytesMut>, B::Buf>>, +} + +// ===== impl Framed ===== + +impl<T: AsyncRead + AsyncWrite, B: IntoBuf> Framed<T, B> { + /// Creates a new `Framed` with default configuration values. + pub fn new(inner: T) -> Framed<T, B> { + Builder::new().new_framed(inner) + } +} + +impl<T, B: IntoBuf> Framed<T, B> { + /// Returns a reference to the underlying I/O stream wrapped by `Framed`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_ref(&self) -> &T { + self.inner.get_ref().get_ref() + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `Framed`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise being + /// worked with. + pub fn get_mut(&mut self) -> &mut T { + self.inner.get_mut().get_mut() + } + + /// Consumes the `Framed`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise being + /// worked with. + pub fn into_inner(self) -> T { + self.inner.into_inner().into_inner() + } +} + +impl<T: AsyncRead, B: IntoBuf> Stream for Framed<T, B> { + type Item = BytesMut; + type Error = io::Error; + + fn poll(&mut self) -> Poll<Option<BytesMut>, io::Error> { + self.inner.poll() + } +} + +impl<T: AsyncWrite, B: IntoBuf> Sink for Framed<T, B> { + type SinkItem = B; + type SinkError = io::Error; + + fn start_send(&mut self, item: B) -> StartSend<B, io::Error> { + self.inner.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), io::Error> { + self.inner.poll_complete() + } + + fn close(&mut self) -> Poll<(), io::Error> { + self.inner.close() + } +} + +impl<T, B: IntoBuf> fmt::Debug for Framed<T, B> +where + T: fmt::Debug, + B::Buf: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Framed") + .field("inner", &self.inner) + .finish() + } +} + +// ===== impl FramedRead ===== + +impl<T: AsyncRead> FramedRead<T> { + /// Creates a new `FramedRead` with default configuration values. + pub fn new(inner: T) -> FramedRead<T> { + Builder::new().new_read(inner) + } +} + +impl<T> FramedRead<T> { + /// Returns the current max frame setting + /// + /// This is the largest size this codec will accept from the wire. Larger + /// frames will be rejected. + pub fn max_frame_length(&self) -> usize { + self.inner.decoder().builder.max_frame_len + } + + /// Updates the max frame setting. + /// + /// The change takes effect the next time a frame is decoded. In other + /// words, if a frame is currently in process of being decoded with a frame + /// size greater than `val` but less than the max frame length in effect + /// before calling this function, then the frame will be allowed. + pub fn set_max_frame_length(&mut self, val: usize) { + self.inner.decoder_mut().builder.max_frame_length(val); + } + + /// Returns a reference to the underlying I/O stream wrapped by `FramedRead`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_ref(&self) -> &T { + self.inner.get_ref() + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `FramedRead`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise being + /// worked with. + pub fn get_mut(&mut self) -> &mut T { + self.inner.get_mut() + } + + /// Consumes the `FramedRead`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise being + /// worked with. + pub fn into_inner(self) -> T { + self.inner.into_inner() + } +} + +impl<T: AsyncRead> Stream for FramedRead<T> { + type Item = BytesMut; + type Error = io::Error; + + fn poll(&mut self) -> Poll<Option<BytesMut>, io::Error> { + self.inner.poll() + } +} + +impl<T: Sink> Sink for FramedRead<T> { + type SinkItem = T::SinkItem; + type SinkError = T::SinkError; + + fn start_send(&mut self, item: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> { + self.inner.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), T::SinkError> { + self.inner.poll_complete() + } + + fn close(&mut self) -> Poll<(), T::SinkError> { + self.inner.close() + } +} + +impl<T: io::Write> io::Write for FramedRead<T> { + fn write(&mut self, src: &[u8]) -> io::Result<usize> { + self.inner.get_mut().write(src) + } + + fn flush(&mut self) -> io::Result<()> { + self.inner.get_mut().flush() + } +} + +impl<T: AsyncWrite> AsyncWrite for FramedRead<T> { + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.inner.get_mut().shutdown() + } + + fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { + self.inner.get_mut().write_buf(buf) + } +} + +// ===== impl Decoder ====== + +impl Decoder { + fn decode_head(&mut self, src: &mut BytesMut) -> io::Result<Option<usize>> { + let head_len = self.builder.num_head_bytes(); + let field_len = self.builder.length_field_len; + + if src.len() < head_len { + // Not enough data + return Ok(None); + } + + let n = { + let mut src = Cursor::new(&mut *src); + + // Skip the required bytes + src.advance(self.builder.length_field_offset); + + // match endianess + let n = if self.builder.length_field_is_big_endian { + src.get_uint_be(field_len) + } else { + src.get_uint_le(field_len) + }; + + if n > self.builder.max_frame_len as u64 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + FrameTooBig { _priv: () }, + )); + } + + // The check above ensures there is no overflow + let n = n as usize; + + // Adjust `n` with bounds checking + let n = if self.builder.length_adjustment < 0 { + n.checked_sub(-self.builder.length_adjustment as usize) + } else { + n.checked_add(self.builder.length_adjustment as usize) + }; + + // Error handling + match n { + Some(n) => n, + None => { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "provided length would overflow after adjustment", + )); + } + } + }; + + let num_skip = self.builder.get_num_skip(); + + if num_skip > 0 { + let _ = src.split_to(num_skip); + } + + // Ensure that the buffer has enough space to read the incoming + // payload + src.reserve(n); + + return Ok(Some(n)); + } + + fn decode_data(&self, n: usize, src: &mut BytesMut) -> io::Result<Option<BytesMut>> { + // At this point, the buffer has already had the required capacity + // reserved. All there is to do is read. + if src.len() < n { + return Ok(None); + } + + Ok(Some(src.split_to(n))) + } +} + +impl codec::Decoder for Decoder { + type Item = BytesMut; + type Error = io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<BytesMut>> { + let n = match self.state { + DecodeState::Head => match self.decode_head(src)? { + Some(n) => { + self.state = DecodeState::Data(n); + n + } + None => return Ok(None), + }, + DecodeState::Data(n) => n, + }; + + match self.decode_data(n, src)? { + Some(data) => { + // Update the decode state + self.state = DecodeState::Head; + + // Make sure the buffer has enough space to read the next head + src.reserve(self.builder.num_head_bytes()); + + Ok(Some(data)) + } + None => Ok(None), + } + } +} + +// ===== impl FramedWrite ===== + +impl<T: AsyncWrite, B: IntoBuf> FramedWrite<T, B> { + /// Creates a new `FramedWrite` with default configuration values. + pub fn new(inner: T) -> FramedWrite<T, B> { + Builder::new().new_write(inner) + } +} + +impl<T, B: IntoBuf> FramedWrite<T, B> { + /// Returns the current max frame setting + /// + /// This is the largest size this codec will write to the wire. Larger + /// frames will be rejected. + pub fn max_frame_length(&self) -> usize { + self.builder.max_frame_len + } + + /// Updates the max frame setting. + /// + /// The change takes effect the next time a frame is encoded. In other + /// words, if a frame is currently in process of being encoded with a frame + /// size greater than `val` but less than the max frame length in effect + /// before calling this function, then the frame will be allowed. + pub fn set_max_frame_length(&mut self, val: usize) { + self.builder.max_frame_length(val); + } + + /// Returns a reference to the underlying I/O stream wrapped by + /// `FramedWrite`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_ref(&self) -> &T { + &self.inner + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `FramedWrite`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise being + /// worked with. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } + + /// Consumes the `FramedWrite`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise being + /// worked with. + pub fn into_inner(self) -> T { + self.inner + } +} + +impl<T: AsyncWrite, B: IntoBuf> FramedWrite<T, B> { + // If there is a buffered frame, try to write it to `T` + fn do_write(&mut self) -> Poll<(), io::Error> { + if self.frame.is_none() { + return Ok(Async::Ready(())); + } + + loop { + let frame = self.frame.as_mut().unwrap(); + if try_ready!(self.inner.write_buf(frame)) == 0 { + return Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to write frame to transport", + )); + } + + if !frame.has_remaining() { + break; + } + } + + self.frame = None; + + Ok(Async::Ready(())) + } + + fn set_frame(&mut self, buf: B::Buf) -> io::Result<()> { + let mut head = BytesMut::with_capacity(8); + let n = buf.remaining(); + + if n > self.builder.max_frame_len { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + FrameTooBig { _priv: () }, + )); + } + + // Adjust `n` with bounds checking + let n = if self.builder.length_adjustment < 0 { + n.checked_add(-self.builder.length_adjustment as usize) + } else { + n.checked_sub(self.builder.length_adjustment as usize) + }; + + // Error handling + let n = match n { + Some(n) => n, + None => { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "provided length would overflow after adjustment", + )); + } + }; + + if self.builder.length_field_is_big_endian { + head.put_uint_be(n as u64, self.builder.length_field_len); + } else { + head.put_uint_le(n as u64, self.builder.length_field_len); + } + + debug_assert!(self.frame.is_none()); + + self.frame = Some(head.into_buf().chain(buf)); + + Ok(()) + } +} + +impl<T: AsyncWrite, B: IntoBuf> Sink for FramedWrite<T, B> { + type SinkItem = B; + type SinkError = io::Error; + + fn start_send(&mut self, item: B) -> StartSend<B, io::Error> { + if !self.do_write()?.is_ready() { + return Ok(AsyncSink::NotReady(item)); + } + + self.set_frame(item.into_buf())?; + + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), io::Error> { + // Write any buffered frame to T + try_ready!(self.do_write()); + + // Try flushing the underlying IO + try_ready!(self.inner.poll_flush()); + + return Ok(Async::Ready(())); + } + + fn close(&mut self) -> Poll<(), io::Error> { + try_ready!(self.poll_complete()); + self.inner.shutdown() + } +} + +impl<T: Stream, B: IntoBuf> Stream for FramedWrite<T, B> { + type Item = T::Item; + type Error = T::Error; + + fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> { + self.inner.poll() + } +} + +impl<T: io::Read, B: IntoBuf> io::Read for FramedWrite<T, B> { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + self.get_mut().read(dst) + } +} + +impl<T: AsyncRead, U: IntoBuf> AsyncRead for FramedWrite<T, U> { + fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { + self.get_mut().read_buf(buf) + } + + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.get_ref().prepare_uninitialized_buffer(buf) + } +} + +impl<T, B: IntoBuf> fmt::Debug for FramedWrite<T, B> +where + T: fmt::Debug, + B::Buf: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("FramedWrite") + .field("inner", &self.inner) + .field("builder", &self.builder) + .field("frame", &self.frame) + .finish() + } +} + +// ===== impl Builder ===== + +impl Builder { + /// Creates a new length delimited framer builder with default configuration + /// values. + /// + /// # Examples + /// + /// ``` + /// # use tokio_io::AsyncRead; + /// use tokio_io::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .length_field_offset(0) + /// .length_field_length(2) + /// .length_adjustment(0) + /// .num_skip(0) + /// .new_read(io); + /// # } + /// ``` + pub fn new() -> Builder { + Builder { + // Default max frame length of 8MB + max_frame_len: 8 * 1_024 * 1_024, + + // Default byte length of 4 + length_field_len: 4, + + // Default to the header field being at the start of the header. + length_field_offset: 0, + + length_adjustment: 0, + + // Total number of bytes to skip before reading the payload, if not set, + // `length_field_len + length_field_offset` + num_skip: None, + + // Default to reading the length field in network (big) endian. + length_field_is_big_endian: true, + } + } + + /// Read the length field as a big endian integer + /// + /// This is the default setting. + /// + /// This configuration option applies to both encoding and decoding. + /// + /// # Examples + /// + /// ``` + /// # use tokio_io::AsyncRead; + /// use tokio_io::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .big_endian() + /// .new_read(io); + /// # } + /// ``` + pub fn big_endian(&mut self) -> &mut Self { + self.length_field_is_big_endian = true; + self + } + + /// Read the length field as a little endian integer + /// + /// The default setting is big endian. + /// + /// This configuration option applies to both encoding and decoding. + /// + /// # Examples + /// + /// ``` + /// # use tokio_io::AsyncRead; + /// use tokio_io::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .little_endian() + /// .new_read(io); + /// # } + /// ``` + pub fn little_endian(&mut self) -> &mut Self { + self.length_field_is_big_endian = false; + self + } + + /// Read the length field as a native endian integer + /// + /// The default setting is big endian. + /// + /// This configuration option applies to both encoding and decoding. + /// + /// # Examples + /// + /// ``` + /// # use tokio_io::AsyncRead; + /// use tokio_io::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .native_endian() + /// .new_read(io); + /// # } + /// ``` + pub fn native_endian(&mut self) -> &mut Self { + if cfg!(target_endian = "big") { + self.big_endian() + } else { + self.little_endian() + } + } + + /// Sets the max frame length + /// + /// This configuration option applies to both encoding and decoding. The + /// default value is 8MB. + /// + /// When decoding, the length field read from the byte stream is checked + /// against this setting **before** any adjustments are applied. When + /// encoding, the length of the submitted payload is checked against this + /// setting. + /// + /// When frames exceed the max length, an `io::Error` with the custom value + /// of the `FrameTooBig` type will be returned. + /// + /// # Examples + /// + /// ``` + /// # use tokio_io::AsyncRead; + /// use tokio_io::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .max_frame_length(8 * 1024) + /// .new_read(io); + /// # } + /// ``` + pub fn max_frame_length(&mut self, val: usize) -> &mut Self { + self.max_frame_len = val; + self + } + + /// Sets the number of bytes used to represent the length field + /// + /// The default value is `4`. The max value is `8`. + /// + /// This configuration option applies to both encoding and decoding. + /// + /// # Examples + /// + /// ``` + /// # use tokio_io::AsyncRead; + /// use tokio_io::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .length_field_length(4) + /// .new_read(io); + /// # } + /// ``` + pub fn length_field_length(&mut self, val: usize) -> &mut Self { + assert!(val > 0 && val <= 8, "invalid length field length"); + self.length_field_len = val; + self + } + + /// Sets the number of bytes in the header before the length field + /// + /// This configuration option only applies to decoding. + /// + /// # Examples + /// + /// ``` + /// # use tokio_io::AsyncRead; + /// use tokio_io::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .length_field_offset(1) + /// .new_read(io); + /// # } + /// ``` + pub fn length_field_offset(&mut self, val: usize) -> &mut Self { + self.length_field_offset = val; + self + } + + /// Delta between the payload length specified in the header and the real + /// payload length + /// + /// # Examples + /// + /// ``` + /// # use tokio_io::AsyncRead; + /// use tokio_io::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .length_adjustment(-2) + /// .new_read(io); + /// # } + /// ``` + pub fn length_adjustment(&mut self, val: isize) -> &mut Self { + self.length_adjustment = val; + self + } + + /// Sets the number of bytes to skip before reading the payload + /// + /// Default value is `length_field_len + length_field_offset` + /// + /// This configuration option only applies to decoding + /// + /// # Examples + /// + /// ``` + /// # use tokio_io::AsyncRead; + /// use tokio_io::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .num_skip(4) + /// .new_read(io); + /// # } + /// ``` + pub fn num_skip(&mut self, val: usize) -> &mut Self { + self.num_skip = Some(val); + self + } + + /// Create a configured length delimited `FramedRead` + /// + /// # Examples + /// + /// ``` + /// # use tokio_io::AsyncRead; + /// use tokio_io::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .length_field_offset(0) + /// .length_field_length(2) + /// .length_adjustment(0) + /// .num_skip(0) + /// .new_read(io); + /// # } + /// ``` + pub fn new_read<T>(&self, upstream: T) -> FramedRead<T> + where + T: AsyncRead, + { + FramedRead { + inner: codec::FramedRead::new( + upstream, + Decoder { + builder: *self, + state: DecodeState::Head, + }, + ), + } + } + + /// Create a configured length delimited `FramedWrite` + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio_io; + /// # extern crate bytes; + /// # use tokio_io::AsyncWrite; + /// # use tokio_io::codec::length_delimited; + /// # use bytes::BytesMut; + /// # fn write_frame<T: AsyncWrite>(io: T) { + /// # let _: length_delimited::FramedWrite<T, BytesMut> = + /// length_delimited::Builder::new() + /// .length_field_length(2) + /// .new_write(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn new_write<T, B>(&self, inner: T) -> FramedWrite<T, B> + where + T: AsyncWrite, + B: IntoBuf, + { + FramedWrite { + inner: inner, + builder: *self, + frame: None, + } + } + + /// Create a configured length delimited `Framed` + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio_io; + /// # extern crate bytes; + /// # use tokio_io::{AsyncRead, AsyncWrite}; + /// # use tokio_io::codec::length_delimited; + /// # use bytes::BytesMut; + /// # fn write_frame<T: AsyncRead + AsyncWrite>(io: T) { + /// # let _: length_delimited::Framed<T, BytesMut> = + /// length_delimited::Builder::new() + /// .length_field_length(2) + /// .new_framed(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn new_framed<T, B>(&self, inner: T) -> Framed<T, B> + where + T: AsyncRead + AsyncWrite, + B: IntoBuf, + { + let inner = self.new_read(self.new_write(inner)); + Framed { inner: inner } + } + + fn num_head_bytes(&self) -> usize { + let num = self.length_field_offset + self.length_field_len; + cmp::max(num, self.num_skip.unwrap_or(0)) + } + + fn get_num_skip(&self) -> usize { + self.num_skip + .unwrap_or(self.length_field_offset + self.length_field_len) + } +} + +// ===== impl FrameTooBig ===== + +impl fmt::Debug for FrameTooBig { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("FrameTooBig").finish() + } +} + +impl fmt::Display for FrameTooBig { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str(self.description()) + } +} + +impl StdError for FrameTooBig { + fn description(&self) -> &str { + "frame size too big" + } +} diff --git a/third_party/rust/tokio-io/src/lib.rs b/third_party/rust/tokio-io/src/lib.rs new file mode 100644 index 0000000000..6d9fe22927 --- /dev/null +++ b/third_party/rust/tokio-io/src/lib.rs @@ -0,0 +1,75 @@ +#![deny(missing_docs, missing_debug_implementations)] +#![doc(html_root_url = "https://docs.rs/tokio-io/0.1.13")] + +//! Core I/O traits and combinators when working with Tokio. +//! +//! > **Note:** This crate has been **deprecated in tokio 0.2.x** and has been +//! > moved into [`tokio::io`]. +//! +//! [`tokio::io`]: https://docs.rs/tokio/latest/tokio/io/index.html +//! +//! A description of the high-level I/O combinators can be [found online] in +//! addition to a description of the [low level details]. +//! +//! [found online]: https://tokio.rs/docs/getting-started/core/ +//! [low level details]: https://tokio.rs/docs/going-deeper-tokio/core-low-level/ + +#[macro_use] +extern crate log; + +#[macro_use] +extern crate futures; +extern crate bytes; + +use std::io as std_io; + +use futures::{Future, Stream}; + +/// A convenience typedef around a `Future` whose error component is `io::Error` +pub type IoFuture<T> = Box<dyn Future<Item = T, Error = std_io::Error> + Send>; + +/// A convenience typedef around a `Stream` whose error component is `io::Error` +pub type IoStream<T> = Box<dyn Stream<Item = T, Error = std_io::Error> + Send>; + +/// A convenience macro for working with `io::Result<T>` from the `Read` and +/// `Write` traits. +/// +/// This macro takes `io::Result<T>` as input, and returns `T` as the output. If +/// the input type is of the `Err` variant, then `Poll::NotReady` is returned if +/// it indicates `WouldBlock` or otherwise `Err` is returned. +#[macro_export] +macro_rules! try_nb { + ($e:expr) => { + match $e { + Ok(t) => t, + Err(ref e) if e.kind() == ::std::io::ErrorKind::WouldBlock => { + return Ok(::futures::Async::NotReady); + } + Err(e) => return Err(e.into()), + } + }; +} + +pub mod codec; +pub mod io; + +pub mod _tokio_codec; +mod allow_std; +mod async_read; +mod async_write; +mod framed; +mod framed_read; +mod framed_write; +mod length_delimited; +mod lines; +mod split; +mod window; + +pub use self::async_read::AsyncRead; +pub use self::async_write::AsyncWrite; + +fn _assert_objects() { + fn _assert<T>() {} + _assert::<Box<dyn AsyncRead>>(); + _assert::<Box<dyn AsyncWrite>>(); +} diff --git a/third_party/rust/tokio-io/src/lines.rs b/third_party/rust/tokio-io/src/lines.rs new file mode 100644 index 0000000000..8e59ff8fa2 --- /dev/null +++ b/third_party/rust/tokio-io/src/lines.rs @@ -0,0 +1,62 @@ +use std::io::{self, BufRead}; +use std::mem; + +use futures::{Poll, Stream}; + +use AsyncRead; + +/// Combinator created by the top-level `lines` method which is a stream over +/// the lines of text on an I/O object. +#[derive(Debug)] +pub struct Lines<A> { + io: A, + line: String, +} + +/// Creates a new stream from the I/O object given representing the lines of +/// input that are found on `A`. +/// +/// This method takes an asynchronous I/O object, `a`, and returns a `Stream` of +/// lines that the object contains. The returned stream will reach its end once +/// `a` reaches EOF. +pub fn lines<A>(a: A) -> Lines<A> +where + A: AsyncRead + BufRead, +{ + Lines { + io: a, + line: String::new(), + } +} + +impl<A> Lines<A> { + /// Returns the underlying I/O object. + /// + /// Note that this may lose data already read into internal buffers. It's + /// recommended to only call this once the stream has reached its end. + pub fn into_inner(self) -> A { + self.io + } +} + +impl<A> Stream for Lines<A> +where + A: AsyncRead + BufRead, +{ + type Item = String; + type Error = io::Error; + + fn poll(&mut self) -> Poll<Option<String>, io::Error> { + let n = try_nb!(self.io.read_line(&mut self.line)); + if n == 0 && self.line.len() == 0 { + return Ok(None.into()); + } + if self.line.ends_with("\n") { + self.line.pop(); + if self.line.ends_with("\r") { + self.line.pop(); + } + } + Ok(Some(mem::replace(&mut self.line, String::new())).into()) + } +} diff --git a/third_party/rust/tokio-io/src/split.rs b/third_party/rust/tokio-io/src/split.rs new file mode 100644 index 0000000000..ef8f990c8c --- /dev/null +++ b/third_party/rust/tokio-io/src/split.rs @@ -0,0 +1,247 @@ +use std::io::{self, Read, Write}; + +use bytes::{Buf, BufMut}; +use futures::sync::BiLock; +use futures::{Async, Poll}; + +use {AsyncRead, AsyncWrite}; + +/// The readable half of an object returned from `AsyncRead::split`. +#[derive(Debug)] +pub struct ReadHalf<T> { + handle: BiLock<T>, +} + +impl<T: AsyncRead + AsyncWrite> ReadHalf<T> { + /// Reunite with a previously split `WriteHalf`. + /// + /// # Panics + /// + /// If this `ReadHalf` and the given `WriteHalf` do not originate from + /// the same `AsyncRead::split` operation this method will panic. + pub fn unsplit(self, w: WriteHalf<T>) -> T { + if let Ok(x) = self.handle.reunite(w.handle) { + x + } else { + panic!("Unrelated `WriteHalf` passed to `ReadHalf::unsplit`.") + } + } +} + +/// The writable half of an object returned from `AsyncRead::split`. +#[derive(Debug)] +pub struct WriteHalf<T> { + handle: BiLock<T>, +} + +impl<T: AsyncRead + AsyncWrite> WriteHalf<T> { + /// Reunite with a previously split `ReadHalf`. + /// + /// # panics + /// + /// If this `WriteHalf` and the given `ReadHalf` do not originate from + /// the same `AsyncRead::split` operation this method will panic. + pub fn unsplit(self, r: ReadHalf<T>) -> T { + if let Ok(x) = self.handle.reunite(r.handle) { + x + } else { + panic!("Unrelated `ReadHalf` passed to `WriteHalf::unsplit`.") + } + } +} + +pub fn split<T: AsyncRead + AsyncWrite>(t: T) -> (ReadHalf<T>, WriteHalf<T>) { + let (a, b) = BiLock::new(t); + (ReadHalf { handle: a }, WriteHalf { handle: b }) +} + +fn would_block() -> io::Error { + io::Error::new(io::ErrorKind::WouldBlock, "would block") +} + +impl<T: AsyncRead> Read for ReadHalf<T> { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + match self.handle.poll_lock() { + Async::Ready(mut l) => l.read(buf), + Async::NotReady => Err(would_block()), + } + } +} + +impl<T: AsyncRead> AsyncRead for ReadHalf<T> { + fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { + let mut l = try_ready!(wrap_as_io(self.handle.poll_lock())); + l.read_buf(buf) + } +} + +impl<T: AsyncWrite> Write for WriteHalf<T> { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + match self.handle.poll_lock() { + Async::Ready(mut l) => l.write(buf), + Async::NotReady => Err(would_block()), + } + } + + fn flush(&mut self) -> io::Result<()> { + match self.handle.poll_lock() { + Async::Ready(mut l) => l.flush(), + Async::NotReady => Err(would_block()), + } + } +} + +impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> { + fn shutdown(&mut self) -> Poll<(), io::Error> { + let mut l = try_ready!(wrap_as_io(self.handle.poll_lock())); + l.shutdown() + } + + fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> + where + Self: Sized, + { + let mut l = try_ready!(wrap_as_io(self.handle.poll_lock())); + l.write_buf(buf) + } +} + +fn wrap_as_io<T>(t: Async<T>) -> Result<Async<T>, io::Error> { + Ok(t) +} + +#[cfg(test)] +mod tests { + extern crate tokio_current_thread; + + use super::{AsyncRead, AsyncWrite, ReadHalf, WriteHalf}; + use bytes::{BytesMut, IntoBuf}; + use futures::sync::BiLock; + use futures::{future::lazy, future::ok, Async, Poll}; + + use std::io::{self, Read, Write}; + + struct RW; + + impl Read for RW { + fn read(&mut self, _: &mut [u8]) -> io::Result<usize> { + Ok(1) + } + } + + impl AsyncRead for RW {} + + impl Write for RW { + fn write(&mut self, _: &[u8]) -> io::Result<usize> { + Ok(1) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } + } + + impl AsyncWrite for RW { + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(Async::Ready(())) + } + } + + #[test] + fn split_readhalf_translate_wouldblock_to_not_ready() { + tokio_current_thread::block_on_all(lazy(move || { + let rw = RW {}; + let (a, b) = BiLock::new(rw); + let mut rx = ReadHalf { handle: a }; + + let mut buf = BytesMut::with_capacity(64); + + // First read is uncontended, should go through. + assert!(rx.read_buf(&mut buf).unwrap().is_ready()); + + // Take lock from write side. + let lock = b.poll_lock(); + + // Second read should be NotReady. + assert!(!rx.read_buf(&mut buf).unwrap().is_ready()); + + drop(lock); + + // Back to uncontended. + assert!(rx.read_buf(&mut buf).unwrap().is_ready()); + + ok::<(), ()>(()) + })) + .unwrap(); + } + + #[test] + fn split_writehalf_translate_wouldblock_to_not_ready() { + tokio_current_thread::block_on_all(lazy(move || { + let rw = RW {}; + let (a, b) = BiLock::new(rw); + let mut tx = WriteHalf { handle: a }; + + let bufmut = BytesMut::with_capacity(64); + let mut buf = bufmut.into_buf(); + + // First write is uncontended, should go through. + assert!(tx.write_buf(&mut buf).unwrap().is_ready()); + + // Take lock from read side. + let lock = b.poll_lock(); + + // Second write should be NotReady. + assert!(!tx.write_buf(&mut buf).unwrap().is_ready()); + + drop(lock); + + // Back to uncontended. + assert!(tx.write_buf(&mut buf).unwrap().is_ready()); + + ok::<(), ()>(()) + })) + .unwrap(); + } + + #[test] + fn unsplit_ok() { + let (r, w) = RW.split(); + r.unsplit(w); + + let (r, w) = RW.split(); + w.unsplit(r); + } + + #[test] + #[should_panic] + fn unsplit_err1() { + let (r, _) = RW.split(); + let (_, w) = RW.split(); + r.unsplit(w); + } + + #[test] + #[should_panic] + fn unsplit_err2() { + let (_, w) = RW.split(); + let (r, _) = RW.split(); + r.unsplit(w); + } + + #[test] + #[should_panic] + fn unsplit_err3() { + let (_, w) = RW.split(); + let (r, _) = RW.split(); + w.unsplit(r); + } + + #[test] + #[should_panic] + fn unsplit_err4() { + let (r, _) = RW.split(); + let (_, w) = RW.split(); + w.unsplit(r); + } +} diff --git a/third_party/rust/tokio-io/src/window.rs b/third_party/rust/tokio-io/src/window.rs new file mode 100644 index 0000000000..4ded9ad403 --- /dev/null +++ b/third_party/rust/tokio-io/src/window.rs @@ -0,0 +1,117 @@ +use std::ops; + +/// A owned window around an underlying buffer. +/// +/// Normally slices work great for considering sub-portions of a buffer, but +/// unfortunately a slice is a *borrowed* type in Rust which has an associated +/// lifetime. When working with future and async I/O these lifetimes are not +/// always appropriate, and are sometimes difficult to store in tasks. This +/// type strives to fill this gap by providing an "owned slice" around an +/// underlying buffer of bytes. +/// +/// A `Window<T>` wraps an underlying buffer, `T`, and has configurable +/// start/end indexes to alter the behavior of the `AsRef<[u8]>` implementation +/// that this type carries. +/// +/// This type can be particularly useful when working with the `write_all` +/// combinator in this crate. Data can be sliced via `Window`, consumed by +/// `write_all`, and then earned back once the write operation finishes through +/// the `into_inner` method on this type. +#[derive(Debug)] +pub struct Window<T> { + inner: T, + range: ops::Range<usize>, +} + +impl<T: AsRef<[u8]>> Window<T> { + /// Creates a new window around the buffer `t` defaulting to the entire + /// slice. + /// + /// Further methods can be called on the returned `Window<T>` to alter the + /// window into the data provided. + pub fn new(t: T) -> Window<T> { + Window { + range: 0..t.as_ref().len(), + inner: t, + } + } + + /// Gets a shared reference to the underlying buffer inside of this + /// `Window`. + pub fn get_ref(&self) -> &T { + &self.inner + } + + /// Gets a mutable reference to the underlying buffer inside of this + /// `Window`. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } + + /// Consumes this `Window`, returning the underlying buffer. + pub fn into_inner(self) -> T { + self.inner + } + + /// Returns the starting index of this window into the underlying buffer + /// `T`. + pub fn start(&self) -> usize { + self.range.start + } + + /// Returns the end index of this window into the underlying buffer + /// `T`. + pub fn end(&self) -> usize { + self.range.end + } + + /// Changes the starting index of this window to the index specified. + /// + /// Returns the windows back to chain multiple calls to this method. + /// + /// # Panics + /// + /// This method will panic if `start` is out of bounds for the underlying + /// slice or if it comes after the `end` configured in this window. + pub fn set_start(&mut self, start: usize) -> &mut Window<T> { + assert!(start <= self.inner.as_ref().len()); + assert!(start <= self.range.end); + self.range.start = start; + self + } + + /// Changes the end index of this window to the index specified. + /// + /// Returns the windows back to chain multiple calls to this method. + /// + /// # Panics + /// + /// This method will panic if `end` is out of bounds for the underlying + /// slice or if it comes before the `start` configured in this window. + pub fn set_end(&mut self, end: usize) -> &mut Window<T> { + assert!(end <= self.inner.as_ref().len()); + assert!(self.range.start <= end); + self.range.end = end; + self + } + + // TODO: how about a generic set() method along the lines of: + // + // buffer.set(..3) + // .set(0..2) + // .set(4..) + // + // etc. +} + +impl<T: AsRef<[u8]>> AsRef<[u8]> for Window<T> { + fn as_ref(&self) -> &[u8] { + &self.inner.as_ref()[self.range.start..self.range.end] + } +} + +impl<T: AsMut<[u8]>> AsMut<[u8]> for Window<T> { + fn as_mut(&mut self) -> &mut [u8] { + &mut self.inner.as_mut()[self.range.start..self.range.end] + } +} diff --git a/third_party/rust/tokio-io/tests/async_read.rs b/third_party/rust/tokio-io/tests/async_read.rs new file mode 100644 index 0000000000..604e99fd45 --- /dev/null +++ b/third_party/rust/tokio-io/tests/async_read.rs @@ -0,0 +1,149 @@ +extern crate bytes; +extern crate futures; +extern crate tokio_io; + +use bytes::{BufMut, BytesMut}; +use futures::Async; +use tokio_io::AsyncRead; + +use std::io::{self, Read}; + +#[test] +fn read_buf_success() { + struct R; + + impl Read for R { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + buf[0..11].copy_from_slice(b"hello world"); + Ok(11) + } + } + + impl AsyncRead for R {} + + let mut buf = BytesMut::with_capacity(65); + + let n = match R.read_buf(&mut buf).unwrap() { + Async::Ready(n) => n, + _ => panic!(), + }; + + assert_eq!(11, n); + assert_eq!(buf[..], b"hello world"[..]); +} + +#[test] +fn read_buf_error() { + struct R; + + impl Read for R { + fn read(&mut self, _: &mut [u8]) -> io::Result<usize> { + Err(io::Error::new(io::ErrorKind::Other, "other")) + } + } + + impl AsyncRead for R {} + + let mut buf = BytesMut::with_capacity(65); + + let err = R.read_buf(&mut buf).unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::Other); +} + +#[test] +fn read_buf_no_capacity() { + struct R; + + impl Read for R { + fn read(&mut self, _: &mut [u8]) -> io::Result<usize> { + unimplemented!(); + } + } + + impl AsyncRead for R {} + + // Can't create BytesMut w/ zero capacity, so fill it up + let mut buf = BytesMut::with_capacity(64); + buf.put(&[0; 64][..]); + + let n = match R.read_buf(&mut buf).unwrap() { + Async::Ready(n) => n, + _ => panic!(), + }; + + assert_eq!(0, n); +} + +#[test] +fn read_buf_no_uninitialized() { + struct R; + + impl Read for R { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + for b in buf { + assert_eq!(0, *b); + } + + Ok(0) + } + } + + impl AsyncRead for R {} + + // Can't create BytesMut w/ zero capacity, so fill it up + let mut buf = BytesMut::with_capacity(64); + + let n = match R.read_buf(&mut buf).unwrap() { + Async::Ready(n) => n, + _ => panic!(), + }; + + assert_eq!(0, n); +} + +#[test] +fn read_buf_uninitialized_ok() { + struct R; + + impl Read for R { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + assert_eq!(buf[0..11], b"hello world"[..]); + Ok(0) + } + } + + impl AsyncRead for R { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { + false + } + } + + // Can't create BytesMut w/ zero capacity, so fill it up + let mut buf = BytesMut::with_capacity(64); + unsafe { + buf.bytes_mut()[0..11].copy_from_slice(b"hello world"); + } + + let n = match R.read_buf(&mut buf).unwrap() { + Async::Ready(n) => n, + _ => panic!(), + }; + + assert_eq!(0, n); +} + +#[test] +fn read_buf_translate_wouldblock_to_not_ready() { + struct R; + + impl Read for R { + fn read(&mut self, _: &mut [u8]) -> io::Result<usize> { + Err(io::Error::new(io::ErrorKind::WouldBlock, "")) + } + } + + impl AsyncRead for R {} + + let mut buf = BytesMut::with_capacity(65); + assert!(!R.read_buf(&mut buf).unwrap().is_ready()); +} diff --git a/third_party/rust/tokio-io/tests/length_delimited.rs b/third_party/rust/tokio-io/tests/length_delimited.rs new file mode 100644 index 0000000000..c99bbf089a --- /dev/null +++ b/third_party/rust/tokio-io/tests/length_delimited.rs @@ -0,0 +1,553 @@ +// This file is testing deprecated code. +#![allow(deprecated)] + +extern crate futures; +extern crate tokio_io; + +use tokio_io::codec::length_delimited::*; +use tokio_io::{AsyncRead, AsyncWrite}; + +use futures::Async::*; +use futures::{Poll, Sink, Stream}; + +use std::collections::VecDeque; +use std::io; + +macro_rules! mock { + ($($x:expr,)*) => {{ + let mut v = VecDeque::new(); + v.extend(vec![$($x),*]); + Mock { calls: v } + }}; +} + +#[test] +fn read_empty_io_yields_nothing() { + let mut io = FramedRead::new(mock!()); + + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_frame_one_packet() { + let mut io = FramedRead::new(mock! { + Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_frame_one_packet_little_endian() { + let mut io = Builder::new().little_endian().new_read(mock! { + Ok(b"\x09\x00\x00\x00abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_frame_one_packet_native_endian() { + let data = if cfg!(target_endian = "big") { + b"\x00\x00\x00\x09abcdefghi" + } else { + b"\x09\x00\x00\x00abcdefghi" + }; + let mut io = Builder::new().native_endian().new_read(mock! { + Ok(data[..].into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_multi_frame_one_packet() { + let mut data: Vec<u8> = vec![]; + data.extend_from_slice(b"\x00\x00\x00\x09abcdefghi"); + data.extend_from_slice(b"\x00\x00\x00\x03123"); + data.extend_from_slice(b"\x00\x00\x00\x0bhello world"); + + let mut io = FramedRead::new(mock! { + Ok(data.into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_frame_multi_packet() { + let mut io = FramedRead::new(mock! { + Ok(b"\x00\x00"[..].into()), + Ok(b"\x00\x09abc"[..].into()), + Ok(b"defghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_multi_frame_multi_packet() { + let mut io = FramedRead::new(mock! { + Ok(b"\x00\x00"[..].into()), + Ok(b"\x00\x09abc"[..].into()), + Ok(b"defghi"[..].into()), + Ok(b"\x00\x00\x00\x0312"[..].into()), + Ok(b"3\x00\x00\x00\x0bhello world"[..].into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_frame_multi_packet_wait() { + let mut io = FramedRead::new(mock! { + Ok(b"\x00\x00"[..].into()), + Err(would_block()), + Ok(b"\x00\x09abc"[..].into()), + Err(would_block()), + Ok(b"defghi"[..].into()), + Err(would_block()), + }); + + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_multi_frame_multi_packet_wait() { + let mut io = FramedRead::new(mock! { + Ok(b"\x00\x00"[..].into()), + Err(would_block()), + Ok(b"\x00\x09abc"[..].into()), + Err(would_block()), + Ok(b"defghi"[..].into()), + Err(would_block()), + Ok(b"\x00\x00\x00\x0312"[..].into()), + Err(would_block()), + Ok(b"3\x00\x00\x00\x0bhello world"[..].into()), + Err(would_block()), + }); + + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into()))); + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_incomplete_head() { + let mut io = FramedRead::new(mock! { + Ok(b"\x00\x00"[..].into()), + }); + + assert!(io.poll().is_err()); +} + +#[test] +fn read_incomplete_head_multi() { + let mut io = FramedRead::new(mock! { + Err(would_block()), + Ok(b"\x00"[..].into()), + Err(would_block()), + }); + + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), NotReady); + assert!(io.poll().is_err()); +} + +#[test] +fn read_incomplete_payload() { + let mut io = FramedRead::new(mock! { + Ok(b"\x00\x00\x00\x09ab"[..].into()), + Err(would_block()), + Ok(b"cd"[..].into()), + Err(would_block()), + }); + + assert_eq!(io.poll().unwrap(), NotReady); + assert_eq!(io.poll().unwrap(), NotReady); + assert!(io.poll().is_err()); +} + +#[test] +fn read_max_frame_len() { + let mut io = Builder::new().max_frame_length(5).new_read(mock! { + Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap_err().kind(), io::ErrorKind::InvalidData); +} + +#[test] +fn read_update_max_frame_len_at_rest() { + let mut io = Builder::new().new_read(mock! { + Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()), + Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + io.set_max_frame_length(5); + assert_eq!(io.poll().unwrap_err().kind(), io::ErrorKind::InvalidData); +} + +#[test] +fn read_update_max_frame_len_in_flight() { + let mut io = Builder::new().new_read(mock! { + Ok(b"\x00\x00\x00\x09abcd"[..].into()), + Err(would_block()), + Ok(b"efghi"[..].into()), + Ok(b"\x00\x00\x00\x09abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap(), NotReady); + io.set_max_frame_length(5); + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap_err().kind(), io::ErrorKind::InvalidData); +} + +#[test] +fn read_one_byte_length_field() { + let mut io = Builder::new().length_field_length(1).new_read(mock! { + Ok(b"\x09abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_header_offset() { + let mut io = Builder::new() + .length_field_length(2) + .length_field_offset(4) + .new_read(mock! { + Ok(b"zzzz\x00\x09abcdefghi"[..].into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_multi_frame_one_packet_skip_none_adjusted() { + let mut data: Vec<u8> = vec![]; + data.extend_from_slice(b"xx\x00\x09abcdefghi"); + data.extend_from_slice(b"yy\x00\x03123"); + data.extend_from_slice(b"zz\x00\x0bhello world"); + + let mut io = Builder::new() + .length_field_length(2) + .length_field_offset(2) + .num_skip(0) + .length_adjustment(4) + .new_read(mock! { + Ok(data.into()), + }); + + assert_eq!( + io.poll().unwrap(), + Ready(Some(b"xx\x00\x09abcdefghi"[..].into())) + ); + assert_eq!(io.poll().unwrap(), Ready(Some(b"yy\x00\x03123"[..].into()))); + assert_eq!( + io.poll().unwrap(), + Ready(Some(b"zz\x00\x0bhello world"[..].into())) + ); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn read_single_multi_frame_one_packet_length_includes_head() { + let mut data: Vec<u8> = vec![]; + data.extend_from_slice(b"\x00\x0babcdefghi"); + data.extend_from_slice(b"\x00\x05123"); + data.extend_from_slice(b"\x00\x0dhello world"); + + let mut io = Builder::new() + .length_field_length(2) + .length_adjustment(-2) + .new_read(mock! { + Ok(data.into()), + }); + + assert_eq!(io.poll().unwrap(), Ready(Some(b"abcdefghi"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"123"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(Some(b"hello world"[..].into()))); + assert_eq!(io.poll().unwrap(), Ready(None)); +} + +#[test] +fn write_single_frame_length_adjusted() { + let mut io = Builder::new().length_adjustment(-2).new_write(mock! { + Ok(b"\x00\x00\x00\x0b"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + }); + assert!(io.start_send("abcdefghi").unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_nothing_yields_nothing() { + let mut io: FramedWrite<_, &'static [u8]> = FramedWrite::new(mock!()); + assert!(io.poll_complete().unwrap().is_ready()); +} + +#[test] +fn write_single_frame_one_packet() { + let mut io = FramedWrite::new(mock! { + Ok(b"\x00\x00\x00\x09"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + }); + + assert!(io.start_send("abcdefghi").unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_single_multi_frame_one_packet() { + let mut io = FramedWrite::new(mock! { + Ok(b"\x00\x00\x00\x09"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(b"\x00\x00\x00\x03"[..].into()), + Ok(b"123"[..].into()), + Ok(b"\x00\x00\x00\x0b"[..].into()), + Ok(b"hello world"[..].into()), + Ok(Flush), + }); + + assert!(io.start_send("abcdefghi").unwrap().is_ready()); + assert!(io.start_send("123").unwrap().is_ready()); + assert!(io.start_send("hello world").unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_single_multi_frame_multi_packet() { + let mut io = FramedWrite::new(mock! { + Ok(b"\x00\x00\x00\x09"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + Ok(b"\x00\x00\x00\x03"[..].into()), + Ok(b"123"[..].into()), + Ok(Flush), + Ok(b"\x00\x00\x00\x0b"[..].into()), + Ok(b"hello world"[..].into()), + Ok(Flush), + }); + + assert!(io.start_send("abcdefghi").unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.start_send("123").unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.start_send("hello world").unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_single_frame_would_block() { + let mut io = FramedWrite::new(mock! { + Err(would_block()), + Ok(b"\x00\x00"[..].into()), + Err(would_block()), + Ok(b"\x00\x09"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + }); + + assert!(io.start_send("abcdefghi").unwrap().is_ready()); + assert!(!io.poll_complete().unwrap().is_ready()); + assert!(!io.poll_complete().unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_single_frame_little_endian() { + let mut io = Builder::new().little_endian().new_write(mock! { + Ok(b"\x09\x00\x00\x00"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + }); + + assert!(io.start_send("abcdefghi").unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_single_frame_with_short_length_field() { + let mut io = Builder::new().length_field_length(1).new_write(mock! { + Ok(b"\x09"[..].into()), + Ok(b"abcdefghi"[..].into()), + Ok(Flush), + }); + + assert!(io.start_send("abcdefghi").unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_max_frame_len() { + let mut io = Builder::new().max_frame_length(5).new_write(mock! {}); + + assert_eq!( + io.start_send("abcdef").unwrap_err().kind(), + io::ErrorKind::InvalidInput + ); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_zero() { + let mut io = Builder::new().new_write(mock! {}); + + assert!(io.start_send("abcdef").unwrap().is_ready()); + assert_eq!( + io.poll_complete().unwrap_err().kind(), + io::ErrorKind::WriteZero + ); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_update_max_frame_len_at_rest() { + let mut io = Builder::new().new_write(mock! { + Ok(b"\x00\x00\x00\x06"[..].into()), + Ok(b"abcdef"[..].into()), + Ok(Flush), + }); + + assert!(io.start_send("abcdef").unwrap().is_ready()); + assert!(io.poll_complete().unwrap().is_ready()); + io.set_max_frame_length(5); + assert_eq!( + io.start_send("abcdef").unwrap_err().kind(), + io::ErrorKind::InvalidInput + ); + assert!(io.get_ref().calls.is_empty()); +} + +#[test] +fn write_update_max_frame_len_in_flight() { + let mut io = Builder::new().new_write(mock! { + Ok(b"\x00\x00\x00\x06"[..].into()), + Ok(b"ab"[..].into()), + Err(would_block()), + Ok(b"cdef"[..].into()), + Ok(Flush), + }); + + assert!(io.start_send("abcdef").unwrap().is_ready()); + assert!(!io.poll_complete().unwrap().is_ready()); + io.set_max_frame_length(5); + assert!(io.poll_complete().unwrap().is_ready()); + assert_eq!( + io.start_send("abcdef").unwrap_err().kind(), + io::ErrorKind::InvalidInput + ); + assert!(io.get_ref().calls.is_empty()); +} + +// ===== Test utils ===== + +fn would_block() -> io::Error { + io::Error::new(io::ErrorKind::WouldBlock, "would block") +} + +struct Mock { + calls: VecDeque<io::Result<Op>>, +} + +enum Op { + Data(Vec<u8>), + Flush, +} + +use self::Op::*; + +impl io::Read for Mock { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + match self.calls.pop_front() { + Some(Ok(Op::Data(data))) => { + debug_assert!(dst.len() >= data.len()); + dst[..data.len()].copy_from_slice(&data[..]); + Ok(data.len()) + } + Some(Ok(_)) => panic!(), + Some(Err(e)) => Err(e), + None => Ok(0), + } + } +} + +impl AsyncRead for Mock {} + +impl io::Write for Mock { + fn write(&mut self, src: &[u8]) -> io::Result<usize> { + match self.calls.pop_front() { + Some(Ok(Op::Data(data))) => { + let len = data.len(); + assert!(src.len() >= len, "expect={:?}; actual={:?}", data, src); + assert_eq!(&data[..], &src[..len]); + Ok(len) + } + Some(Ok(_)) => panic!(), + Some(Err(e)) => Err(e), + None => Ok(0), + } + } + + fn flush(&mut self) -> io::Result<()> { + match self.calls.pop_front() { + Some(Ok(Op::Flush)) => Ok(()), + Some(Ok(_)) => panic!(), + Some(Err(e)) => Err(e), + None => Ok(()), + } + } +} + +impl AsyncWrite for Mock { + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(Ready(())) + } +} + +impl<'a> From<&'a [u8]> for Op { + fn from(src: &'a [u8]) -> Op { + Op::Data(src.into()) + } +} + +impl From<Vec<u8>> for Op { + fn from(src: Vec<u8>) -> Op { + Op::Data(src) + } +} |