summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-io/src
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--third_party/rust/tokio-io/src/_tokio_codec/decoder.rs3
-rw-r--r--third_party/rust/tokio-io/src/_tokio_codec/encoder.rs3
-rw-r--r--third_party/rust/tokio-io/src/_tokio_codec/framed.rs283
-rw-r--r--third_party/rust/tokio-io/src/_tokio_codec/framed_read.rs216
-rw-r--r--third_party/rust/tokio-io/src/_tokio_codec/framed_write.rs246
-rw-r--r--third_party/rust/tokio-io/src/_tokio_codec/mod.rs36
-rw-r--r--third_party/rust/tokio-io/src/allow_std.rs93
-rw-r--r--third_party/rust/tokio-io/src/async_read.rs173
-rw-r--r--third_party/rust/tokio-io/src/async_write.rs222
-rw-r--r--third_party/rust/tokio-io/src/codec/bytes_codec.rs42
-rw-r--r--third_party/rust/tokio-io/src/codec/decoder.rs117
-rw-r--r--third_party/rust/tokio-io/src/codec/encoder.rs25
-rw-r--r--third_party/rust/tokio-io/src/codec/lines_codec.rs88
-rw-r--r--third_party/rust/tokio-io/src/codec/mod.rs378
-rw-r--r--third_party/rust/tokio-io/src/framed.rs248
-rw-r--r--third_party/rust/tokio-io/src/framed_read.rs220
-rw-r--r--third_party/rust/tokio-io/src/framed_write.rs250
-rw-r--r--third_party/rust/tokio-io/src/io/copy.rs100
-rw-r--r--third_party/rust/tokio-io/src/io/flush.rs43
-rw-r--r--third_party/rust/tokio-io/src/io/mod.rs32
-rw-r--r--third_party/rust/tokio-io/src/io/read.rs60
-rw-r--r--third_party/rust/tokio-io/src/io/read_exact.rs85
-rw-r--r--third_party/rust/tokio-io/src/io/read_to_end.rs66
-rw-r--r--third_party/rust/tokio-io/src/io/read_until.rs76
-rw-r--r--third_party/rust/tokio-io/src/io/shutdown.rs44
-rw-r--r--third_party/rust/tokio-io/src/io/write_all.rs88
-rw-r--r--third_party/rust/tokio-io/src/length_delimited.rs943
-rw-r--r--third_party/rust/tokio-io/src/lib.rs75
-rw-r--r--third_party/rust/tokio-io/src/lines.rs62
-rw-r--r--third_party/rust/tokio-io/src/split.rs247
-rw-r--r--third_party/rust/tokio-io/src/window.rs117
31 files changed, 4681 insertions, 0 deletions
diff --git a/third_party/rust/tokio-io/src/_tokio_codec/decoder.rs b/third_party/rust/tokio-io/src/_tokio_codec/decoder.rs
new file mode 100644
index 0000000000..edc0ecc0d4
--- /dev/null
+++ b/third_party/rust/tokio-io/src/_tokio_codec/decoder.rs
@@ -0,0 +1,3 @@
+// For now, we need to keep the implementation of Encoder in tokio_io.
+
+pub use codec::Decoder;
diff --git a/third_party/rust/tokio-io/src/_tokio_codec/encoder.rs b/third_party/rust/tokio-io/src/_tokio_codec/encoder.rs
new file mode 100644
index 0000000000..20b9e375e0
--- /dev/null
+++ b/third_party/rust/tokio-io/src/_tokio_codec/encoder.rs
@@ -0,0 +1,3 @@
+// For now, we need to keep the implementation of Encoder in tokio_io.
+
+pub use codec::Encoder;
diff --git a/third_party/rust/tokio-io/src/_tokio_codec/framed.rs b/third_party/rust/tokio-io/src/_tokio_codec/framed.rs
new file mode 100644
index 0000000000..d290575e79
--- /dev/null
+++ b/third_party/rust/tokio-io/src/_tokio_codec/framed.rs
@@ -0,0 +1,283 @@
+#![allow(deprecated)]
+
+use std::fmt;
+use std::io::{self, Read, Write};
+
+use super::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2};
+use super::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2};
+use codec::{Decoder, Encoder};
+use {AsyncRead, AsyncWrite};
+
+use bytes::BytesMut;
+use futures::{Poll, Sink, StartSend, Stream};
+
+/// A unified `Stream` and `Sink` interface to an underlying I/O object, using
+/// the `Encoder` and `Decoder` traits to encode and decode frames.
+///
+/// You can create a `Framed` instance by using the `AsyncRead::framed` adapter.
+pub struct Framed<T, U> {
+ inner: FramedRead2<FramedWrite2<Fuse<T, U>>>,
+}
+
+pub struct Fuse<T, U>(pub T, pub U);
+
+impl<T, U> Framed<T, U>
+where
+ T: AsyncRead + AsyncWrite,
+ U: Decoder + Encoder,
+{
+ /// Provides a `Stream` and `Sink` interface for reading and writing to this
+ /// `Io` object, using `Decode` and `Encode` to read and write the raw data.
+ ///
+ /// Raw I/O objects work with byte sequences, but higher-level code usually
+ /// wants to batch these into meaningful chunks, called "frames". This
+ /// method layers framing on top of an I/O object, by using the `Codec`
+ /// traits to handle encoding and decoding of messages frames. Note that
+ /// the incoming and outgoing frame types may be distinct.
+ ///
+ /// This function returns a *single* object that is both `Stream` and
+ /// `Sink`; grouping this into a single object is often useful for layering
+ /// things like gzip or TLS, which require both read and write access to the
+ /// underlying object.
+ ///
+ /// If you want to work more directly with the streams and sink, consider
+ /// calling `split` on the `Framed` returned by this method, which will
+ /// break them into separate objects, allowing them to interact more easily.
+ pub fn new(inner: T, codec: U) -> Framed<T, U> {
+ Framed {
+ inner: framed_read2(framed_write2(Fuse(inner, codec))),
+ }
+ }
+}
+
+impl<T, U> Framed<T, U> {
+ /// Provides a `Stream` and `Sink` interface for reading and writing to this
+ /// `Io` object, using `Decode` and `Encode` to read and write the raw data.
+ ///
+ /// Raw I/O objects work with byte sequences, but higher-level code usually
+ /// wants to batch these into meaningful chunks, called "frames". This
+ /// method layers framing on top of an I/O object, by using the `Codec`
+ /// traits to handle encoding and decoding of messages frames. Note that
+ /// the incoming and outgoing frame types may be distinct.
+ ///
+ /// This function returns a *single* object that is both `Stream` and
+ /// `Sink`; grouping this into a single object is often useful for layering
+ /// things like gzip or TLS, which require both read and write access to the
+ /// underlying object.
+ ///
+ /// This objects takes a stream and a readbuffer and a writebuffer. These field
+ /// can be obtained from an existing `Framed` with the `into_parts` method.
+ ///
+ /// If you want to work more directly with the streams and sink, consider
+ /// calling `split` on the `Framed` returned by this method, which will
+ /// break them into separate objects, allowing them to interact more easily.
+ pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
+ Framed {
+ inner: framed_read2_with_buffer(
+ framed_write2_with_buffer(Fuse(parts.io, parts.codec), parts.write_buf),
+ parts.read_buf,
+ ),
+ }
+ }
+
+ /// Returns a reference to the underlying I/O stream wrapped by
+ /// `Frame`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_ref(&self) -> &T {
+ &self.inner.get_ref().get_ref().0
+ }
+
+ /// Returns a mutable reference to the underlying I/O stream wrapped by
+ /// `Frame`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner.get_mut().get_mut().0
+ }
+
+ /// Returns a reference to the underlying codec wrapped by
+ /// `Frame`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying codec
+ /// as it may corrupt the stream of frames otherwise being worked with.
+ pub fn codec(&self) -> &U {
+ &self.inner.get_ref().get_ref().1
+ }
+
+ /// Returns a mutable reference to the underlying codec wrapped by
+ /// `Frame`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying codec
+ /// as it may corrupt the stream of frames otherwise being worked with.
+ pub fn codec_mut(&mut self) -> &mut U {
+ &mut self.inner.get_mut().get_mut().1
+ }
+
+ /// Consumes the `Frame`, returning its underlying I/O stream.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn into_inner(self) -> T {
+ self.inner.into_inner().into_inner().0
+ }
+
+ /// Consumes the `Frame`, returning its underlying I/O stream, the buffer
+ /// with unprocessed data, and the codec.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn into_parts(self) -> FramedParts<T, U> {
+ let (inner, read_buf) = self.inner.into_parts();
+ let (inner, write_buf) = inner.into_parts();
+
+ FramedParts {
+ io: inner.0,
+ codec: inner.1,
+ read_buf: read_buf,
+ write_buf: write_buf,
+ _priv: (),
+ }
+ }
+}
+
+impl<T, U> Stream for Framed<T, U>
+where
+ T: AsyncRead,
+ U: Decoder,
+{
+ type Item = U::Item;
+ type Error = U::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ self.inner.poll()
+ }
+}
+
+impl<T, U> Sink for Framed<T, U>
+where
+ T: AsyncWrite,
+ U: Encoder,
+ U::Error: From<io::Error>,
+{
+ type SinkItem = U::Item;
+ type SinkError = U::Error;
+
+ fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
+ self.inner.get_mut().start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
+ self.inner.get_mut().poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), Self::SinkError> {
+ self.inner.get_mut().close()
+ }
+}
+
+impl<T, U> fmt::Debug for Framed<T, U>
+where
+ T: fmt::Debug,
+ U: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("Framed")
+ .field("io", &self.inner.get_ref().get_ref().0)
+ .field("codec", &self.inner.get_ref().get_ref().1)
+ .finish()
+ }
+}
+
+// ===== impl Fuse =====
+
+impl<T: Read, U> Read for Fuse<T, U> {
+ fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
+ self.0.read(dst)
+ }
+}
+
+impl<T: AsyncRead, U> AsyncRead for Fuse<T, U> {
+ unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
+ self.0.prepare_uninitialized_buffer(buf)
+ }
+}
+
+impl<T: Write, U> Write for Fuse<T, U> {
+ fn write(&mut self, src: &[u8]) -> io::Result<usize> {
+ self.0.write(src)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ self.0.flush()
+ }
+}
+
+impl<T: AsyncWrite, U> AsyncWrite for Fuse<T, U> {
+ fn shutdown(&mut self) -> Poll<(), io::Error> {
+ self.0.shutdown()
+ }
+}
+
+impl<T, U: Decoder> Decoder for Fuse<T, U> {
+ type Item = U::Item;
+ type Error = U::Error;
+
+ fn decode(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
+ self.1.decode(buffer)
+ }
+
+ fn decode_eof(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
+ self.1.decode_eof(buffer)
+ }
+}
+
+impl<T, U: Encoder> Encoder for Fuse<T, U> {
+ type Item = U::Item;
+ type Error = U::Error;
+
+ fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
+ self.1.encode(item, dst)
+ }
+}
+
+/// `FramedParts` contains an export of the data of a Framed transport.
+/// It can be used to construct a new `Framed` with a different codec.
+/// It contains all current buffers and the inner transport.
+#[derive(Debug)]
+pub struct FramedParts<T, U> {
+ /// The inner transport used to read bytes to and write bytes to
+ pub io: T,
+
+ /// The codec
+ pub codec: U,
+
+ /// The buffer with read but unprocessed data.
+ pub read_buf: BytesMut,
+
+ /// A buffer with unprocessed data which are not written yet.
+ pub write_buf: BytesMut,
+
+ /// This private field allows us to add additional fields in the future in a
+ /// backwards compatible way.
+ _priv: (),
+}
+
+impl<T, U> FramedParts<T, U> {
+ /// Create a new, default, `FramedParts`
+ pub fn new(io: T, codec: U) -> FramedParts<T, U> {
+ FramedParts {
+ io,
+ codec,
+ read_buf: BytesMut::new(),
+ write_buf: BytesMut::new(),
+ _priv: (),
+ }
+ }
+}
diff --git a/third_party/rust/tokio-io/src/_tokio_codec/framed_read.rs b/third_party/rust/tokio-io/src/_tokio_codec/framed_read.rs
new file mode 100644
index 0000000000..ea7550877c
--- /dev/null
+++ b/third_party/rust/tokio-io/src/_tokio_codec/framed_read.rs
@@ -0,0 +1,216 @@
+#![allow(deprecated)]
+
+use std::fmt;
+
+use super::framed::Fuse;
+use codec::Decoder;
+use AsyncRead;
+
+use bytes::BytesMut;
+use futures::{Async, Poll, Sink, StartSend, Stream};
+
+/// A `Stream` of messages decoded from an `AsyncRead`.
+pub struct FramedRead<T, D> {
+ inner: FramedRead2<Fuse<T, D>>,
+}
+
+pub struct FramedRead2<T> {
+ inner: T,
+ eof: bool,
+ is_readable: bool,
+ buffer: BytesMut,
+}
+
+const INITIAL_CAPACITY: usize = 8 * 1024;
+
+// ===== impl FramedRead =====
+
+impl<T, D> FramedRead<T, D>
+where
+ T: AsyncRead,
+ D: Decoder,
+{
+ /// Creates a new `FramedRead` with the given `decoder`.
+ pub fn new(inner: T, decoder: D) -> FramedRead<T, D> {
+ FramedRead {
+ inner: framed_read2(Fuse(inner, decoder)),
+ }
+ }
+}
+
+impl<T, D> FramedRead<T, D> {
+ /// Returns a reference to the underlying I/O stream wrapped by
+ /// `FramedRead`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_ref(&self) -> &T {
+ &self.inner.inner.0
+ }
+
+ /// Returns a mutable reference to the underlying I/O stream wrapped by
+ /// `FramedRead`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner.inner.0
+ }
+
+ /// Consumes the `FramedRead`, returning its underlying I/O stream.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn into_inner(self) -> T {
+ self.inner.inner.0
+ }
+
+ /// Returns a reference to the underlying decoder.
+ pub fn decoder(&self) -> &D {
+ &self.inner.inner.1
+ }
+
+ /// Returns a mutable reference to the underlying decoder.
+ pub fn decoder_mut(&mut self) -> &mut D {
+ &mut self.inner.inner.1
+ }
+}
+
+impl<T, D> Stream for FramedRead<T, D>
+where
+ T: AsyncRead,
+ D: Decoder,
+{
+ type Item = D::Item;
+ type Error = D::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ self.inner.poll()
+ }
+}
+
+impl<T, D> Sink for FramedRead<T, D>
+where
+ T: Sink,
+{
+ type SinkItem = T::SinkItem;
+ type SinkError = T::SinkError;
+
+ fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
+ self.inner.inner.0.start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
+ self.inner.inner.0.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), Self::SinkError> {
+ self.inner.inner.0.close()
+ }
+}
+
+impl<T, D> fmt::Debug for FramedRead<T, D>
+where
+ T: fmt::Debug,
+ D: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("FramedRead")
+ .field("inner", &self.inner.inner.0)
+ .field("decoder", &self.inner.inner.1)
+ .field("eof", &self.inner.eof)
+ .field("is_readable", &self.inner.is_readable)
+ .field("buffer", &self.inner.buffer)
+ .finish()
+ }
+}
+
+// ===== impl FramedRead2 =====
+
+pub fn framed_read2<T>(inner: T) -> FramedRead2<T> {
+ FramedRead2 {
+ inner: inner,
+ eof: false,
+ is_readable: false,
+ buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
+ }
+}
+
+pub fn framed_read2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedRead2<T> {
+ if buf.capacity() < INITIAL_CAPACITY {
+ let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity();
+ buf.reserve(bytes_to_reserve);
+ }
+ FramedRead2 {
+ inner: inner,
+ eof: false,
+ is_readable: buf.len() > 0,
+ buffer: buf,
+ }
+}
+
+impl<T> FramedRead2<T> {
+ pub fn get_ref(&self) -> &T {
+ &self.inner
+ }
+
+ pub fn into_inner(self) -> T {
+ self.inner
+ }
+
+ pub fn into_parts(self) -> (T, BytesMut) {
+ (self.inner, self.buffer)
+ }
+
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner
+ }
+}
+
+impl<T> Stream for FramedRead2<T>
+where
+ T: AsyncRead + Decoder,
+{
+ type Item = T::Item;
+ type Error = T::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ loop {
+ // Repeatedly call `decode` or `decode_eof` as long as it is
+ // "readable". Readable is defined as not having returned `None`. If
+ // the upstream has returned EOF, and the decoder is no longer
+ // readable, it can be assumed that the decoder will never become
+ // readable again, at which point the stream is terminated.
+ if self.is_readable {
+ if self.eof {
+ let frame = self.inner.decode_eof(&mut self.buffer)?;
+ return Ok(Async::Ready(frame));
+ }
+
+ trace!("attempting to decode a frame");
+
+ if let Some(frame) = self.inner.decode(&mut self.buffer)? {
+ trace!("frame decoded from buffer");
+ return Ok(Async::Ready(Some(frame)));
+ }
+
+ self.is_readable = false;
+ }
+
+ assert!(!self.eof);
+
+ // Otherwise, try to read more data and try again. Make sure we've
+ // got room for at least one byte to read to ensure that we don't
+ // get a spurious 0 that looks like EOF
+ self.buffer.reserve(1);
+ if 0 == try_ready!(self.inner.read_buf(&mut self.buffer)) {
+ self.eof = true;
+ }
+
+ self.is_readable = true;
+ }
+ }
+}
diff --git a/third_party/rust/tokio-io/src/_tokio_codec/framed_write.rs b/third_party/rust/tokio-io/src/_tokio_codec/framed_write.rs
new file mode 100644
index 0000000000..7541b1730d
--- /dev/null
+++ b/third_party/rust/tokio-io/src/_tokio_codec/framed_write.rs
@@ -0,0 +1,246 @@
+#![allow(deprecated)]
+
+use std::fmt;
+use std::io::{self, Read};
+
+use super::framed::Fuse;
+use codec::{Decoder, Encoder};
+use {AsyncRead, AsyncWrite};
+
+use bytes::BytesMut;
+use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream};
+
+/// A `Sink` of frames encoded to an `AsyncWrite`.
+pub struct FramedWrite<T, E> {
+ inner: FramedWrite2<Fuse<T, E>>,
+}
+
+pub struct FramedWrite2<T> {
+ inner: T,
+ buffer: BytesMut,
+}
+
+const INITIAL_CAPACITY: usize = 8 * 1024;
+const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY;
+
+impl<T, E> FramedWrite<T, E>
+where
+ T: AsyncWrite,
+ E: Encoder,
+{
+ /// Creates a new `FramedWrite` with the given `encoder`.
+ pub fn new(inner: T, encoder: E) -> FramedWrite<T, E> {
+ FramedWrite {
+ inner: framed_write2(Fuse(inner, encoder)),
+ }
+ }
+}
+
+impl<T, E> FramedWrite<T, E> {
+ /// Returns a reference to the underlying I/O stream wrapped by
+ /// `FramedWrite`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_ref(&self) -> &T {
+ &self.inner.inner.0
+ }
+
+ /// Returns a mutable reference to the underlying I/O stream wrapped by
+ /// `FramedWrite`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner.inner.0
+ }
+
+ /// Consumes the `FramedWrite`, returning its underlying I/O stream.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn into_inner(self) -> T {
+ self.inner.inner.0
+ }
+
+ /// Returns a reference to the underlying decoder.
+ pub fn encoder(&self) -> &E {
+ &self.inner.inner.1
+ }
+
+ /// Returns a mutable reference to the underlying decoder.
+ pub fn encoder_mut(&mut self) -> &mut E {
+ &mut self.inner.inner.1
+ }
+}
+
+impl<T, E> Sink for FramedWrite<T, E>
+where
+ T: AsyncWrite,
+ E: Encoder,
+{
+ type SinkItem = E::Item;
+ type SinkError = E::Error;
+
+ fn start_send(&mut self, item: E::Item) -> StartSend<E::Item, E::Error> {
+ self.inner.start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
+ self.inner.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), Self::SinkError> {
+ Ok(self.inner.close()?)
+ }
+}
+
+impl<T, D> Stream for FramedWrite<T, D>
+where
+ T: Stream,
+{
+ type Item = T::Item;
+ type Error = T::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ self.inner.inner.0.poll()
+ }
+}
+
+impl<T, U> fmt::Debug for FramedWrite<T, U>
+where
+ T: fmt::Debug,
+ U: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("FramedWrite")
+ .field("inner", &self.inner.get_ref().0)
+ .field("encoder", &self.inner.get_ref().1)
+ .field("buffer", &self.inner.buffer)
+ .finish()
+ }
+}
+
+// ===== impl FramedWrite2 =====
+
+pub fn framed_write2<T>(inner: T) -> FramedWrite2<T> {
+ FramedWrite2 {
+ inner: inner,
+ buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
+ }
+}
+
+pub fn framed_write2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedWrite2<T> {
+ if buf.capacity() < INITIAL_CAPACITY {
+ let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity();
+ buf.reserve(bytes_to_reserve);
+ }
+ FramedWrite2 {
+ inner: inner,
+ buffer: buf,
+ }
+}
+
+impl<T> FramedWrite2<T> {
+ pub fn get_ref(&self) -> &T {
+ &self.inner
+ }
+
+ pub fn into_inner(self) -> T {
+ self.inner
+ }
+
+ pub fn into_parts(self) -> (T, BytesMut) {
+ (self.inner, self.buffer)
+ }
+
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner
+ }
+}
+
+impl<T> Sink for FramedWrite2<T>
+where
+ T: AsyncWrite + Encoder,
+{
+ type SinkItem = T::Item;
+ type SinkError = T::Error;
+
+ fn start_send(&mut self, item: T::Item) -> StartSend<T::Item, T::Error> {
+ // If the buffer is already over 8KiB, then attempt to flush it. If after flushing it's
+ // *still* over 8KiB, then apply backpressure (reject the send).
+ if self.buffer.len() >= BACKPRESSURE_BOUNDARY {
+ self.poll_complete()?;
+
+ if self.buffer.len() >= BACKPRESSURE_BOUNDARY {
+ return Ok(AsyncSink::NotReady(item));
+ }
+ }
+
+ self.inner.encode(item, &mut self.buffer)?;
+
+ Ok(AsyncSink::Ready)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
+ trace!("flushing framed transport");
+
+ while !self.buffer.is_empty() {
+ trace!("writing; remaining={}", self.buffer.len());
+
+ let n = try_ready!(self.inner.poll_write(&self.buffer));
+
+ if n == 0 {
+ return Err(io::Error::new(
+ io::ErrorKind::WriteZero,
+ "failed to \
+ write frame to transport",
+ )
+ .into());
+ }
+
+ // TODO: Add a way to `bytes` to do this w/o returning the drained
+ // data.
+ let _ = self.buffer.split_to(n);
+ }
+
+ // Try flushing the underlying IO
+ try_ready!(self.inner.poll_flush());
+
+ trace!("framed transport flushed");
+ return Ok(Async::Ready(()));
+ }
+
+ fn close(&mut self) -> Poll<(), Self::SinkError> {
+ try_ready!(self.poll_complete());
+ Ok(self.inner.shutdown()?)
+ }
+}
+
+impl<T: Decoder> Decoder for FramedWrite2<T> {
+ type Item = T::Item;
+ type Error = T::Error;
+
+ fn decode(&mut self, src: &mut BytesMut) -> Result<Option<T::Item>, T::Error> {
+ self.inner.decode(src)
+ }
+
+ fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<T::Item>, T::Error> {
+ self.inner.decode_eof(src)
+ }
+}
+
+impl<T: Read> Read for FramedWrite2<T> {
+ fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
+ self.inner.read(dst)
+ }
+}
+
+impl<T: AsyncRead> AsyncRead for FramedWrite2<T> {
+ unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
+ self.inner.prepare_uninitialized_buffer(buf)
+ }
+}
diff --git a/third_party/rust/tokio-io/src/_tokio_codec/mod.rs b/third_party/rust/tokio-io/src/_tokio_codec/mod.rs
new file mode 100644
index 0000000000..5d7eb21890
--- /dev/null
+++ b/third_party/rust/tokio-io/src/_tokio_codec/mod.rs
@@ -0,0 +1,36 @@
+//! Utilities for encoding and decoding frames.
+//!
+//! Contains adapters to go from streams of bytes, [`AsyncRead`] and
+//! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`].
+//! Framed streams are also known as [transports].
+//!
+//! [`AsyncRead`]: #
+//! [`AsyncWrite`]: #
+//! [`Sink`]: #
+//! [`Stream`]: #
+//! [transports]: #
+
+#![deny(missing_docs, missing_debug_implementations)]
+#![doc(hidden, html_root_url = "https://docs.rs/tokio-codec/0.1.0")]
+
+// _tokio_codec are the items that belong in the `tokio_codec` crate. However, because we need to
+// maintain backward compatibility until the next major breaking change, they are defined here.
+// When the next breaking change comes, they should be moved to the `tokio_codec` crate and become
+// independent.
+//
+// The primary reason we can't move these to `tokio-codec` now is because, again for backward
+// compatibility reasons, we need to keep `Decoder` and `Encoder` in tokio_io::codec. And `Decoder`
+// and `Encoder` needs to reference `Framed`. So they all still need to still be in the same
+// module.
+
+mod decoder;
+mod encoder;
+mod framed;
+mod framed_read;
+mod framed_write;
+
+pub use self::decoder::Decoder;
+pub use self::encoder::Encoder;
+pub use self::framed::{Framed, FramedParts};
+pub use self::framed_read::FramedRead;
+pub use self::framed_write::FramedWrite;
diff --git a/third_party/rust/tokio-io/src/allow_std.rs b/third_party/rust/tokio-io/src/allow_std.rs
new file mode 100644
index 0000000000..af39ac2197
--- /dev/null
+++ b/third_party/rust/tokio-io/src/allow_std.rs
@@ -0,0 +1,93 @@
+use futures::{Async, Poll};
+use std::{fmt, io};
+use {AsyncRead, AsyncWrite};
+
+/// A simple wrapper type which allows types that only implement
+/// `std::io::Read` or `std::io::Write` to be used in contexts which expect
+/// an `AsyncRead` or `AsyncWrite`.
+///
+/// If these types issue an error with the kind `io::ErrorKind::WouldBlock`,
+/// it is expected that they will notify the current task on readiness.
+/// Synchronous `std` types should not issue errors of this kind and
+/// are safe to use in this context. However, using these types with
+/// `AllowStdIo` will cause the event loop to block, so they should be used
+/// with care.
+#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
+pub struct AllowStdIo<T>(T);
+
+impl<T> AllowStdIo<T> {
+ /// Creates a new `AllowStdIo` from an existing IO object.
+ pub fn new(io: T) -> Self {
+ AllowStdIo(io)
+ }
+
+ /// Returns a reference to the contained IO object.
+ pub fn get_ref(&self) -> &T {
+ &self.0
+ }
+
+ /// Returns a mutable reference to the contained IO object.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.0
+ }
+
+ /// Consumes self and returns the contained IO object.
+ pub fn into_inner(self) -> T {
+ self.0
+ }
+}
+
+impl<T> io::Write for AllowStdIo<T>
+where
+ T: io::Write,
+{
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.0.write(buf)
+ }
+ fn flush(&mut self) -> io::Result<()> {
+ self.0.flush()
+ }
+ fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
+ self.0.write_all(buf)
+ }
+ fn write_fmt(&mut self, fmt: fmt::Arguments) -> io::Result<()> {
+ self.0.write_fmt(fmt)
+ }
+}
+
+impl<T> AsyncWrite for AllowStdIo<T>
+where
+ T: io::Write,
+{
+ fn shutdown(&mut self) -> Poll<(), io::Error> {
+ Ok(Async::Ready(()))
+ }
+}
+
+impl<T> io::Read for AllowStdIo<T>
+where
+ T: io::Read,
+{
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ self.0.read(buf)
+ }
+ // TODO: implement the `initializer` fn when it stabilizes.
+ // See rust-lang/rust #42788
+ fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
+ self.0.read_to_end(buf)
+ }
+ fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> {
+ self.0.read_to_string(buf)
+ }
+ fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
+ self.0.read_exact(buf)
+ }
+}
+
+impl<T> AsyncRead for AllowStdIo<T>
+where
+ T: io::Read,
+{
+ // TODO: override prepare_uninitialized_buffer once `Read::initializer` is stable.
+ // See rust-lang/rust #42788
+}
diff --git a/third_party/rust/tokio-io/src/async_read.rs b/third_party/rust/tokio-io/src/async_read.rs
new file mode 100644
index 0000000000..d82aa71160
--- /dev/null
+++ b/third_party/rust/tokio-io/src/async_read.rs
@@ -0,0 +1,173 @@
+use bytes::BufMut;
+use futures::{Async, Poll};
+use std::io as std_io;
+
+#[allow(deprecated)]
+use codec::{Decoder, Encoder, Framed};
+use split::{ReadHalf, WriteHalf};
+use {framed, split, AsyncWrite};
+
+/// Read bytes asynchronously.
+///
+/// This trait inherits from `std::io::Read` and indicates that an I/O object is
+/// **non-blocking**. All non-blocking I/O objects must return an error when
+/// bytes are unavailable instead of blocking the current thread.
+///
+/// Specifically, this means that the `poll_read` function will return one of
+/// the following:
+///
+/// * `Ok(Async::Ready(n))` means that `n` bytes of data was immediately read
+/// and placed into the output buffer, where `n` == 0 implies that EOF has
+/// been reached.
+///
+/// * `Ok(Async::NotReady)` means that no data was read into the buffer
+/// provided. The I/O object is not currently readable but may become readable
+/// in the future. Most importantly, **the current future's task is scheduled
+/// to get unparked when the object is readable**. This means that like
+/// `Future::poll` you'll receive a notification when the I/O object is
+/// readable again.
+///
+/// * `Err(e)` for other errors are standard I/O errors coming from the
+/// underlying object.
+///
+/// This trait importantly means that the `read` method only works in the
+/// context of a future's task. The object may panic if used outside of a task.
+pub trait AsyncRead: std_io::Read {
+ /// Prepares an uninitialized buffer to be safe to pass to `read`. Returns
+ /// `true` if the supplied buffer was zeroed out.
+ ///
+ /// While it would be highly unusual, implementations of [`io::Read`] are
+ /// able to read data from the buffer passed as an argument. Because of
+ /// this, the buffer passed to [`io::Read`] must be initialized memory. In
+ /// situations where large numbers of buffers are used, constantly having to
+ /// zero out buffers can be expensive.
+ ///
+ /// This function does any necessary work to prepare an uninitialized buffer
+ /// to be safe to pass to `read`. If `read` guarantees to never attempt to
+ /// read data out of the supplied buffer, then `prepare_uninitialized_buffer`
+ /// doesn't need to do any work.
+ ///
+ /// If this function returns `true`, then the memory has been zeroed out.
+ /// This allows implementations of `AsyncRead` which are composed of
+ /// multiple subimplementations to efficiently implement
+ /// `prepare_uninitialized_buffer`.
+ ///
+ /// This function isn't actually `unsafe` to call but `unsafe` to implement.
+ /// The implementer must ensure that either the whole `buf` has been zeroed
+ /// or `read_buf()` overwrites the buffer without reading it and returns
+ /// correct value.
+ ///
+ /// This function is called from [`read_buf`].
+ ///
+ /// [`io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html
+ /// [`read_buf`]: #method.read_buf
+ unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
+ for i in 0..buf.len() {
+ buf[i] = 0;
+ }
+
+ true
+ }
+
+ /// Attempt to read from the `AsyncRead` into `buf`.
+ ///
+ /// On success, returns `Ok(Async::Ready(num_bytes_read))`.
+ ///
+ /// If no data is available for reading, the method returns
+ /// `Ok(Async::NotReady)` and arranges for the current task (via
+ /// `cx.waker()`) to receive a notification when the object becomes
+ /// readable or is closed.
+ fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, std_io::Error> {
+ match self.read(buf) {
+ Ok(t) => Ok(Async::Ready(t)),
+ Err(ref e) if e.kind() == std_io::ErrorKind::WouldBlock => return Ok(Async::NotReady),
+ Err(e) => return Err(e.into()),
+ }
+ }
+
+ /// Pull some bytes from this source into the specified `BufMut`, returning
+ /// how many bytes were read.
+ ///
+ /// The `buf` provided will have bytes read into it and the internal cursor
+ /// will be advanced if any bytes were read. Note that this method typically
+ /// will not reallocate the buffer provided.
+ fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, std_io::Error>
+ where
+ Self: Sized,
+ {
+ if !buf.has_remaining_mut() {
+ return Ok(Async::Ready(0));
+ }
+
+ unsafe {
+ let n = {
+ let b = buf.bytes_mut();
+
+ self.prepare_uninitialized_buffer(b);
+
+ try_ready!(self.poll_read(b))
+ };
+
+ buf.advance_mut(n);
+ Ok(Async::Ready(n))
+ }
+ }
+
+ /// Provides a `Stream` and `Sink` interface for reading and writing to this
+ /// I/O object, using `Decode` and `Encode` to read and write the raw data.
+ ///
+ /// Raw I/O objects work with byte sequences, but higher-level code usually
+ /// wants to batch these into meaningful chunks, called "frames". This
+ /// method layers framing on top of an I/O object, by using the `Codec`
+ /// traits to handle encoding and decoding of messages frames. Note that
+ /// the incoming and outgoing frame types may be distinct.
+ ///
+ /// This function returns a *single* object that is both `Stream` and
+ /// `Sink`; grouping this into a single object is often useful for layering
+ /// things like gzip or TLS, which require both read and write access to the
+ /// underlying object.
+ ///
+ /// If you want to work more directly with the streams and sink, consider
+ /// calling `split` on the `Framed` returned by this method, which will
+ /// break them into separate objects, allowing them to interact more easily.
+ #[deprecated(since = "0.1.7", note = "Use tokio_codec::Decoder::framed instead")]
+ #[allow(deprecated)]
+ fn framed<T: Encoder + Decoder>(self, codec: T) -> Framed<Self, T>
+ where
+ Self: AsyncWrite + Sized,
+ {
+ framed::framed(self, codec)
+ }
+
+ /// Helper method for splitting this read/write object into two halves.
+ ///
+ /// The two halves returned implement the `Read` and `Write` traits,
+ /// respectively.
+ ///
+ /// To restore this read/write object from its `ReadHalf` and `WriteHalf`
+ /// use `unsplit`.
+ fn split(self) -> (ReadHalf<Self>, WriteHalf<Self>)
+ where
+ Self: AsyncWrite + Sized,
+ {
+ split::split(self)
+ }
+}
+
+impl<T: ?Sized + AsyncRead> AsyncRead for Box<T> {
+ unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
+ (**self).prepare_uninitialized_buffer(buf)
+ }
+}
+
+impl<'a, T: ?Sized + AsyncRead> AsyncRead for &'a mut T {
+ unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
+ (**self).prepare_uninitialized_buffer(buf)
+ }
+}
+
+impl<'a> AsyncRead for &'a [u8] {
+ unsafe fn prepare_uninitialized_buffer(&self, _buf: &mut [u8]) -> bool {
+ false
+ }
+}
diff --git a/third_party/rust/tokio-io/src/async_write.rs b/third_party/rust/tokio-io/src/async_write.rs
new file mode 100644
index 0000000000..0a09480e82
--- /dev/null
+++ b/third_party/rust/tokio-io/src/async_write.rs
@@ -0,0 +1,222 @@
+use bytes::Buf;
+use futures::{Async, Poll};
+use std::io as std_io;
+
+use AsyncRead;
+
+/// Writes bytes asynchronously.
+///
+/// The trait inherits from `std::io::Write` and indicates that an I/O object is
+/// **nonblocking**. All non-blocking I/O objects must return an error when
+/// bytes cannot be written instead of blocking the current thread.
+///
+/// Specifically, this means that the `poll_write` function will return one of
+/// the following:
+///
+/// * `Ok(Async::Ready(n))` means that `n` bytes of data was immediately
+/// written.
+///
+/// * `Ok(Async::NotReady)` means that no data was written from the buffer
+/// provided. The I/O object is not currently writable but may become writable
+/// in the future. Most importantly, **the current future's task is scheduled
+/// to get unparked when the object is writable**. This means that like
+/// `Future::poll` you'll receive a notification when the I/O object is
+/// writable again.
+///
+/// * `Err(e)` for other errors are standard I/O errors coming from the
+/// underlying object.
+///
+/// This trait importantly means that the `write` method only works in the
+/// context of a future's task. The object may panic if used outside of a task.
+///
+/// Note that this trait also represents that the `Write::flush` method works
+/// very similarly to the `write` method, notably that `Ok(())` means that the
+/// writer has successfully been flushed, a "would block" error means that the
+/// current task is ready to receive a notification when flushing can make more
+/// progress, and otherwise normal errors can happen as well.
+pub trait AsyncWrite: std_io::Write {
+ /// Attempt to write bytes from `buf` into the object.
+ ///
+ /// On success, returns `Ok(Async::Ready(num_bytes_written))`.
+ ///
+ /// If the object is not ready for writing, the method returns
+ /// `Ok(Async::NotReady)` and arranges for the current task (via
+ /// `cx.waker()`) to receive a notification when the object becomes
+ /// readable or is closed.
+ fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, std_io::Error> {
+ match self.write(buf) {
+ Ok(t) => Ok(Async::Ready(t)),
+ Err(ref e) if e.kind() == std_io::ErrorKind::WouldBlock => return Ok(Async::NotReady),
+ Err(e) => return Err(e.into()),
+ }
+ }
+
+ /// Attempt to flush the object, ensuring that any buffered data reach
+ /// their destination.
+ ///
+ /// On success, returns `Ok(Async::Ready(()))`.
+ ///
+ /// If flushing cannot immediately complete, this method returns
+ /// `Ok(Async::NotReady)` and arranges for the current task (via
+ /// `cx.waker()`) to receive a notification when the object can make
+ /// progress towards flushing.
+ fn poll_flush(&mut self) -> Poll<(), std_io::Error> {
+ match self.flush() {
+ Ok(t) => Ok(Async::Ready(t)),
+ Err(ref e) if e.kind() == std_io::ErrorKind::WouldBlock => return Ok(Async::NotReady),
+ Err(e) => return Err(e.into()),
+ }
+ }
+
+ /// Initiates or attempts to shut down this writer, returning success when
+ /// the I/O connection has completely shut down.
+ ///
+ /// This method is intended to be used for asynchronous shutdown of I/O
+ /// connections. For example this is suitable for implementing shutdown of a
+ /// TLS connection or calling `TcpStream::shutdown` on a proxied connection.
+ /// Protocols sometimes need to flush out final pieces of data or otherwise
+ /// perform a graceful shutdown handshake, reading/writing more data as
+ /// appropriate. This method is the hook for such protocols to implement the
+ /// graceful shutdown logic.
+ ///
+ /// This `shutdown` method is required by implementers of the
+ /// `AsyncWrite` trait. Wrappers typically just want to proxy this call
+ /// through to the wrapped type, and base types will typically implement
+ /// shutdown logic here or just return `Ok(().into())`. Note that if you're
+ /// wrapping an underlying `AsyncWrite` a call to `shutdown` implies that
+ /// transitively the entire stream has been shut down. After your wrapper's
+ /// shutdown logic has been executed you should shut down the underlying
+ /// stream.
+ ///
+ /// Invocation of a `shutdown` implies an invocation of `flush`. Once this
+ /// method returns `Ready` it implies that a flush successfully happened
+ /// before the shutdown happened. That is, callers don't need to call
+ /// `flush` before calling `shutdown`. They can rely that by calling
+ /// `shutdown` any pending buffered data will be written out.
+ ///
+ /// # Return value
+ ///
+ /// This function returns a `Poll<(), io::Error>` classified as such:
+ ///
+ /// * `Ok(Async::Ready(()))` - indicates that the connection was
+ /// successfully shut down and is now safe to deallocate/drop/close
+ /// resources associated with it. This method means that the current task
+ /// will no longer receive any notifications due to this method and the
+ /// I/O object itself is likely no longer usable.
+ ///
+ /// * `Ok(Async::NotReady)` - indicates that shutdown is initiated but could
+ /// not complete just yet. This may mean that more I/O needs to happen to
+ /// continue this shutdown operation. The current task is scheduled to
+ /// receive a notification when it's otherwise ready to continue the
+ /// shutdown operation. When woken up this method should be called again.
+ ///
+ /// * `Err(e)` - indicates a fatal error has happened with shutdown,
+ /// indicating that the shutdown operation did not complete successfully.
+ /// This typically means that the I/O object is no longer usable.
+ ///
+ /// # Errors
+ ///
+ /// This function can return normal I/O errors through `Err`, described
+ /// above. Additionally this method may also render the underlying
+ /// `Write::write` method no longer usable (e.g. will return errors in the
+ /// future). It's recommended that once `shutdown` is called the
+ /// `write` method is no longer called.
+ ///
+ /// # Panics
+ ///
+ /// This function will panic if not called within the context of a future's
+ /// task.
+ fn shutdown(&mut self) -> Poll<(), std_io::Error>;
+
+ /// Write a `Buf` into this value, returning how many bytes were written.
+ ///
+ /// Note that this method will advance the `buf` provided automatically by
+ /// the number of bytes written.
+ fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, std_io::Error>
+ where
+ Self: Sized,
+ {
+ if !buf.has_remaining() {
+ return Ok(Async::Ready(0));
+ }
+
+ let n = try_ready!(self.poll_write(buf.bytes()));
+ buf.advance(n);
+ Ok(Async::Ready(n))
+ }
+}
+
+impl<T: ?Sized + AsyncWrite> AsyncWrite for Box<T> {
+ fn shutdown(&mut self) -> Poll<(), std_io::Error> {
+ (**self).shutdown()
+ }
+}
+impl<'a, T: ?Sized + AsyncWrite> AsyncWrite for &'a mut T {
+ fn shutdown(&mut self) -> Poll<(), std_io::Error> {
+ (**self).shutdown()
+ }
+}
+
+impl AsyncRead for std_io::Repeat {
+ unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
+ false
+ }
+}
+
+impl AsyncWrite for std_io::Sink {
+ fn shutdown(&mut self) -> Poll<(), std_io::Error> {
+ Ok(().into())
+ }
+}
+
+impl<T: AsyncRead> AsyncRead for std_io::Take<T> {
+ unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
+ self.get_ref().prepare_uninitialized_buffer(buf)
+ }
+}
+
+impl<T, U> AsyncRead for std_io::Chain<T, U>
+where
+ T: AsyncRead,
+ U: AsyncRead,
+{
+ unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
+ let (t, u) = self.get_ref();
+ // We don't need to execute the second initializer if the first one
+ // already zeroed the buffer out.
+ t.prepare_uninitialized_buffer(buf) || u.prepare_uninitialized_buffer(buf)
+ }
+}
+
+impl<T: AsyncWrite> AsyncWrite for std_io::BufWriter<T> {
+ fn shutdown(&mut self) -> Poll<(), std_io::Error> {
+ try_ready!(self.poll_flush());
+ self.get_mut().shutdown()
+ }
+}
+
+impl<T: AsyncRead> AsyncRead for std_io::BufReader<T> {
+ unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
+ self.get_ref().prepare_uninitialized_buffer(buf)
+ }
+}
+
+impl<T: AsRef<[u8]>> AsyncRead for std_io::Cursor<T> {}
+
+impl<'a> AsyncWrite for std_io::Cursor<&'a mut [u8]> {
+ fn shutdown(&mut self) -> Poll<(), std_io::Error> {
+ Ok(().into())
+ }
+}
+
+impl AsyncWrite for std_io::Cursor<Vec<u8>> {
+ fn shutdown(&mut self) -> Poll<(), std_io::Error> {
+ Ok(().into())
+ }
+}
+
+impl AsyncWrite for std_io::Cursor<Box<[u8]>> {
+ fn shutdown(&mut self) -> Poll<(), std_io::Error> {
+ Ok(().into())
+ }
+}
diff --git a/third_party/rust/tokio-io/src/codec/bytes_codec.rs b/third_party/rust/tokio-io/src/codec/bytes_codec.rs
new file mode 100644
index 0000000000..ecfd15ab99
--- /dev/null
+++ b/third_party/rust/tokio-io/src/codec/bytes_codec.rs
@@ -0,0 +1,42 @@
+#![allow(deprecated)]
+
+use bytes::{BufMut, Bytes, BytesMut};
+use codec::{Decoder, Encoder};
+use std::io;
+
+/// A simple `Codec` implementation that just ships bytes around.
+#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
+#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")]
+pub struct BytesCodec(());
+
+impl BytesCodec {
+ /// Creates a new `BytesCodec` for shipping around raw bytes.
+ pub fn new() -> BytesCodec {
+ BytesCodec(())
+ }
+}
+
+impl Decoder for BytesCodec {
+ type Item = BytesMut;
+ type Error = io::Error;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> {
+ if buf.len() > 0 {
+ let len = buf.len();
+ Ok(Some(buf.split_to(len)))
+ } else {
+ Ok(None)
+ }
+ }
+}
+
+impl Encoder for BytesCodec {
+ type Item = Bytes;
+ type Error = io::Error;
+
+ fn encode(&mut self, data: Bytes, buf: &mut BytesMut) -> Result<(), io::Error> {
+ buf.reserve(data.len());
+ buf.put(data);
+ Ok(())
+ }
+}
diff --git a/third_party/rust/tokio-io/src/codec/decoder.rs b/third_party/rust/tokio-io/src/codec/decoder.rs
new file mode 100644
index 0000000000..cd5777dea8
--- /dev/null
+++ b/third_party/rust/tokio-io/src/codec/decoder.rs
@@ -0,0 +1,117 @@
+use bytes::BytesMut;
+use std::io;
+
+use super::encoder::Encoder;
+use {AsyncRead, AsyncWrite};
+
+use _tokio_codec::Framed;
+
+/// Decoding of frames via buffers.
+///
+/// This trait is used when constructing an instance of `Framed` or
+/// `FramedRead`. An implementation of `Decoder` takes a byte stream that has
+/// already been buffered in `src` and decodes the data into a stream of
+/// `Self::Item` frames.
+///
+/// Implementations are able to track state on `self`, which enables
+/// implementing stateful streaming parsers. In many cases, though, this type
+/// will simply be a unit struct (e.g. `struct HttpDecoder`).
+
+// Note: We can't deprecate this trait, because the deprecation carries through to tokio-codec, and
+// there doesn't seem to be a way to un-deprecate the re-export.
+pub trait Decoder {
+ /// The type of decoded frames.
+ type Item;
+
+ /// The type of unrecoverable frame decoding errors.
+ ///
+ /// If an individual message is ill-formed but can be ignored without
+ /// interfering with the processing of future messages, it may be more
+ /// useful to report the failure as an `Item`.
+ ///
+ /// `From<io::Error>` is required in the interest of making `Error` suitable
+ /// for returning directly from a `FramedRead`, and to enable the default
+ /// implementation of `decode_eof` to yield an `io::Error` when the decoder
+ /// fails to consume all available data.
+ ///
+ /// Note that implementors of this trait can simply indicate `type Error =
+ /// io::Error` to use I/O errors as this type.
+ type Error: From<io::Error>;
+
+ /// Attempts to decode a frame from the provided buffer of bytes.
+ ///
+ /// This method is called by `FramedRead` whenever bytes are ready to be
+ /// parsed. The provided buffer of bytes is what's been read so far, and
+ /// this instance of `Decode` can determine whether an entire frame is in
+ /// the buffer and is ready to be returned.
+ ///
+ /// If an entire frame is available, then this instance will remove those
+ /// bytes from the buffer provided and return them as a decoded
+ /// frame. Note that removing bytes from the provided buffer doesn't always
+ /// necessarily copy the bytes, so this should be an efficient operation in
+ /// most circumstances.
+ ///
+ /// If the bytes look valid, but a frame isn't fully available yet, then
+ /// `Ok(None)` is returned. This indicates to the `Framed` instance that
+ /// it needs to read some more bytes before calling this method again.
+ ///
+ /// Note that the bytes provided may be empty. If a previous call to
+ /// `decode` consumed all the bytes in the buffer then `decode` will be
+ /// called again until it returns `Ok(None)`, indicating that more bytes need to
+ /// be read.
+ ///
+ /// Finally, if the bytes in the buffer are malformed then an error is
+ /// returned indicating why. This informs `Framed` that the stream is now
+ /// corrupt and should be terminated.
+ fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>;
+
+ /// A default method available to be called when there are no more bytes
+ /// available to be read from the underlying I/O.
+ ///
+ /// This method defaults to calling `decode` and returns an error if
+ /// `Ok(None)` is returned while there is unconsumed data in `buf`.
+ /// Typically this doesn't need to be implemented unless the framing
+ /// protocol differs near the end of the stream.
+ ///
+ /// Note that the `buf` argument may be empty. If a previous call to
+ /// `decode_eof` consumed all the bytes in the buffer, `decode_eof` will be
+ /// called again until it returns `None`, indicating that there are no more
+ /// frames to yield. This behavior enables returning finalization frames
+ /// that may not be based on inbound data.
+ fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
+ match self.decode(buf)? {
+ Some(frame) => Ok(Some(frame)),
+ None => {
+ if buf.is_empty() {
+ Ok(None)
+ } else {
+ Err(io::Error::new(io::ErrorKind::Other, "bytes remaining on stream").into())
+ }
+ }
+ }
+ }
+
+ /// Provides a `Stream` and `Sink` interface for reading and writing to this
+ /// `Io` object, using `Decode` and `Encode` to read and write the raw data.
+ ///
+ /// Raw I/O objects work with byte sequences, but higher-level code usually
+ /// wants to batch these into meaningful chunks, called "frames". This
+ /// method layers framing on top of an I/O object, by using the `Codec`
+ /// traits to handle encoding and decoding of messages frames. Note that
+ /// the incoming and outgoing frame types may be distinct.
+ ///
+ /// This function returns a *single* object that is both `Stream` and
+ /// `Sink`; grouping this into a single object is often useful for layering
+ /// things like gzip or TLS, which require both read and write access to the
+ /// underlying object.
+ ///
+ /// If you want to work more directly with the streams and sink, consider
+ /// calling `split` on the `Framed` returned by this method, which will
+ /// break them into separate objects, allowing them to interact more easily.
+ fn framed<T: AsyncRead + AsyncWrite + Sized>(self, io: T) -> Framed<T, Self>
+ where
+ Self: Encoder + Sized,
+ {
+ Framed::new(io, self)
+ }
+}
diff --git a/third_party/rust/tokio-io/src/codec/encoder.rs b/third_party/rust/tokio-io/src/codec/encoder.rs
new file mode 100644
index 0000000000..5065080324
--- /dev/null
+++ b/third_party/rust/tokio-io/src/codec/encoder.rs
@@ -0,0 +1,25 @@
+use bytes::BytesMut;
+use std::io;
+
+/// Trait of helper objects to write out messages as bytes, for use with
+/// `FramedWrite`.
+
+// Note: We can't deprecate this trait, because the deprecation carries through to tokio-codec, and
+// there doesn't seem to be a way to un-deprecate the re-export.
+pub trait Encoder {
+ /// The type of items consumed by the `Encoder`
+ type Item;
+
+ /// The type of encoding errors.
+ ///
+ /// `FramedWrite` requires `Encoder`s errors to implement `From<io::Error>`
+ /// in the interest letting it return `Error`s directly.
+ type Error: From<io::Error>;
+
+ /// Encodes a frame into the buffer provided.
+ ///
+ /// This method will encode `item` into the byte buffer provided by `dst`.
+ /// The `dst` provided is an internal buffer of the `Framed` instance and
+ /// will be written out when possible.
+ fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error>;
+}
diff --git a/third_party/rust/tokio-io/src/codec/lines_codec.rs b/third_party/rust/tokio-io/src/codec/lines_codec.rs
new file mode 100644
index 0000000000..818397fa50
--- /dev/null
+++ b/third_party/rust/tokio-io/src/codec/lines_codec.rs
@@ -0,0 +1,88 @@
+#![allow(deprecated)]
+
+use bytes::{BufMut, BytesMut};
+use codec::{Decoder, Encoder};
+use std::{io, str};
+
+/// A simple `Codec` implementation that splits up data into lines.
+#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
+#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")]
+pub struct LinesCodec {
+ // Stored index of the next index to examine for a `\n` character.
+ // This is used to optimize searching.
+ // For example, if `decode` was called with `abc`, it would hold `3`,
+ // because that is the next index to examine.
+ // The next time `decode` is called with `abcde\n`, the method will
+ // only look at `de\n` before returning.
+ next_index: usize,
+}
+
+impl LinesCodec {
+ /// Returns a `LinesCodec` for splitting up data into lines.
+ pub fn new() -> LinesCodec {
+ LinesCodec { next_index: 0 }
+ }
+}
+
+fn utf8(buf: &[u8]) -> Result<&str, io::Error> {
+ str::from_utf8(buf)
+ .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Unable to decode input as UTF8"))
+}
+
+fn without_carriage_return(s: &[u8]) -> &[u8] {
+ if let Some(&b'\r') = s.last() {
+ &s[..s.len() - 1]
+ } else {
+ s
+ }
+}
+
+impl Decoder for LinesCodec {
+ type Item = String;
+ type Error = io::Error;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> {
+ if let Some(newline_offset) = buf[self.next_index..].iter().position(|b| *b == b'\n') {
+ let newline_index = newline_offset + self.next_index;
+ let line = buf.split_to(newline_index + 1);
+ let line = &line[..line.len() - 1];
+ let line = without_carriage_return(line);
+ let line = utf8(line)?;
+ self.next_index = 0;
+ Ok(Some(line.to_string()))
+ } else {
+ self.next_index = buf.len();
+ Ok(None)
+ }
+ }
+
+ fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> {
+ Ok(match self.decode(buf)? {
+ Some(frame) => Some(frame),
+ None => {
+ // No terminating newline - return remaining data, if any
+ if buf.is_empty() || buf == &b"\r"[..] {
+ None
+ } else {
+ let line = buf.take();
+ let line = without_carriage_return(&line);
+ let line = utf8(line)?;
+ self.next_index = 0;
+ Some(line.to_string())
+ }
+ }
+ })
+ }
+}
+
+impl Encoder for LinesCodec {
+ type Item = String;
+ type Error = io::Error;
+
+ fn encode(&mut self, line: String, buf: &mut BytesMut) -> Result<(), io::Error> {
+ buf.reserve(line.len() + 1);
+ buf.put(line);
+ buf.put_u8(b'\n');
+ Ok(())
+ }
+}
diff --git a/third_party/rust/tokio-io/src/codec/mod.rs b/third_party/rust/tokio-io/src/codec/mod.rs
new file mode 100644
index 0000000000..6636821750
--- /dev/null
+++ b/third_party/rust/tokio-io/src/codec/mod.rs
@@ -0,0 +1,378 @@
+//! Utilities for encoding and decoding frames.
+//!
+//! Contains adapters to go from streams of bytes, [`AsyncRead`] and
+//! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`].
+//! Framed streams are also known as [transports].
+//!
+//! [`AsyncRead`]: #
+//! [`AsyncWrite`]: #
+//! [`Sink`]: #
+//! [`Stream`]: #
+//! [transports]: #
+
+// tokio_io::codec originally held all codec-related helpers. This is now intended to be in
+// tokio_codec instead. However, for backward compatibility, this remains here. When the next major
+// breaking change comes, `Encoder` and `Decoder` need to be moved to `tokio_codec`, and the rest
+// of this module should be removed.
+
+#![doc(hidden)]
+#![allow(deprecated)]
+
+mod bytes_codec;
+mod decoder;
+mod encoder;
+mod lines_codec;
+
+pub use self::bytes_codec::BytesCodec;
+pub use self::decoder::Decoder;
+pub use self::encoder::Encoder;
+pub use self::lines_codec::LinesCodec;
+
+pub use framed::{Framed, FramedParts};
+pub use framed_read::FramedRead;
+pub use framed_write::FramedWrite;
+
+#[deprecated(since = "0.1.8", note = "Moved to tokio-codec")]
+#[doc(hidden)]
+pub mod length_delimited {
+ //! Frame a stream of bytes based on a length prefix
+ //!
+ //! Many protocols delimit their frames by prefacing frame data with a
+ //! frame head that specifies the length of the frame. The
+ //! `length_delimited` module provides utilities for handling the length
+ //! based framing. This allows the consumer to work with entire frames
+ //! without having to worry about buffering or other framing logic.
+ //!
+ //! # Getting started
+ //!
+ //! If implementing a protocol from scratch, using length delimited framing
+ //! is an easy way to get started. [`Framed::new()`](length_delimited::Framed::new) will adapt a
+ //! full-duplex byte stream with a length delimited framer using default
+ //! configuration values.
+ //!
+ //! ```
+ //! use tokio_io::{AsyncRead, AsyncWrite};
+ //! use tokio_io::codec::length_delimited;
+ //!
+ //! fn bind_transport<T: AsyncRead + AsyncWrite>(io: T)
+ //! -> length_delimited::Framed<T>
+ //! {
+ //! length_delimited::Framed::new(io)
+ //! }
+ //! ```
+ //!
+ //! The returned transport implements `Sink + Stream` for `BytesMut`. It
+ //! encodes the frame with a big-endian `u32` header denoting the frame
+ //! payload length:
+ //!
+ //! ```text
+ //! +----------+--------------------------------+
+ //! | len: u32 | frame payload |
+ //! +----------+--------------------------------+
+ //! ```
+ //!
+ //! Specifically, given the following:
+ //!
+ //! ```
+ //! # extern crate tokio_io;
+ //! # extern crate bytes;
+ //! # extern crate futures;
+ //! #
+ //! use tokio_io::{AsyncRead, AsyncWrite};
+ //! use tokio_io::codec::length_delimited;
+ //! use bytes::BytesMut;
+ //! use futures::{Sink, Future};
+ //!
+ //! fn write_frame<T: AsyncRead + AsyncWrite>(io: T) {
+ //! let mut transport = length_delimited::Framed::new(io);
+ //! let frame = BytesMut::from("hello world");
+ //!
+ //! transport.send(frame).wait().unwrap();
+ //! }
+ //! #
+ //! # pub fn main() {}
+ //! ```
+ //!
+ //! The encoded frame will look like this:
+ //!
+ //! ```text
+ //! +---- len: u32 ----+---- data ----+
+ //! | \x00\x00\x00\x0b | hello world |
+ //! +------------------+--------------+
+ //! ```
+ //!
+ //! # Decoding
+ //!
+ //! [`FramedRead`] adapts an [`AsyncRead`] into a `Stream` of [`BytesMut`],
+ //! such that each yielded [`BytesMut`] value contains the contents of an
+ //! entire frame. There are many configuration parameters enabling
+ //! [`FramedRead`] to handle a wide range of protocols. Here are some
+ //! examples that will cover the various options at a high level.
+ //!
+ //! ## Example 1
+ //!
+ //! The following will parse a `u16` length field at offset 0, including the
+ //! frame head in the yielded `BytesMut`.
+ //!
+ //! ```
+ //! # use tokio_io::AsyncRead;
+ //! # use tokio_io::codec::length_delimited;
+ //! # fn bind_read<T: AsyncRead>(io: T) {
+ //! length_delimited::Builder::new()
+ //! .length_field_offset(0) // default value
+ //! .length_field_length(2)
+ //! .length_adjustment(0) // default value
+ //! .num_skip(0) // Do not strip frame header
+ //! .new_read(io);
+ //! # }
+ //! ```
+ //!
+ //! The following frame will be decoded as such:
+ //!
+ //! ```text
+ //! INPUT DECODED
+ //! +-- len ---+--- Payload ---+ +-- len ---+--- Payload ---+
+ //! | \x00\x0B | Hello world | --> | \x00\x0B | Hello world |
+ //! +----------+---------------+ +----------+---------------+
+ //! ```
+ //!
+ //! The value of the length field is 11 (`\x0B`) which represents the length
+ //! of the payload, `hello world`. By default, [`FramedRead`] assumes that
+ //! the length field represents the number of bytes that **follows** the
+ //! length field. Thus, the entire frame has a length of 13: 2 bytes for the
+ //! frame head + 11 bytes for the payload.
+ //!
+ //! ## Example 2
+ //!
+ //! The following will parse a `u16` length field at offset 0, omitting the
+ //! frame head in the yielded `BytesMut`.
+ //!
+ //! ```
+ //! # use tokio_io::AsyncRead;
+ //! # use tokio_io::codec::length_delimited;
+ //! # fn bind_read<T: AsyncRead>(io: T) {
+ //! length_delimited::Builder::new()
+ //! .length_field_offset(0) // default value
+ //! .length_field_length(2)
+ //! .length_adjustment(0) // default value
+ //! // `num_skip` is not needed, the default is to skip
+ //! .new_read(io);
+ //! # }
+ //! ```
+ //!
+ //! The following frame will be decoded as such:
+ //!
+ //! ```text
+ //! INPUT DECODED
+ //! +-- len ---+--- Payload ---+ +--- Payload ---+
+ //! | \x00\x0B | Hello world | --> | Hello world |
+ //! +----------+---------------+ +---------------+
+ //! ```
+ //!
+ //! This is similar to the first example, the only difference is that the
+ //! frame head is **not** included in the yielded `BytesMut` value.
+ //!
+ //! ## Example 3
+ //!
+ //! The following will parse a `u16` length field at offset 0, including the
+ //! frame head in the yielded `BytesMut`. In this case, the length field
+ //! **includes** the frame head length.
+ //!
+ //! ```
+ //! # use tokio_io::AsyncRead;
+ //! # use tokio_io::codec::length_delimited;
+ //! # fn bind_read<T: AsyncRead>(io: T) {
+ //! length_delimited::Builder::new()
+ //! .length_field_offset(0) // default value
+ //! .length_field_length(2)
+ //! .length_adjustment(-2) // size of head
+ //! .num_skip(0)
+ //! .new_read(io);
+ //! # }
+ //! ```
+ //!
+ //! The following frame will be decoded as such:
+ //!
+ //! ```text
+ //! INPUT DECODED
+ //! +-- len ---+--- Payload ---+ +-- len ---+--- Payload ---+
+ //! | \x00\x0D | Hello world | --> | \x00\x0D | Hello world |
+ //! +----------+---------------+ +----------+---------------+
+ //! ```
+ //!
+ //! In most cases, the length field represents the length of the payload
+ //! only, as shown in the previous examples. However, in some protocols the
+ //! length field represents the length of the whole frame, including the
+ //! head. In such cases, we specify a negative `length_adjustment` to adjust
+ //! the value provided in the frame head to represent the payload length.
+ //!
+ //! ## Example 4
+ //!
+ //! The following will parse a 3 byte length field at offset 0 in a 5 byte
+ //! frame head, including the frame head in the yielded `BytesMut`.
+ //!
+ //! ```
+ //! # use tokio_io::AsyncRead;
+ //! # use tokio_io::codec::length_delimited;
+ //! # fn bind_read<T: AsyncRead>(io: T) {
+ //! length_delimited::Builder::new()
+ //! .length_field_offset(0) // default value
+ //! .length_field_length(3)
+ //! .length_adjustment(2) // remaining head
+ //! .num_skip(0)
+ //! .new_read(io);
+ //! # }
+ //! ```
+ //!
+ //! The following frame will be decoded as such:
+ //!
+ //! ```text
+ //! INPUT
+ //! +---- len -----+- head -+--- Payload ---+
+ //! | \x00\x00\x0B | \xCAFE | Hello world |
+ //! +--------------+--------+---------------+
+ //!
+ //! DECODED
+ //! +---- len -----+- head -+--- Payload ---+
+ //! | \x00\x00\x0B | \xCAFE | Hello world |
+ //! +--------------+--------+---------------+
+ //! ```
+ //!
+ //! A more advanced example that shows a case where there is extra frame
+ //! head data between the length field and the payload. In such cases, it is
+ //! usually desirable to include the frame head as part of the yielded
+ //! `BytesMut`. This lets consumers of the length delimited framer to
+ //! process the frame head as needed.
+ //!
+ //! The positive `length_adjustment` value lets `FramedRead` factor in the
+ //! additional head into the frame length calculation.
+ //!
+ //! ## Example 5
+ //!
+ //! The following will parse a `u16` length field at offset 1 of a 4 byte
+ //! frame head. The first byte and the length field will be omitted from the
+ //! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be
+ //! included.
+ //!
+ //! ```
+ //! # use tokio_io::AsyncRead;
+ //! # use tokio_io::codec::length_delimited;
+ //! # fn bind_read<T: AsyncRead>(io: T) {
+ //! length_delimited::Builder::new()
+ //! .length_field_offset(1) // length of hdr1
+ //! .length_field_length(2)
+ //! .length_adjustment(1) // length of hdr2
+ //! .num_skip(3) // length of hdr1 + LEN
+ //! .new_read(io);
+ //! # }
+ //! ```
+ //!
+ //! The following frame will be decoded as such:
+ //!
+ //! ```text
+ //! INPUT
+ //! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+
+ //! | \xCA | \x00\x0B | \xFE | Hello world |
+ //! +--------+----------+--------+---------------+
+ //!
+ //! DECODED
+ //! +- hdr2 -+--- Payload ---+
+ //! | \xFE | Hello world |
+ //! +--------+---------------+
+ //! ```
+ //!
+ //! The length field is situated in the middle of the frame head. In this
+ //! case, the first byte in the frame head could be a version or some other
+ //! identifier that is not needed for processing. On the other hand, the
+ //! second half of the head is needed.
+ //!
+ //! `length_field_offset` indicates how many bytes to skip before starting
+ //! to read the length field. `length_adjustment` is the number of bytes to
+ //! skip starting at the end of the length field. In this case, it is the
+ //! second half of the head.
+ //!
+ //! ## Example 6
+ //!
+ //! The following will parse a `u16` length field at offset 1 of a 4 byte
+ //! frame head. The first byte and the length field will be omitted from the
+ //! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be
+ //! included. In this case, the length field **includes** the frame head
+ //! length.
+ //!
+ //! ```
+ //! # use tokio_io::AsyncRead;
+ //! # use tokio_io::codec::length_delimited;
+ //! # fn bind_read<T: AsyncRead>(io: T) {
+ //! length_delimited::Builder::new()
+ //! .length_field_offset(1) // length of hdr1
+ //! .length_field_length(2)
+ //! .length_adjustment(-3) // length of hdr1 + LEN, negative
+ //! .num_skip(3)
+ //! .new_read(io);
+ //! # }
+ //! ```
+ //!
+ //! The following frame will be decoded as such:
+ //!
+ //! ```text
+ //! INPUT
+ //! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+
+ //! | \xCA | \x00\x0F | \xFE | Hello world |
+ //! +--------+----------+--------+---------------+
+ //!
+ //! DECODED
+ //! +- hdr2 -+--- Payload ---+
+ //! | \xFE | Hello world |
+ //! +--------+---------------+
+ //! ```
+ //!
+ //! Similar to the example above, the difference is that the length field
+ //! represents the length of the entire frame instead of just the payload.
+ //! The length of `hdr1` and `len` must be counted in `length_adjustment`.
+ //! Note that the length of `hdr2` does **not** need to be explicitly set
+ //! anywhere because it already is factored into the total frame length that
+ //! is read from the byte stream.
+ //!
+ //! # Encoding
+ //!
+ //! [`FramedWrite`] adapts an [`AsyncWrite`] into a `Sink` of [`BytesMut`],
+ //! such that each submitted [`BytesMut`] is prefaced by a length field.
+ //! There are fewer configuration options than [`FramedRead`]. Given
+ //! protocols that have more complex frame heads, an encoder should probably
+ //! be written by hand using [`Encoder`].
+ //!
+ //! Here is a simple example, given a `FramedWrite` with the following
+ //! configuration:
+ //!
+ //! ```
+ //! # extern crate tokio_io;
+ //! # extern crate bytes;
+ //! # use tokio_io::AsyncWrite;
+ //! # use tokio_io::codec::length_delimited;
+ //! # use bytes::BytesMut;
+ //! # fn write_frame<T: AsyncWrite>(io: T) {
+ //! # let _: length_delimited::FramedWrite<T, BytesMut> =
+ //! length_delimited::Builder::new()
+ //! .length_field_length(2)
+ //! .new_write(io);
+ //! # }
+ //! # pub fn main() {}
+ //! ```
+ //!
+ //! A payload of `hello world` will be encoded as:
+ //!
+ //! ```text
+ //! +- len: u16 -+---- data ----+
+ //! | \x00\x0b | hello world |
+ //! +------------+--------------+
+ //! ```
+ //!
+ //! [`FramedRead`]: struct.FramedRead.html
+ //! [`FramedWrite`]: struct.FramedWrite.html
+ //! [`AsyncRead`]: ../../trait.AsyncRead.html
+ //! [`AsyncWrite`]: ../../trait.AsyncWrite.html
+ //! [`Encoder`]: ../trait.Encoder.html
+ //! [`BytesMut`]: https://docs.rs/bytes/0.4/bytes/struct.BytesMut.html
+
+ pub use length_delimited::*;
+}
diff --git a/third_party/rust/tokio-io/src/framed.rs b/third_party/rust/tokio-io/src/framed.rs
new file mode 100644
index 0000000000..aea7b5468c
--- /dev/null
+++ b/third_party/rust/tokio-io/src/framed.rs
@@ -0,0 +1,248 @@
+#![allow(deprecated)]
+
+use std::fmt;
+use std::io::{self, Read, Write};
+
+use codec::{Decoder, Encoder};
+use framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2};
+use framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2};
+use {AsyncRead, AsyncWrite};
+
+use bytes::BytesMut;
+use futures::{Poll, Sink, StartSend, Stream};
+
+/// A unified `Stream` and `Sink` interface to an underlying I/O object, using
+/// the `Encoder` and `Decoder` traits to encode and decode frames.
+///
+/// You can create a `Framed` instance by using the `AsyncRead::framed` adapter.
+#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")]
+#[doc(hidden)]
+pub struct Framed<T, U> {
+ inner: FramedRead2<FramedWrite2<Fuse<T, U>>>,
+}
+
+#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")]
+#[doc(hidden)]
+pub struct Fuse<T, U>(pub T, pub U);
+
+pub fn framed<T, U>(inner: T, codec: U) -> Framed<T, U>
+where
+ T: AsyncRead + AsyncWrite,
+ U: Decoder + Encoder,
+{
+ Framed {
+ inner: framed_read2(framed_write2(Fuse(inner, codec))),
+ }
+}
+
+impl<T, U> Framed<T, U> {
+ /// Provides a `Stream` and `Sink` interface for reading and writing to this
+ /// `Io` object, using `Decode` and `Encode` to read and write the raw data.
+ ///
+ /// Raw I/O objects work with byte sequences, but higher-level code usually
+ /// wants to batch these into meaningful chunks, called "frames". This
+ /// method layers framing on top of an I/O object, by using the `Codec`
+ /// traits to handle encoding and decoding of messages frames. Note that
+ /// the incoming and outgoing frame types may be distinct.
+ ///
+ /// This function returns a *single* object that is both `Stream` and
+ /// `Sink`; grouping this into a single object is often useful for layering
+ /// things like gzip or TLS, which require both read and write access to the
+ /// underlying object.
+ ///
+ /// This objects takes a stream and a readbuffer and a writebuffer. These field
+ /// can be obtained from an existing `Framed` with the `into_parts` method.
+ ///
+ /// If you want to work more directly with the streams and sink, consider
+ /// calling `split` on the `Framed` returned by this method, which will
+ /// break them into separate objects, allowing them to interact more easily.
+ pub fn from_parts(parts: FramedParts<T>, codec: U) -> Framed<T, U> {
+ Framed {
+ inner: framed_read2_with_buffer(
+ framed_write2_with_buffer(Fuse(parts.inner, codec), parts.writebuf),
+ parts.readbuf,
+ ),
+ }
+ }
+
+ /// Returns a reference to the underlying I/O stream wrapped by
+ /// `Frame`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_ref(&self) -> &T {
+ &self.inner.get_ref().get_ref().0
+ }
+
+ /// Returns a mutable reference to the underlying I/O stream wrapped by
+ /// `Frame`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner.get_mut().get_mut().0
+ }
+
+ /// Consumes the `Frame`, returning its underlying I/O stream.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn into_inner(self) -> T {
+ self.inner.into_inner().into_inner().0
+ }
+
+ /// Consumes the `Frame`, returning its underlying I/O stream and the buffer
+ /// with unprocessed data.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn into_parts(self) -> FramedParts<T> {
+ let (inner, readbuf) = self.inner.into_parts();
+ let (inner, writebuf) = inner.into_parts();
+ FramedParts {
+ inner: inner.0,
+ readbuf: readbuf,
+ writebuf: writebuf,
+ }
+ }
+
+ /// Consumes the `Frame`, returning its underlying I/O stream and the buffer
+ /// with unprocessed data, and also the current codec state.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ ///
+ /// Note that this function will be removed once the codec has been
+ /// integrated into `FramedParts` in a new version (see
+ /// [#53](https://github.com/tokio-rs/tokio-io/pull/53)).
+ pub fn into_parts_and_codec(self) -> (FramedParts<T>, U) {
+ let (inner, readbuf) = self.inner.into_parts();
+ let (inner, writebuf) = inner.into_parts();
+ (
+ FramedParts {
+ inner: inner.0,
+ readbuf: readbuf,
+ writebuf: writebuf,
+ },
+ inner.1,
+ )
+ }
+}
+
+impl<T, U> Stream for Framed<T, U>
+where
+ T: AsyncRead,
+ U: Decoder,
+{
+ type Item = U::Item;
+ type Error = U::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ self.inner.poll()
+ }
+}
+
+impl<T, U> Sink for Framed<T, U>
+where
+ T: AsyncWrite,
+ U: Encoder,
+ U::Error: From<io::Error>,
+{
+ type SinkItem = U::Item;
+ type SinkError = U::Error;
+
+ fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
+ self.inner.get_mut().start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
+ self.inner.get_mut().poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), Self::SinkError> {
+ self.inner.get_mut().close()
+ }
+}
+
+impl<T, U> fmt::Debug for Framed<T, U>
+where
+ T: fmt::Debug,
+ U: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("Framed")
+ .field("io", &self.inner.get_ref().get_ref().0)
+ .field("codec", &self.inner.get_ref().get_ref().1)
+ .finish()
+ }
+}
+
+// ===== impl Fuse =====
+
+impl<T: Read, U> Read for Fuse<T, U> {
+ fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
+ self.0.read(dst)
+ }
+}
+
+impl<T: AsyncRead, U> AsyncRead for Fuse<T, U> {
+ unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
+ self.0.prepare_uninitialized_buffer(buf)
+ }
+}
+
+impl<T: Write, U> Write for Fuse<T, U> {
+ fn write(&mut self, src: &[u8]) -> io::Result<usize> {
+ self.0.write(src)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ self.0.flush()
+ }
+}
+
+impl<T: AsyncWrite, U> AsyncWrite for Fuse<T, U> {
+ fn shutdown(&mut self) -> Poll<(), io::Error> {
+ self.0.shutdown()
+ }
+}
+
+impl<T, U: Decoder> Decoder for Fuse<T, U> {
+ type Item = U::Item;
+ type Error = U::Error;
+
+ fn decode(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
+ self.1.decode(buffer)
+ }
+
+ fn decode_eof(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
+ self.1.decode_eof(buffer)
+ }
+}
+
+impl<T, U: Encoder> Encoder for Fuse<T, U> {
+ type Item = U::Item;
+ type Error = U::Error;
+
+ fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
+ self.1.encode(item, dst)
+ }
+}
+
+/// `FramedParts` contains an export of the data of a Framed transport.
+/// It can be used to construct a new `Framed` with a different codec.
+/// It contains all current buffers and the inner transport.
+#[derive(Debug)]
+pub struct FramedParts<T> {
+ /// The inner transport used to read bytes to and write bytes to
+ pub inner: T,
+ /// The buffer with read but unprocessed data.
+ pub readbuf: BytesMut,
+ /// A buffer with unprocessed data which are not written yet.
+ pub writebuf: BytesMut,
+}
diff --git a/third_party/rust/tokio-io/src/framed_read.rs b/third_party/rust/tokio-io/src/framed_read.rs
new file mode 100644
index 0000000000..d675084b01
--- /dev/null
+++ b/third_party/rust/tokio-io/src/framed_read.rs
@@ -0,0 +1,220 @@
+#![allow(deprecated)]
+
+use std::fmt;
+
+use codec::Decoder;
+use framed::Fuse;
+use AsyncRead;
+
+use bytes::BytesMut;
+use futures::{Async, Poll, Sink, StartSend, Stream};
+
+/// A `Stream` of messages decoded from an `AsyncRead`.
+#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")]
+#[doc(hidden)]
+pub struct FramedRead<T, D> {
+ inner: FramedRead2<Fuse<T, D>>,
+}
+
+#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")]
+#[doc(hidden)]
+pub struct FramedRead2<T> {
+ inner: T,
+ eof: bool,
+ is_readable: bool,
+ buffer: BytesMut,
+}
+
+const INITIAL_CAPACITY: usize = 8 * 1024;
+
+// ===== impl FramedRead =====
+
+impl<T, D> FramedRead<T, D>
+where
+ T: AsyncRead,
+ D: Decoder,
+{
+ /// Creates a new `FramedRead` with the given `decoder`.
+ pub fn new(inner: T, decoder: D) -> FramedRead<T, D> {
+ FramedRead {
+ inner: framed_read2(Fuse(inner, decoder)),
+ }
+ }
+}
+
+impl<T, D> FramedRead<T, D> {
+ /// Returns a reference to the underlying I/O stream wrapped by
+ /// `FramedRead`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_ref(&self) -> &T {
+ &self.inner.inner.0
+ }
+
+ /// Returns a mutable reference to the underlying I/O stream wrapped by
+ /// `FramedRead`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner.inner.0
+ }
+
+ /// Consumes the `FramedRead`, returning its underlying I/O stream.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn into_inner(self) -> T {
+ self.inner.inner.0
+ }
+
+ /// Returns a reference to the underlying decoder.
+ pub fn decoder(&self) -> &D {
+ &self.inner.inner.1
+ }
+
+ /// Returns a mutable reference to the underlying decoder.
+ pub fn decoder_mut(&mut self) -> &mut D {
+ &mut self.inner.inner.1
+ }
+}
+
+impl<T, D> Stream for FramedRead<T, D>
+where
+ T: AsyncRead,
+ D: Decoder,
+{
+ type Item = D::Item;
+ type Error = D::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ self.inner.poll()
+ }
+}
+
+impl<T, D> Sink for FramedRead<T, D>
+where
+ T: Sink,
+{
+ type SinkItem = T::SinkItem;
+ type SinkError = T::SinkError;
+
+ fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
+ self.inner.inner.0.start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
+ self.inner.inner.0.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), Self::SinkError> {
+ self.inner.inner.0.close()
+ }
+}
+
+impl<T, D> fmt::Debug for FramedRead<T, D>
+where
+ T: fmt::Debug,
+ D: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("FramedRead")
+ .field("inner", &self.inner.inner.0)
+ .field("decoder", &self.inner.inner.1)
+ .field("eof", &self.inner.eof)
+ .field("is_readable", &self.inner.is_readable)
+ .field("buffer", &self.inner.buffer)
+ .finish()
+ }
+}
+
+// ===== impl FramedRead2 =====
+
+pub fn framed_read2<T>(inner: T) -> FramedRead2<T> {
+ FramedRead2 {
+ inner: inner,
+ eof: false,
+ is_readable: false,
+ buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
+ }
+}
+
+pub fn framed_read2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedRead2<T> {
+ if buf.capacity() < INITIAL_CAPACITY {
+ let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity();
+ buf.reserve(bytes_to_reserve);
+ }
+ FramedRead2 {
+ inner: inner,
+ eof: false,
+ is_readable: buf.len() > 0,
+ buffer: buf,
+ }
+}
+
+impl<T> FramedRead2<T> {
+ pub fn get_ref(&self) -> &T {
+ &self.inner
+ }
+
+ pub fn into_inner(self) -> T {
+ self.inner
+ }
+
+ pub fn into_parts(self) -> (T, BytesMut) {
+ (self.inner, self.buffer)
+ }
+
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner
+ }
+}
+
+impl<T> Stream for FramedRead2<T>
+where
+ T: AsyncRead + Decoder,
+{
+ type Item = T::Item;
+ type Error = T::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ loop {
+ // Repeatedly call `decode` or `decode_eof` as long as it is
+ // "readable". Readable is defined as not having returned `None`. If
+ // the upstream has returned EOF, and the decoder is no longer
+ // readable, it can be assumed that the decoder will never become
+ // readable again, at which point the stream is terminated.
+ if self.is_readable {
+ if self.eof {
+ let frame = self.inner.decode_eof(&mut self.buffer)?;
+ return Ok(Async::Ready(frame));
+ }
+
+ trace!("attempting to decode a frame");
+
+ if let Some(frame) = self.inner.decode(&mut self.buffer)? {
+ trace!("frame decoded from buffer");
+ return Ok(Async::Ready(Some(frame)));
+ }
+
+ self.is_readable = false;
+ }
+
+ assert!(!self.eof);
+
+ // Otherwise, try to read more data and try again. Make sure we've
+ // got room for at least one byte to read to ensure that we don't
+ // get a spurious 0 that looks like EOF
+ self.buffer.reserve(1);
+ if 0 == try_ready!(self.inner.read_buf(&mut self.buffer)) {
+ self.eof = true;
+ }
+
+ self.is_readable = true;
+ }
+ }
+}
diff --git a/third_party/rust/tokio-io/src/framed_write.rs b/third_party/rust/tokio-io/src/framed_write.rs
new file mode 100644
index 0000000000..483b9fac8d
--- /dev/null
+++ b/third_party/rust/tokio-io/src/framed_write.rs
@@ -0,0 +1,250 @@
+#![allow(deprecated)]
+
+use std::fmt;
+use std::io::{self, Read};
+
+use codec::{Decoder, Encoder};
+use framed::Fuse;
+use {AsyncRead, AsyncWrite};
+
+use bytes::BytesMut;
+use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream};
+
+/// A `Sink` of frames encoded to an `AsyncWrite`.
+#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")]
+#[doc(hidden)]
+pub struct FramedWrite<T, E> {
+ inner: FramedWrite2<Fuse<T, E>>,
+}
+
+#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")]
+#[doc(hidden)]
+pub struct FramedWrite2<T> {
+ inner: T,
+ buffer: BytesMut,
+}
+
+const INITIAL_CAPACITY: usize = 8 * 1024;
+const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY;
+
+impl<T, E> FramedWrite<T, E>
+where
+ T: AsyncWrite,
+ E: Encoder,
+{
+ /// Creates a new `FramedWrite` with the given `encoder`.
+ pub fn new(inner: T, encoder: E) -> FramedWrite<T, E> {
+ FramedWrite {
+ inner: framed_write2(Fuse(inner, encoder)),
+ }
+ }
+}
+
+impl<T, E> FramedWrite<T, E> {
+ /// Returns a reference to the underlying I/O stream wrapped by
+ /// `FramedWrite`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_ref(&self) -> &T {
+ &self.inner.inner.0
+ }
+
+ /// Returns a mutable reference to the underlying I/O stream wrapped by
+ /// `FramedWrite`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner.inner.0
+ }
+
+ /// Consumes the `FramedWrite`, returning its underlying I/O stream.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn into_inner(self) -> T {
+ self.inner.inner.0
+ }
+
+ /// Returns a reference to the underlying decoder.
+ pub fn encoder(&self) -> &E {
+ &self.inner.inner.1
+ }
+
+ /// Returns a mutable reference to the underlying decoder.
+ pub fn encoder_mut(&mut self) -> &mut E {
+ &mut self.inner.inner.1
+ }
+}
+
+impl<T, E> Sink for FramedWrite<T, E>
+where
+ T: AsyncWrite,
+ E: Encoder,
+{
+ type SinkItem = E::Item;
+ type SinkError = E::Error;
+
+ fn start_send(&mut self, item: E::Item) -> StartSend<E::Item, E::Error> {
+ self.inner.start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
+ self.inner.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), Self::SinkError> {
+ Ok(self.inner.close()?)
+ }
+}
+
+impl<T, D> Stream for FramedWrite<T, D>
+where
+ T: Stream,
+{
+ type Item = T::Item;
+ type Error = T::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ self.inner.inner.0.poll()
+ }
+}
+
+impl<T, U> fmt::Debug for FramedWrite<T, U>
+where
+ T: fmt::Debug,
+ U: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("FramedWrite")
+ .field("inner", &self.inner.get_ref().0)
+ .field("encoder", &self.inner.get_ref().1)
+ .field("buffer", &self.inner.buffer)
+ .finish()
+ }
+}
+
+// ===== impl FramedWrite2 =====
+
+pub fn framed_write2<T>(inner: T) -> FramedWrite2<T> {
+ FramedWrite2 {
+ inner: inner,
+ buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
+ }
+}
+
+pub fn framed_write2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedWrite2<T> {
+ if buf.capacity() < INITIAL_CAPACITY {
+ let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity();
+ buf.reserve(bytes_to_reserve);
+ }
+ FramedWrite2 {
+ inner: inner,
+ buffer: buf,
+ }
+}
+
+impl<T> FramedWrite2<T> {
+ pub fn get_ref(&self) -> &T {
+ &self.inner
+ }
+
+ pub fn into_inner(self) -> T {
+ self.inner
+ }
+
+ pub fn into_parts(self) -> (T, BytesMut) {
+ (self.inner, self.buffer)
+ }
+
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner
+ }
+}
+
+impl<T> Sink for FramedWrite2<T>
+where
+ T: AsyncWrite + Encoder,
+{
+ type SinkItem = T::Item;
+ type SinkError = T::Error;
+
+ fn start_send(&mut self, item: T::Item) -> StartSend<T::Item, T::Error> {
+ // If the buffer is already over 8KiB, then attempt to flush it. If after flushing it's
+ // *still* over 8KiB, then apply backpressure (reject the send).
+ if self.buffer.len() >= BACKPRESSURE_BOUNDARY {
+ self.poll_complete()?;
+
+ if self.buffer.len() >= BACKPRESSURE_BOUNDARY {
+ return Ok(AsyncSink::NotReady(item));
+ }
+ }
+
+ self.inner.encode(item, &mut self.buffer)?;
+
+ Ok(AsyncSink::Ready)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
+ trace!("flushing framed transport");
+
+ while !self.buffer.is_empty() {
+ trace!("writing; remaining={}", self.buffer.len());
+
+ let n = try_ready!(self.inner.poll_write(&self.buffer));
+
+ if n == 0 {
+ return Err(io::Error::new(
+ io::ErrorKind::WriteZero,
+ "failed to
+ write frame to transport",
+ )
+ .into());
+ }
+
+ // TODO: Add a way to `bytes` to do this w/o returning the drained
+ // data.
+ let _ = self.buffer.split_to(n);
+ }
+
+ // Try flushing the underlying IO
+ try_ready!(self.inner.poll_flush());
+
+ trace!("framed transport flushed");
+ return Ok(Async::Ready(()));
+ }
+
+ fn close(&mut self) -> Poll<(), Self::SinkError> {
+ try_ready!(self.poll_complete());
+ Ok(self.inner.shutdown()?)
+ }
+}
+
+impl<T: Decoder> Decoder for FramedWrite2<T> {
+ type Item = T::Item;
+ type Error = T::Error;
+
+ fn decode(&mut self, src: &mut BytesMut) -> Result<Option<T::Item>, T::Error> {
+ self.inner.decode(src)
+ }
+
+ fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<T::Item>, T::Error> {
+ self.inner.decode_eof(src)
+ }
+}
+
+impl<T: Read> Read for FramedWrite2<T> {
+ fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
+ self.inner.read(dst)
+ }
+}
+
+impl<T: AsyncRead> AsyncRead for FramedWrite2<T> {
+ unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
+ self.inner.prepare_uninitialized_buffer(buf)
+ }
+}
diff --git a/third_party/rust/tokio-io/src/io/copy.rs b/third_party/rust/tokio-io/src/io/copy.rs
new file mode 100644
index 0000000000..e8a1dac95d
--- /dev/null
+++ b/third_party/rust/tokio-io/src/io/copy.rs
@@ -0,0 +1,100 @@
+use std::io;
+
+use futures::{Future, Poll};
+
+use {AsyncRead, AsyncWrite};
+
+/// A future which will copy all data from a reader into a writer.
+///
+/// Created by the [`copy`] function, this future will resolve to the number of
+/// bytes copied or an error if one happens.
+///
+/// [`copy`]: fn.copy.html
+#[derive(Debug)]
+pub struct Copy<R, W> {
+ reader: Option<R>,
+ read_done: bool,
+ writer: Option<W>,
+ pos: usize,
+ cap: usize,
+ amt: u64,
+ buf: Box<[u8]>,
+}
+
+/// Creates a future which represents copying all the bytes from one object to
+/// another.
+///
+/// The returned future will copy all the bytes read from `reader` into the
+/// `writer` specified. This future will only complete once the `reader` has hit
+/// EOF and all bytes have been written to and flushed from the `writer`
+/// provided.
+///
+/// On success the number of bytes is returned and the `reader` and `writer` are
+/// consumed. On error the error is returned and the I/O objects are consumed as
+/// well.
+pub fn copy<R, W>(reader: R, writer: W) -> Copy<R, W>
+where
+ R: AsyncRead,
+ W: AsyncWrite,
+{
+ Copy {
+ reader: Some(reader),
+ read_done: false,
+ writer: Some(writer),
+ amt: 0,
+ pos: 0,
+ cap: 0,
+ buf: Box::new([0; 2048]),
+ }
+}
+
+impl<R, W> Future for Copy<R, W>
+where
+ R: AsyncRead,
+ W: AsyncWrite,
+{
+ type Item = (u64, R, W);
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(u64, R, W), io::Error> {
+ loop {
+ // If our buffer is empty, then we need to read some data to
+ // continue.
+ if self.pos == self.cap && !self.read_done {
+ let reader = self.reader.as_mut().unwrap();
+ let n = try_ready!(reader.poll_read(&mut self.buf));
+ if n == 0 {
+ self.read_done = true;
+ } else {
+ self.pos = 0;
+ self.cap = n;
+ }
+ }
+
+ // If our buffer has some data, let's write it out!
+ while self.pos < self.cap {
+ let writer = self.writer.as_mut().unwrap();
+ let i = try_ready!(writer.poll_write(&self.buf[self.pos..self.cap]));
+ if i == 0 {
+ return Err(io::Error::new(
+ io::ErrorKind::WriteZero,
+ "write zero byte into writer",
+ ));
+ } else {
+ self.pos += i;
+ self.amt += i as u64;
+ }
+ }
+
+ // If we've written al the data and we've seen EOF, flush out the
+ // data and finish the transfer.
+ // done with the entire transfer.
+ if self.pos == self.cap && self.read_done {
+ try_ready!(self.writer.as_mut().unwrap().poll_flush());
+ let reader = self.reader.take().unwrap();
+ let writer = self.writer.take().unwrap();
+ return Ok((self.amt, reader, writer).into());
+ }
+ }
+ }
+}
diff --git a/third_party/rust/tokio-io/src/io/flush.rs b/third_party/rust/tokio-io/src/io/flush.rs
new file mode 100644
index 0000000000..febc7ee1b1
--- /dev/null
+++ b/third_party/rust/tokio-io/src/io/flush.rs
@@ -0,0 +1,43 @@
+use std::io;
+
+use futures::{Async, Future, Poll};
+
+use AsyncWrite;
+
+/// A future used to fully flush an I/O object.
+///
+/// Resolves to the underlying I/O object once the flush operation is complete.
+///
+/// Created by the [`flush`] function.
+///
+/// [`flush`]: fn.flush.html
+#[derive(Debug)]
+pub struct Flush<A> {
+ a: Option<A>,
+}
+
+/// Creates a future which will entirely flush an I/O object and then yield the
+/// object itself.
+///
+/// This function will consume the object provided if an error happens, and
+/// otherwise it will repeatedly call `flush` until it sees `Ok(())`, scheduling
+/// a retry if `WouldBlock` is seen along the way.
+pub fn flush<A>(a: A) -> Flush<A>
+where
+ A: AsyncWrite,
+{
+ Flush { a: Some(a) }
+}
+
+impl<A> Future for Flush<A>
+where
+ A: AsyncWrite,
+{
+ type Item = A;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<A, io::Error> {
+ try_ready!(self.a.as_mut().unwrap().poll_flush());
+ Ok(Async::Ready(self.a.take().unwrap()))
+ }
+}
diff --git a/third_party/rust/tokio-io/src/io/mod.rs b/third_party/rust/tokio-io/src/io/mod.rs
new file mode 100644
index 0000000000..763cfaee7f
--- /dev/null
+++ b/third_party/rust/tokio-io/src/io/mod.rs
@@ -0,0 +1,32 @@
+//! I/O conveniences when working with primitives in `tokio-core`
+//!
+//! Contains various combinators to work with I/O objects and type definitions
+//! as well.
+//!
+//! A description of the high-level I/O combinators can be [found online] in
+//! addition to a description of the [low level details].
+//!
+//! [found online]: https://tokio.rs/docs/getting-started/core/
+//! [low level details]: https://tokio.rs/docs/going-deeper-tokio/core-low-level/
+
+mod copy;
+mod flush;
+mod read;
+mod read_exact;
+mod read_to_end;
+mod read_until;
+mod shutdown;
+mod write_all;
+
+pub use self::copy::{copy, Copy};
+pub use self::flush::{flush, Flush};
+pub use self::read::{read, Read};
+pub use self::read_exact::{read_exact, ReadExact};
+pub use self::read_to_end::{read_to_end, ReadToEnd};
+pub use self::read_until::{read_until, ReadUntil};
+pub use self::shutdown::{shutdown, Shutdown};
+pub use self::write_all::{write_all, WriteAll};
+pub use allow_std::AllowStdIo;
+pub use lines::{lines, Lines};
+pub use split::{ReadHalf, WriteHalf};
+pub use window::Window;
diff --git a/third_party/rust/tokio-io/src/io/read.rs b/third_party/rust/tokio-io/src/io/read.rs
new file mode 100644
index 0000000000..632cef4d28
--- /dev/null
+++ b/third_party/rust/tokio-io/src/io/read.rs
@@ -0,0 +1,60 @@
+use std::io;
+use std::mem;
+
+use futures::{Future, Poll};
+
+use AsyncRead;
+
+#[derive(Debug)]
+enum State<R, T> {
+ Pending { rd: R, buf: T },
+ Empty,
+}
+
+/// Tries to read some bytes directly into the given `buf` in asynchronous
+/// manner, returning a future type.
+///
+/// The returned future will resolve to both the I/O stream and the buffer
+/// as well as the number of bytes read once the read operation is completed.
+pub fn read<R, T>(rd: R, buf: T) -> Read<R, T>
+where
+ R: AsyncRead,
+ T: AsMut<[u8]>,
+{
+ Read {
+ state: State::Pending { rd: rd, buf: buf },
+ }
+}
+
+/// A future which can be used to easily read available number of bytes to fill
+/// a buffer.
+///
+/// Created by the [`read`] function.
+#[derive(Debug)]
+pub struct Read<R, T> {
+ state: State<R, T>,
+}
+
+impl<R, T> Future for Read<R, T>
+where
+ R: AsyncRead,
+ T: AsMut<[u8]>,
+{
+ type Item = (R, T, usize);
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(R, T, usize), io::Error> {
+ let nread = match self.state {
+ State::Pending {
+ ref mut rd,
+ ref mut buf,
+ } => try_ready!(rd.poll_read(&mut buf.as_mut()[..])),
+ State::Empty => panic!("poll a Read after it's done"),
+ };
+
+ match mem::replace(&mut self.state, State::Empty) {
+ State::Pending { rd, buf } => Ok((rd, buf, nread).into()),
+ State::Empty => panic!("invalid internal state"),
+ }
+ }
+}
diff --git a/third_party/rust/tokio-io/src/io/read_exact.rs b/third_party/rust/tokio-io/src/io/read_exact.rs
new file mode 100644
index 0000000000..3b98621ae4
--- /dev/null
+++ b/third_party/rust/tokio-io/src/io/read_exact.rs
@@ -0,0 +1,85 @@
+use std::io;
+use std::mem;
+
+use futures::{Future, Poll};
+
+use AsyncRead;
+
+/// A future which can be used to easily read exactly enough bytes to fill
+/// a buffer.
+///
+/// Created by the [`read_exact`] function.
+///
+/// [`read_exact`]: fn.read_exact.html
+#[derive(Debug)]
+pub struct ReadExact<A, T> {
+ state: State<A, T>,
+}
+
+#[derive(Debug)]
+enum State<A, T> {
+ Reading { a: A, buf: T, pos: usize },
+ Empty,
+}
+
+/// Creates a future which will read exactly enough bytes to fill `buf`,
+/// returning an error if EOF is hit sooner.
+///
+/// The returned future will resolve to both the I/O stream as well as the
+/// buffer once the read operation is completed.
+///
+/// In the case of an error the buffer and the object will be discarded, with
+/// the error yielded. In the case of success the object will be destroyed and
+/// the buffer will be returned, with all data read from the stream appended to
+/// the buffer.
+pub fn read_exact<A, T>(a: A, buf: T) -> ReadExact<A, T>
+where
+ A: AsyncRead,
+ T: AsMut<[u8]>,
+{
+ ReadExact {
+ state: State::Reading {
+ a: a,
+ buf: buf,
+ pos: 0,
+ },
+ }
+}
+
+fn eof() -> io::Error {
+ io::Error::new(io::ErrorKind::UnexpectedEof, "early eof")
+}
+
+impl<A, T> Future for ReadExact<A, T>
+where
+ A: AsyncRead,
+ T: AsMut<[u8]>,
+{
+ type Item = (A, T);
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(A, T), io::Error> {
+ match self.state {
+ State::Reading {
+ ref mut a,
+ ref mut buf,
+ ref mut pos,
+ } => {
+ let buf = buf.as_mut();
+ while *pos < buf.len() {
+ let n = try_ready!(a.poll_read(&mut buf[*pos..]));
+ *pos += n;
+ if n == 0 {
+ return Err(eof());
+ }
+ }
+ }
+ State::Empty => panic!("poll a ReadExact after it's done"),
+ }
+
+ match mem::replace(&mut self.state, State::Empty) {
+ State::Reading { a, buf, .. } => Ok((a, buf).into()),
+ State::Empty => panic!(),
+ }
+ }
+}
diff --git a/third_party/rust/tokio-io/src/io/read_to_end.rs b/third_party/rust/tokio-io/src/io/read_to_end.rs
new file mode 100644
index 0000000000..296af6e304
--- /dev/null
+++ b/third_party/rust/tokio-io/src/io/read_to_end.rs
@@ -0,0 +1,66 @@
+use std::io;
+use std::mem;
+
+use futures::{Future, Poll};
+
+use AsyncRead;
+
+/// A future which can be used to easily read the entire contents of a stream
+/// into a vector.
+///
+/// Created by the [`read_to_end`] function.
+///
+/// [`read_to_end`]: fn.read_to_end.html
+#[derive(Debug)]
+pub struct ReadToEnd<A> {
+ state: State<A>,
+}
+
+#[derive(Debug)]
+enum State<A> {
+ Reading { a: A, buf: Vec<u8> },
+ Empty,
+}
+
+/// Creates a future which will read all the bytes associated with the I/O
+/// object `A` into the buffer provided.
+///
+/// In the case of an error the buffer and the object will be discarded, with
+/// the error yielded. In the case of success both the object and the buffer
+/// will be returned, with all data read from the stream appended to the buffer.
+pub fn read_to_end<A>(a: A, buf: Vec<u8>) -> ReadToEnd<A>
+where
+ A: AsyncRead,
+{
+ ReadToEnd {
+ state: State::Reading { a: a, buf: buf },
+ }
+}
+
+impl<A> Future for ReadToEnd<A>
+where
+ A: AsyncRead,
+{
+ type Item = (A, Vec<u8>);
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(A, Vec<u8>), io::Error> {
+ match self.state {
+ State::Reading {
+ ref mut a,
+ ref mut buf,
+ } => {
+ // If we get `Ok`, then we know the stream hit EOF and we're done. If we
+ // hit "would block" then all the read data so far is in our buffer, and
+ // otherwise we propagate errors
+ try_nb!(a.read_to_end(buf));
+ }
+ State::Empty => panic!("poll ReadToEnd after it's done"),
+ }
+
+ match mem::replace(&mut self.state, State::Empty) {
+ State::Reading { a, buf } => Ok((a, buf).into()),
+ State::Empty => unreachable!(),
+ }
+ }
+}
diff --git a/third_party/rust/tokio-io/src/io/read_until.rs b/third_party/rust/tokio-io/src/io/read_until.rs
new file mode 100644
index 0000000000..d0be4c9448
--- /dev/null
+++ b/third_party/rust/tokio-io/src/io/read_until.rs
@@ -0,0 +1,76 @@
+use std::io::{self, BufRead};
+use std::mem;
+
+use futures::{Future, Poll};
+
+use AsyncRead;
+
+/// A future which can be used to easily read the contents of a stream into a
+/// vector until the delimiter is reached.
+///
+/// Created by the [`read_until`] function.
+///
+/// [`read_until`]: fn.read_until.html
+#[derive(Debug)]
+pub struct ReadUntil<A> {
+ state: State<A>,
+}
+
+#[derive(Debug)]
+enum State<A> {
+ Reading { a: A, byte: u8, buf: Vec<u8> },
+ Empty,
+}
+
+/// Creates a future which will read all the bytes associated with the I/O
+/// object `A` into the buffer provided until the delimiter `byte` is reached.
+/// This method is the async equivalent to [`BufRead::read_until`].
+///
+/// In case of an error the buffer and the object will be discarded, with
+/// the error yielded. In the case of success the object will be destroyed and
+/// the buffer will be returned, with all bytes up to, and including, the delimiter
+/// (if found).
+///
+/// [`BufRead::read_until`]: https://doc.rust-lang.org/std/io/trait.BufRead.html#method.read_until
+pub fn read_until<A>(a: A, byte: u8, buf: Vec<u8>) -> ReadUntil<A>
+where
+ A: AsyncRead + BufRead,
+{
+ ReadUntil {
+ state: State::Reading {
+ a: a,
+ byte: byte,
+ buf: buf,
+ },
+ }
+}
+
+impl<A> Future for ReadUntil<A>
+where
+ A: AsyncRead + BufRead,
+{
+ type Item = (A, Vec<u8>);
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(A, Vec<u8>), io::Error> {
+ match self.state {
+ State::Reading {
+ ref mut a,
+ byte,
+ ref mut buf,
+ } => {
+ // If we get `Ok(n)`, then we know the stream hit EOF or the delimiter.
+ // and just return it, as we are finished.
+ // If we hit "would block" then all the read data so far
+ // is in our buffer, and otherwise we propagate errors.
+ try_nb!(a.read_until(byte, buf));
+ }
+ State::Empty => panic!("poll ReadUntil after it's done"),
+ }
+
+ match mem::replace(&mut self.state, State::Empty) {
+ State::Reading { a, byte: _, buf } => Ok((a, buf).into()),
+ State::Empty => unreachable!(),
+ }
+ }
+}
diff --git a/third_party/rust/tokio-io/src/io/shutdown.rs b/third_party/rust/tokio-io/src/io/shutdown.rs
new file mode 100644
index 0000000000..d963a813ad
--- /dev/null
+++ b/third_party/rust/tokio-io/src/io/shutdown.rs
@@ -0,0 +1,44 @@
+use std::io;
+
+use futures::{Async, Future, Poll};
+
+use AsyncWrite;
+
+/// A future used to fully shutdown an I/O object.
+///
+/// Resolves to the underlying I/O object once the shutdown operation is
+/// complete.
+///
+/// Created by the [`shutdown`] function.
+///
+/// [`shutdown`]: fn.shutdown.html
+#[derive(Debug)]
+pub struct Shutdown<A> {
+ a: Option<A>,
+}
+
+/// Creates a future which will entirely shutdown an I/O object and then yield
+/// the object itself.
+///
+/// This function will consume the object provided if an error happens, and
+/// otherwise it will repeatedly call `shutdown` until it sees `Ok(())`,
+/// scheduling a retry if `WouldBlock` is seen along the way.
+pub fn shutdown<A>(a: A) -> Shutdown<A>
+where
+ A: AsyncWrite,
+{
+ Shutdown { a: Some(a) }
+}
+
+impl<A> Future for Shutdown<A>
+where
+ A: AsyncWrite,
+{
+ type Item = A;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<A, io::Error> {
+ try_ready!(self.a.as_mut().unwrap().shutdown());
+ Ok(Async::Ready(self.a.take().unwrap()))
+ }
+}
diff --git a/third_party/rust/tokio-io/src/io/write_all.rs b/third_party/rust/tokio-io/src/io/write_all.rs
new file mode 100644
index 0000000000..ba8af4a222
--- /dev/null
+++ b/third_party/rust/tokio-io/src/io/write_all.rs
@@ -0,0 +1,88 @@
+use std::io;
+use std::mem;
+
+use futures::{Future, Poll};
+
+use AsyncWrite;
+
+/// A future used to write the entire contents of some data to a stream.
+///
+/// This is created by the [`write_all`] top-level method.
+///
+/// [`write_all`]: fn.write_all.html
+#[derive(Debug)]
+pub struct WriteAll<A, T> {
+ state: State<A, T>,
+}
+
+#[derive(Debug)]
+enum State<A, T> {
+ Writing { a: A, buf: T, pos: usize },
+ Empty,
+}
+
+/// Creates a future that will write the entire contents of the buffer `buf` to
+/// the stream `a` provided.
+///
+/// The returned future will not return until all the data has been written, and
+/// the future will resolve to the stream as well as the buffer (for reuse if
+/// needed).
+///
+/// Any error which happens during writing will cause both the stream and the
+/// buffer to get destroyed.
+///
+/// The `buf` parameter here only requires the `AsRef<[u8]>` trait, which should
+/// be broadly applicable to accepting data which can be converted to a slice.
+/// The `Window` struct is also available in this crate to provide a different
+/// window into a slice if necessary.
+pub fn write_all<A, T>(a: A, buf: T) -> WriteAll<A, T>
+where
+ A: AsyncWrite,
+ T: AsRef<[u8]>,
+{
+ WriteAll {
+ state: State::Writing {
+ a: a,
+ buf: buf,
+ pos: 0,
+ },
+ }
+}
+
+fn zero_write() -> io::Error {
+ io::Error::new(io::ErrorKind::WriteZero, "zero-length write")
+}
+
+impl<A, T> Future for WriteAll<A, T>
+where
+ A: AsyncWrite,
+ T: AsRef<[u8]>,
+{
+ type Item = (A, T);
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<(A, T), io::Error> {
+ match self.state {
+ State::Writing {
+ ref mut a,
+ ref buf,
+ ref mut pos,
+ } => {
+ let buf = buf.as_ref();
+ while *pos < buf.len() {
+ let n = try_ready!(a.poll_write(&buf[*pos..]));
+ *pos += n;
+ if n == 0 {
+ return Err(zero_write());
+ }
+ }
+ }
+ State::Empty => panic!("poll a WriteAll after it's done"),
+ }
+
+ match mem::replace(&mut self.state, State::Empty) {
+ State::Writing { a, buf, .. } => Ok((a, buf).into()),
+ State::Empty => panic!(),
+ }
+ }
+}
diff --git a/third_party/rust/tokio-io/src/length_delimited.rs b/third_party/rust/tokio-io/src/length_delimited.rs
new file mode 100644
index 0000000000..c211c95356
--- /dev/null
+++ b/third_party/rust/tokio-io/src/length_delimited.rs
@@ -0,0 +1,943 @@
+#![allow(deprecated)]
+
+use {codec, AsyncRead, AsyncWrite};
+
+use bytes::buf::Chain;
+use bytes::{Buf, BufMut, BytesMut, IntoBuf};
+
+use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream};
+
+use std::error::Error as StdError;
+use std::io::{self, Cursor};
+use std::{cmp, fmt};
+
+/// Configure length delimited `FramedRead`, `FramedWrite`, and `Framed` values.
+///
+/// `Builder` enables constructing configured length delimited framers. Note
+/// that not all configuration settings apply to both encoding and decoding. See
+/// the documentation for specific methods for more detail.
+#[deprecated(since = "0.1.8", note = "Moved to tokio-codec")]
+#[doc(hidden)]
+#[derive(Debug, Clone, Copy)]
+pub struct Builder {
+ // Maximum frame length
+ max_frame_len: usize,
+
+ // Number of bytes representing the field length
+ length_field_len: usize,
+
+ // Number of bytes in the header before the length field
+ length_field_offset: usize,
+
+ // Adjust the length specified in the header field by this amount
+ length_adjustment: isize,
+
+ // Total number of bytes to skip before reading the payload, if not set,
+ // `length_field_len + length_field_offset`
+ num_skip: Option<usize>,
+
+ // Length field byte order (little or big endian)
+ length_field_is_big_endian: bool,
+}
+
+/// Adapts a byte stream into a unified `Stream` and `Sink` that works over
+/// entire frame values.
+///
+/// See [module level] documentation for more detail.
+///
+/// [module level]: index.html
+#[deprecated(since = "0.1.8", note = "Moved to tokio-codec")]
+#[doc(hidden)]
+pub struct Framed<T, B: IntoBuf = BytesMut> {
+ inner: FramedRead<FramedWrite<T, B>>,
+}
+
+/// Adapts a byte stream to a `Stream` yielding entire frame values.
+///
+/// See [module level] documentation for more detail.
+///
+/// [module level]: index.html
+#[deprecated(since = "0.1.8", note = "Moved to tokio-codec")]
+#[doc(hidden)]
+#[derive(Debug)]
+pub struct FramedRead<T> {
+ inner: codec::FramedRead<T, Decoder>,
+}
+
+/// An error when the number of bytes read is more than max frame length.
+#[deprecated(since = "0.1.8", note = "Moved to tokio-codec")]
+#[doc(hidden)]
+pub struct FrameTooBig {
+ _priv: (),
+}
+
+#[derive(Debug)]
+struct Decoder {
+ // Configuration values
+ builder: Builder,
+
+ // Read state
+ state: DecodeState,
+}
+
+#[derive(Debug, Clone, Copy)]
+enum DecodeState {
+ Head,
+ Data(usize),
+}
+
+/// Adapts a byte stream to a `Sink` accepting entire frame values.
+///
+/// See [module level] documentation for more detail.
+///
+/// [module level]: index.html
+#[deprecated(since = "0.1.8", note = "Moved to tokio-codec")]
+#[doc(hidden)]
+pub struct FramedWrite<T, B: IntoBuf = BytesMut> {
+ // I/O type
+ inner: T,
+
+ // Configuration values
+ builder: Builder,
+
+ // Current frame being written
+ frame: Option<Chain<Cursor<BytesMut>, B::Buf>>,
+}
+
+// ===== impl Framed =====
+
+impl<T: AsyncRead + AsyncWrite, B: IntoBuf> Framed<T, B> {
+ /// Creates a new `Framed` with default configuration values.
+ pub fn new(inner: T) -> Framed<T, B> {
+ Builder::new().new_framed(inner)
+ }
+}
+
+impl<T, B: IntoBuf> Framed<T, B> {
+ /// Returns a reference to the underlying I/O stream wrapped by `Framed`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_ref(&self) -> &T {
+ self.inner.get_ref().get_ref()
+ }
+
+ /// Returns a mutable reference to the underlying I/O stream wrapped by
+ /// `Framed`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise being
+ /// worked with.
+ pub fn get_mut(&mut self) -> &mut T {
+ self.inner.get_mut().get_mut()
+ }
+
+ /// Consumes the `Framed`, returning its underlying I/O stream.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise being
+ /// worked with.
+ pub fn into_inner(self) -> T {
+ self.inner.into_inner().into_inner()
+ }
+}
+
+impl<T: AsyncRead, B: IntoBuf> Stream for Framed<T, B> {
+ type Item = BytesMut;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<Option<BytesMut>, io::Error> {
+ self.inner.poll()
+ }
+}
+
+impl<T: AsyncWrite, B: IntoBuf> Sink for Framed<T, B> {
+ type SinkItem = B;
+ type SinkError = io::Error;
+
+ fn start_send(&mut self, item: B) -> StartSend<B, io::Error> {
+ self.inner.start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), io::Error> {
+ self.inner.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), io::Error> {
+ self.inner.close()
+ }
+}
+
+impl<T, B: IntoBuf> fmt::Debug for Framed<T, B>
+where
+ T: fmt::Debug,
+ B::Buf: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("Framed")
+ .field("inner", &self.inner)
+ .finish()
+ }
+}
+
+// ===== impl FramedRead =====
+
+impl<T: AsyncRead> FramedRead<T> {
+ /// Creates a new `FramedRead` with default configuration values.
+ pub fn new(inner: T) -> FramedRead<T> {
+ Builder::new().new_read(inner)
+ }
+}
+
+impl<T> FramedRead<T> {
+ /// Returns the current max frame setting
+ ///
+ /// This is the largest size this codec will accept from the wire. Larger
+ /// frames will be rejected.
+ pub fn max_frame_length(&self) -> usize {
+ self.inner.decoder().builder.max_frame_len
+ }
+
+ /// Updates the max frame setting.
+ ///
+ /// The change takes effect the next time a frame is decoded. In other
+ /// words, if a frame is currently in process of being decoded with a frame
+ /// size greater than `val` but less than the max frame length in effect
+ /// before calling this function, then the frame will be allowed.
+ pub fn set_max_frame_length(&mut self, val: usize) {
+ self.inner.decoder_mut().builder.max_frame_length(val);
+ }
+
+ /// Returns a reference to the underlying I/O stream wrapped by `FramedRead`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_ref(&self) -> &T {
+ self.inner.get_ref()
+ }
+
+ /// Returns a mutable reference to the underlying I/O stream wrapped by
+ /// `FramedRead`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise being
+ /// worked with.
+ pub fn get_mut(&mut self) -> &mut T {
+ self.inner.get_mut()
+ }
+
+ /// Consumes the `FramedRead`, returning its underlying I/O stream.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise being
+ /// worked with.
+ pub fn into_inner(self) -> T {
+ self.inner.into_inner()
+ }
+}
+
+impl<T: AsyncRead> Stream for FramedRead<T> {
+ type Item = BytesMut;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<Option<BytesMut>, io::Error> {
+ self.inner.poll()
+ }
+}
+
+impl<T: Sink> Sink for FramedRead<T> {
+ type SinkItem = T::SinkItem;
+ type SinkError = T::SinkError;
+
+ fn start_send(&mut self, item: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> {
+ self.inner.start_send(item)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
+ self.inner.poll_complete()
+ }
+
+ fn close(&mut self) -> Poll<(), T::SinkError> {
+ self.inner.close()
+ }
+}
+
+impl<T: io::Write> io::Write for FramedRead<T> {
+ fn write(&mut self, src: &[u8]) -> io::Result<usize> {
+ self.inner.get_mut().write(src)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ self.inner.get_mut().flush()
+ }
+}
+
+impl<T: AsyncWrite> AsyncWrite for FramedRead<T> {
+ fn shutdown(&mut self) -> Poll<(), io::Error> {
+ self.inner.get_mut().shutdown()
+ }
+
+ fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
+ self.inner.get_mut().write_buf(buf)
+ }
+}
+
+// ===== impl Decoder ======
+
+impl Decoder {
+ fn decode_head(&mut self, src: &mut BytesMut) -> io::Result<Option<usize>> {
+ let head_len = self.builder.num_head_bytes();
+ let field_len = self.builder.length_field_len;
+
+ if src.len() < head_len {
+ // Not enough data
+ return Ok(None);
+ }
+
+ let n = {
+ let mut src = Cursor::new(&mut *src);
+
+ // Skip the required bytes
+ src.advance(self.builder.length_field_offset);
+
+ // match endianess
+ let n = if self.builder.length_field_is_big_endian {
+ src.get_uint_be(field_len)
+ } else {
+ src.get_uint_le(field_len)
+ };
+
+ if n > self.builder.max_frame_len as u64 {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ FrameTooBig { _priv: () },
+ ));
+ }
+
+ // The check above ensures there is no overflow
+ let n = n as usize;
+
+ // Adjust `n` with bounds checking
+ let n = if self.builder.length_adjustment < 0 {
+ n.checked_sub(-self.builder.length_adjustment as usize)
+ } else {
+ n.checked_add(self.builder.length_adjustment as usize)
+ };
+
+ // Error handling
+ match n {
+ Some(n) => n,
+ None => {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "provided length would overflow after adjustment",
+ ));
+ }
+ }
+ };
+
+ let num_skip = self.builder.get_num_skip();
+
+ if num_skip > 0 {
+ let _ = src.split_to(num_skip);
+ }
+
+ // Ensure that the buffer has enough space to read the incoming
+ // payload
+ src.reserve(n);
+
+ return Ok(Some(n));
+ }
+
+ fn decode_data(&self, n: usize, src: &mut BytesMut) -> io::Result<Option<BytesMut>> {
+ // At this point, the buffer has already had the required capacity
+ // reserved. All there is to do is read.
+ if src.len() < n {
+ return Ok(None);
+ }
+
+ Ok(Some(src.split_to(n)))
+ }
+}
+
+impl codec::Decoder for Decoder {
+ type Item = BytesMut;
+ type Error = io::Error;
+
+ fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<BytesMut>> {
+ let n = match self.state {
+ DecodeState::Head => match self.decode_head(src)? {
+ Some(n) => {
+ self.state = DecodeState::Data(n);
+ n
+ }
+ None => return Ok(None),
+ },
+ DecodeState::Data(n) => n,
+ };
+
+ match self.decode_data(n, src)? {
+ Some(data) => {
+ // Update the decode state
+ self.state = DecodeState::Head;
+
+ // Make sure the buffer has enough space to read the next head
+ src.reserve(self.builder.num_head_bytes());
+
+ Ok(Some(data))
+ }
+ None => Ok(None),
+ }
+ }
+}
+
+// ===== impl FramedWrite =====
+
+impl<T: AsyncWrite, B: IntoBuf> FramedWrite<T, B> {
+ /// Creates a new `FramedWrite` with default configuration values.
+ pub fn new(inner: T) -> FramedWrite<T, B> {
+ Builder::new().new_write(inner)
+ }
+}
+
+impl<T, B: IntoBuf> FramedWrite<T, B> {
+ /// Returns the current max frame setting
+ ///
+ /// This is the largest size this codec will write to the wire. Larger
+ /// frames will be rejected.
+ pub fn max_frame_length(&self) -> usize {
+ self.builder.max_frame_len
+ }
+
+ /// Updates the max frame setting.
+ ///
+ /// The change takes effect the next time a frame is encoded. In other
+ /// words, if a frame is currently in process of being encoded with a frame
+ /// size greater than `val` but less than the max frame length in effect
+ /// before calling this function, then the frame will be allowed.
+ pub fn set_max_frame_length(&mut self, val: usize) {
+ self.builder.max_frame_length(val);
+ }
+
+ /// Returns a reference to the underlying I/O stream wrapped by
+ /// `FramedWrite`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_ref(&self) -> &T {
+ &self.inner
+ }
+
+ /// Returns a mutable reference to the underlying I/O stream wrapped by
+ /// `FramedWrite`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise being
+ /// worked with.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner
+ }
+
+ /// Consumes the `FramedWrite`, returning its underlying I/O stream.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise being
+ /// worked with.
+ pub fn into_inner(self) -> T {
+ self.inner
+ }
+}
+
+impl<T: AsyncWrite, B: IntoBuf> FramedWrite<T, B> {
+ // If there is a buffered frame, try to write it to `T`
+ fn do_write(&mut self) -> Poll<(), io::Error> {
+ if self.frame.is_none() {
+ return Ok(Async::Ready(()));
+ }
+
+ loop {
+ let frame = self.frame.as_mut().unwrap();
+ if try_ready!(self.inner.write_buf(frame)) == 0 {
+ return Err(io::Error::new(
+ io::ErrorKind::WriteZero,
+ "failed to write frame to transport",
+ ));
+ }
+
+ if !frame.has_remaining() {
+ break;
+ }
+ }
+
+ self.frame = None;
+
+ Ok(Async::Ready(()))
+ }
+
+ fn set_frame(&mut self, buf: B::Buf) -> io::Result<()> {
+ let mut head = BytesMut::with_capacity(8);
+ let n = buf.remaining();
+
+ if n > self.builder.max_frame_len {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ FrameTooBig { _priv: () },
+ ));
+ }
+
+ // Adjust `n` with bounds checking
+ let n = if self.builder.length_adjustment < 0 {
+ n.checked_add(-self.builder.length_adjustment as usize)
+ } else {
+ n.checked_sub(self.builder.length_adjustment as usize)
+ };
+
+ // Error handling
+ let n = match n {
+ Some(n) => n,
+ None => {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "provided length would overflow after adjustment",
+ ));
+ }
+ };
+
+ if self.builder.length_field_is_big_endian {
+ head.put_uint_be(n as u64, self.builder.length_field_len);
+ } else {
+ head.put_uint_le(n as u64, self.builder.length_field_len);
+ }
+
+ debug_assert!(self.frame.is_none());
+
+ self.frame = Some(head.into_buf().chain(buf));
+
+ Ok(())
+ }
+}
+
+impl<T: AsyncWrite, B: IntoBuf> Sink for FramedWrite<T, B> {
+ type SinkItem = B;
+ type SinkError = io::Error;
+
+ fn start_send(&mut self, item: B) -> StartSend<B, io::Error> {
+ if !self.do_write()?.is_ready() {
+ return Ok(AsyncSink::NotReady(item));
+ }
+
+ self.set_frame(item.into_buf())?;
+
+ Ok(AsyncSink::Ready)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), io::Error> {
+ // Write any buffered frame to T
+ try_ready!(self.do_write());
+
+ // Try flushing the underlying IO
+ try_ready!(self.inner.poll_flush());
+
+ return Ok(Async::Ready(()));
+ }
+
+ fn close(&mut self) -> Poll<(), io::Error> {
+ try_ready!(self.poll_complete());
+ self.inner.shutdown()
+ }
+}
+
+impl<T: Stream, B: IntoBuf> Stream for FramedWrite<T, B> {
+ type Item = T::Item;
+ type Error = T::Error;
+
+ fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
+ self.inner.poll()
+ }
+}
+
+impl<T: io::Read, B: IntoBuf> io::Read for FramedWrite<T, B> {
+ fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
+ self.get_mut().read(dst)
+ }
+}
+
+impl<T: AsyncRead, U: IntoBuf> AsyncRead for FramedWrite<T, U> {
+ fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
+ self.get_mut().read_buf(buf)
+ }
+
+ unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
+ self.get_ref().prepare_uninitialized_buffer(buf)
+ }
+}
+
+impl<T, B: IntoBuf> fmt::Debug for FramedWrite<T, B>
+where
+ T: fmt::Debug,
+ B::Buf: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("FramedWrite")
+ .field("inner", &self.inner)
+ .field("builder", &self.builder)
+ .field("frame", &self.frame)
+ .finish()
+ }
+}
+
+// ===== impl Builder =====
+
+impl Builder {
+ /// Creates a new length delimited framer builder with default configuration
+ /// values.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio_io::AsyncRead;
+ /// use tokio_io::codec::length_delimited::Builder;
+ ///
+ /// # fn bind_read<T: AsyncRead>(io: T) {
+ /// Builder::new()
+ /// .length_field_offset(0)
+ /// .length_field_length(2)
+ /// .length_adjustment(0)
+ /// .num_skip(0)
+ /// .new_read(io);
+ /// # }
+ /// ```
+ pub fn new() -> Builder {
+ Builder {
+ // Default max frame length of 8MB
+ max_frame_len: 8 * 1_024 * 1_024,
+
+ // Default byte length of 4
+ length_field_len: 4,
+
+ // Default to the header field being at the start of the header.
+ length_field_offset: 0,
+
+ length_adjustment: 0,
+
+ // Total number of bytes to skip before reading the payload, if not set,
+ // `length_field_len + length_field_offset`
+ num_skip: None,
+
+ // Default to reading the length field in network (big) endian.
+ length_field_is_big_endian: true,
+ }
+ }
+
+ /// Read the length field as a big endian integer
+ ///
+ /// This is the default setting.
+ ///
+ /// This configuration option applies to both encoding and decoding.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio_io::AsyncRead;
+ /// use tokio_io::codec::length_delimited::Builder;
+ ///
+ /// # fn bind_read<T: AsyncRead>(io: T) {
+ /// Builder::new()
+ /// .big_endian()
+ /// .new_read(io);
+ /// # }
+ /// ```
+ pub fn big_endian(&mut self) -> &mut Self {
+ self.length_field_is_big_endian = true;
+ self
+ }
+
+ /// Read the length field as a little endian integer
+ ///
+ /// The default setting is big endian.
+ ///
+ /// This configuration option applies to both encoding and decoding.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio_io::AsyncRead;
+ /// use tokio_io::codec::length_delimited::Builder;
+ ///
+ /// # fn bind_read<T: AsyncRead>(io: T) {
+ /// Builder::new()
+ /// .little_endian()
+ /// .new_read(io);
+ /// # }
+ /// ```
+ pub fn little_endian(&mut self) -> &mut Self {
+ self.length_field_is_big_endian = false;
+ self
+ }
+
+ /// Read the length field as a native endian integer
+ ///
+ /// The default setting is big endian.
+ ///
+ /// This configuration option applies to both encoding and decoding.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio_io::AsyncRead;
+ /// use tokio_io::codec::length_delimited::Builder;
+ ///
+ /// # fn bind_read<T: AsyncRead>(io: T) {
+ /// Builder::new()
+ /// .native_endian()
+ /// .new_read(io);
+ /// # }
+ /// ```
+ pub fn native_endian(&mut self) -> &mut Self {
+ if cfg!(target_endian = "big") {
+ self.big_endian()
+ } else {
+ self.little_endian()
+ }
+ }
+
+ /// Sets the max frame length
+ ///
+ /// This configuration option applies to both encoding and decoding. The
+ /// default value is 8MB.
+ ///
+ /// When decoding, the length field read from the byte stream is checked
+ /// against this setting **before** any adjustments are applied. When
+ /// encoding, the length of the submitted payload is checked against this
+ /// setting.
+ ///
+ /// When frames exceed the max length, an `io::Error` with the custom value
+ /// of the `FrameTooBig` type will be returned.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio_io::AsyncRead;
+ /// use tokio_io::codec::length_delimited::Builder;
+ ///
+ /// # fn bind_read<T: AsyncRead>(io: T) {
+ /// Builder::new()
+ /// .max_frame_length(8 * 1024)
+ /// .new_read(io);
+ /// # }
+ /// ```
+ pub fn max_frame_length(&mut self, val: usize) -> &mut Self {
+ self.max_frame_len = val;
+ self
+ }
+
+ /// Sets the number of bytes used to represent the length field
+ ///
+ /// The default value is `4`. The max value is `8`.
+ ///
+ /// This configuration option applies to both encoding and decoding.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio_io::AsyncRead;
+ /// use tokio_io::codec::length_delimited::Builder;
+ ///
+ /// # fn bind_read<T: AsyncRead>(io: T) {
+ /// Builder::new()
+ /// .length_field_length(4)
+ /// .new_read(io);
+ /// # }
+ /// ```
+ pub fn length_field_length(&mut self, val: usize) -> &mut Self {
+ assert!(val > 0 && val <= 8, "invalid length field length");
+ self.length_field_len = val;
+ self
+ }
+
+ /// Sets the number of bytes in the header before the length field
+ ///
+ /// This configuration option only applies to decoding.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio_io::AsyncRead;
+ /// use tokio_io::codec::length_delimited::Builder;
+ ///
+ /// # fn bind_read<T: AsyncRead>(io: T) {
+ /// Builder::new()
+ /// .length_field_offset(1)
+ /// .new_read(io);
+ /// # }
+ /// ```
+ pub fn length_field_offset(&mut self, val: usize) -> &mut Self {
+ self.length_field_offset = val;
+ self
+ }
+
+ /// Delta between the payload length specified in the header and the real
+ /// payload length
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio_io::AsyncRead;
+ /// use tokio_io::codec::length_delimited::Builder;
+ ///
+ /// # fn bind_read<T: AsyncRead>(io: T) {
+ /// Builder::new()
+ /// .length_adjustment(-2)
+ /// .new_read(io);
+ /// # }
+ /// ```
+ pub fn length_adjustment(&mut self, val: isize) -> &mut Self {
+ self.length_adjustment = val;
+ self
+ }
+
+ /// Sets the number of bytes to skip before reading the payload
+ ///
+ /// Default value is `length_field_len + length_field_offset`
+ ///
+ /// This configuration option only applies to decoding
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio_io::AsyncRead;
+ /// use tokio_io::codec::length_delimited::Builder;
+ ///
+ /// # fn bind_read<T: AsyncRead>(io: T) {
+ /// Builder::new()
+ /// .num_skip(4)
+ /// .new_read(io);
+ /// # }
+ /// ```
+ pub fn num_skip(&mut self, val: usize) -> &mut Self {
+ self.num_skip = Some(val);
+ self
+ }
+
+ /// Create a configured length delimited `FramedRead`
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio_io::AsyncRead;
+ /// use tokio_io::codec::length_delimited::Builder;
+ ///
+ /// # fn bind_read<T: AsyncRead>(io: T) {
+ /// Builder::new()
+ /// .length_field_offset(0)
+ /// .length_field_length(2)
+ /// .length_adjustment(0)
+ /// .num_skip(0)
+ /// .new_read(io);
+ /// # }
+ /// ```
+ pub fn new_read<T>(&self, upstream: T) -> FramedRead<T>
+ where
+ T: AsyncRead,
+ {
+ FramedRead {
+ inner: codec::FramedRead::new(
+ upstream,
+ Decoder {
+ builder: *self,
+ state: DecodeState::Head,
+ },
+ ),
+ }
+ }
+
+ /// Create a configured length delimited `FramedWrite`
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # extern crate tokio_io;
+ /// # extern crate bytes;
+ /// # use tokio_io::AsyncWrite;
+ /// # use tokio_io::codec::length_delimited;
+ /// # use bytes::BytesMut;
+ /// # fn write_frame<T: AsyncWrite>(io: T) {
+ /// # let _: length_delimited::FramedWrite<T, BytesMut> =
+ /// length_delimited::Builder::new()
+ /// .length_field_length(2)
+ /// .new_write(io);
+ /// # }
+ /// # pub fn main() {}
+ /// ```
+ pub fn new_write<T, B>(&self, inner: T) -> FramedWrite<T, B>
+ where
+ T: AsyncWrite,
+ B: IntoBuf,
+ {
+ FramedWrite {
+ inner: inner,
+ builder: *self,
+ frame: None,
+ }
+ }
+
+ /// Create a configured length delimited `Framed`
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # extern crate tokio_io;
+ /// # extern crate bytes;
+ /// # use tokio_io::{AsyncRead, AsyncWrite};
+ /// # use tokio_io::codec::length_delimited;
+ /// # use bytes::BytesMut;
+ /// # fn write_frame<T: AsyncRead + AsyncWrite>(io: T) {
+ /// # let _: length_delimited::Framed<T, BytesMut> =
+ /// length_delimited::Builder::new()
+ /// .length_field_length(2)
+ /// .new_framed(io);
+ /// # }
+ /// # pub fn main() {}
+ /// ```
+ pub fn new_framed<T, B>(&self, inner: T) -> Framed<T, B>
+ where
+ T: AsyncRead + AsyncWrite,
+ B: IntoBuf,
+ {
+ let inner = self.new_read(self.new_write(inner));
+ Framed { inner: inner }
+ }
+
+ fn num_head_bytes(&self) -> usize {
+ let num = self.length_field_offset + self.length_field_len;
+ cmp::max(num, self.num_skip.unwrap_or(0))
+ }
+
+ fn get_num_skip(&self) -> usize {
+ self.num_skip
+ .unwrap_or(self.length_field_offset + self.length_field_len)
+ }
+}
+
+// ===== impl FrameTooBig =====
+
+impl fmt::Debug for FrameTooBig {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("FrameTooBig").finish()
+ }
+}
+
+impl fmt::Display for FrameTooBig {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.write_str(self.description())
+ }
+}
+
+impl StdError for FrameTooBig {
+ fn description(&self) -> &str {
+ "frame size too big"
+ }
+}
diff --git a/third_party/rust/tokio-io/src/lib.rs b/third_party/rust/tokio-io/src/lib.rs
new file mode 100644
index 0000000000..6d9fe22927
--- /dev/null
+++ b/third_party/rust/tokio-io/src/lib.rs
@@ -0,0 +1,75 @@
+#![deny(missing_docs, missing_debug_implementations)]
+#![doc(html_root_url = "https://docs.rs/tokio-io/0.1.13")]
+
+//! Core I/O traits and combinators when working with Tokio.
+//!
+//! > **Note:** This crate has been **deprecated in tokio 0.2.x** and has been
+//! > moved into [`tokio::io`].
+//!
+//! [`tokio::io`]: https://docs.rs/tokio/latest/tokio/io/index.html
+//!
+//! A description of the high-level I/O combinators can be [found online] in
+//! addition to a description of the [low level details].
+//!
+//! [found online]: https://tokio.rs/docs/getting-started/core/
+//! [low level details]: https://tokio.rs/docs/going-deeper-tokio/core-low-level/
+
+#[macro_use]
+extern crate log;
+
+#[macro_use]
+extern crate futures;
+extern crate bytes;
+
+use std::io as std_io;
+
+use futures::{Future, Stream};
+
+/// A convenience typedef around a `Future` whose error component is `io::Error`
+pub type IoFuture<T> = Box<dyn Future<Item = T, Error = std_io::Error> + Send>;
+
+/// A convenience typedef around a `Stream` whose error component is `io::Error`
+pub type IoStream<T> = Box<dyn Stream<Item = T, Error = std_io::Error> + Send>;
+
+/// A convenience macro for working with `io::Result<T>` from the `Read` and
+/// `Write` traits.
+///
+/// This macro takes `io::Result<T>` as input, and returns `T` as the output. If
+/// the input type is of the `Err` variant, then `Poll::NotReady` is returned if
+/// it indicates `WouldBlock` or otherwise `Err` is returned.
+#[macro_export]
+macro_rules! try_nb {
+ ($e:expr) => {
+ match $e {
+ Ok(t) => t,
+ Err(ref e) if e.kind() == ::std::io::ErrorKind::WouldBlock => {
+ return Ok(::futures::Async::NotReady);
+ }
+ Err(e) => return Err(e.into()),
+ }
+ };
+}
+
+pub mod codec;
+pub mod io;
+
+pub mod _tokio_codec;
+mod allow_std;
+mod async_read;
+mod async_write;
+mod framed;
+mod framed_read;
+mod framed_write;
+mod length_delimited;
+mod lines;
+mod split;
+mod window;
+
+pub use self::async_read::AsyncRead;
+pub use self::async_write::AsyncWrite;
+
+fn _assert_objects() {
+ fn _assert<T>() {}
+ _assert::<Box<dyn AsyncRead>>();
+ _assert::<Box<dyn AsyncWrite>>();
+}
diff --git a/third_party/rust/tokio-io/src/lines.rs b/third_party/rust/tokio-io/src/lines.rs
new file mode 100644
index 0000000000..8e59ff8fa2
--- /dev/null
+++ b/third_party/rust/tokio-io/src/lines.rs
@@ -0,0 +1,62 @@
+use std::io::{self, BufRead};
+use std::mem;
+
+use futures::{Poll, Stream};
+
+use AsyncRead;
+
+/// Combinator created by the top-level `lines` method which is a stream over
+/// the lines of text on an I/O object.
+#[derive(Debug)]
+pub struct Lines<A> {
+ io: A,
+ line: String,
+}
+
+/// Creates a new stream from the I/O object given representing the lines of
+/// input that are found on `A`.
+///
+/// This method takes an asynchronous I/O object, `a`, and returns a `Stream` of
+/// lines that the object contains. The returned stream will reach its end once
+/// `a` reaches EOF.
+pub fn lines<A>(a: A) -> Lines<A>
+where
+ A: AsyncRead + BufRead,
+{
+ Lines {
+ io: a,
+ line: String::new(),
+ }
+}
+
+impl<A> Lines<A> {
+ /// Returns the underlying I/O object.
+ ///
+ /// Note that this may lose data already read into internal buffers. It's
+ /// recommended to only call this once the stream has reached its end.
+ pub fn into_inner(self) -> A {
+ self.io
+ }
+}
+
+impl<A> Stream for Lines<A>
+where
+ A: AsyncRead + BufRead,
+{
+ type Item = String;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<Option<String>, io::Error> {
+ let n = try_nb!(self.io.read_line(&mut self.line));
+ if n == 0 && self.line.len() == 0 {
+ return Ok(None.into());
+ }
+ if self.line.ends_with("\n") {
+ self.line.pop();
+ if self.line.ends_with("\r") {
+ self.line.pop();
+ }
+ }
+ Ok(Some(mem::replace(&mut self.line, String::new())).into())
+ }
+}
diff --git a/third_party/rust/tokio-io/src/split.rs b/third_party/rust/tokio-io/src/split.rs
new file mode 100644
index 0000000000..ef8f990c8c
--- /dev/null
+++ b/third_party/rust/tokio-io/src/split.rs
@@ -0,0 +1,247 @@
+use std::io::{self, Read, Write};
+
+use bytes::{Buf, BufMut};
+use futures::sync::BiLock;
+use futures::{Async, Poll};
+
+use {AsyncRead, AsyncWrite};
+
+/// The readable half of an object returned from `AsyncRead::split`.
+#[derive(Debug)]
+pub struct ReadHalf<T> {
+ handle: BiLock<T>,
+}
+
+impl<T: AsyncRead + AsyncWrite> ReadHalf<T> {
+ /// Reunite with a previously split `WriteHalf`.
+ ///
+ /// # Panics
+ ///
+ /// If this `ReadHalf` and the given `WriteHalf` do not originate from
+ /// the same `AsyncRead::split` operation this method will panic.
+ pub fn unsplit(self, w: WriteHalf<T>) -> T {
+ if let Ok(x) = self.handle.reunite(w.handle) {
+ x
+ } else {
+ panic!("Unrelated `WriteHalf` passed to `ReadHalf::unsplit`.")
+ }
+ }
+}
+
+/// The writable half of an object returned from `AsyncRead::split`.
+#[derive(Debug)]
+pub struct WriteHalf<T> {
+ handle: BiLock<T>,
+}
+
+impl<T: AsyncRead + AsyncWrite> WriteHalf<T> {
+ /// Reunite with a previously split `ReadHalf`.
+ ///
+ /// # panics
+ ///
+ /// If this `WriteHalf` and the given `ReadHalf` do not originate from
+ /// the same `AsyncRead::split` operation this method will panic.
+ pub fn unsplit(self, r: ReadHalf<T>) -> T {
+ if let Ok(x) = self.handle.reunite(r.handle) {
+ x
+ } else {
+ panic!("Unrelated `ReadHalf` passed to `WriteHalf::unsplit`.")
+ }
+ }
+}
+
+pub fn split<T: AsyncRead + AsyncWrite>(t: T) -> (ReadHalf<T>, WriteHalf<T>) {
+ let (a, b) = BiLock::new(t);
+ (ReadHalf { handle: a }, WriteHalf { handle: b })
+}
+
+fn would_block() -> io::Error {
+ io::Error::new(io::ErrorKind::WouldBlock, "would block")
+}
+
+impl<T: AsyncRead> Read for ReadHalf<T> {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ match self.handle.poll_lock() {
+ Async::Ready(mut l) => l.read(buf),
+ Async::NotReady => Err(would_block()),
+ }
+ }
+}
+
+impl<T: AsyncRead> AsyncRead for ReadHalf<T> {
+ fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
+ let mut l = try_ready!(wrap_as_io(self.handle.poll_lock()));
+ l.read_buf(buf)
+ }
+}
+
+impl<T: AsyncWrite> Write for WriteHalf<T> {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ match self.handle.poll_lock() {
+ Async::Ready(mut l) => l.write(buf),
+ Async::NotReady => Err(would_block()),
+ }
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ match self.handle.poll_lock() {
+ Async::Ready(mut l) => l.flush(),
+ Async::NotReady => Err(would_block()),
+ }
+ }
+}
+
+impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {
+ fn shutdown(&mut self) -> Poll<(), io::Error> {
+ let mut l = try_ready!(wrap_as_io(self.handle.poll_lock()));
+ l.shutdown()
+ }
+
+ fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error>
+ where
+ Self: Sized,
+ {
+ let mut l = try_ready!(wrap_as_io(self.handle.poll_lock()));
+ l.write_buf(buf)
+ }
+}
+
+fn wrap_as_io<T>(t: Async<T>) -> Result<Async<T>, io::Error> {
+ Ok(t)
+}
+
+#[cfg(test)]
+mod tests {
+ extern crate tokio_current_thread;
+
+ use super::{AsyncRead, AsyncWrite, ReadHalf, WriteHalf};
+ use bytes::{BytesMut, IntoBuf};
+ use futures::sync::BiLock;
+ use futures::{future::lazy, future::ok, Async, Poll};
+
+ use std::io::{self, Read, Write};
+
+ struct RW;
+
+ impl Read for RW {
+ fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
+ Ok(1)
+ }
+ }
+
+ impl AsyncRead for RW {}
+
+ impl Write for RW {
+ fn write(&mut self, _: &[u8]) -> io::Result<usize> {
+ Ok(1)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ Ok(())
+ }
+ }
+
+ impl AsyncWrite for RW {
+ fn shutdown(&mut self) -> Poll<(), io::Error> {
+ Ok(Async::Ready(()))
+ }
+ }
+
+ #[test]
+ fn split_readhalf_translate_wouldblock_to_not_ready() {
+ tokio_current_thread::block_on_all(lazy(move || {
+ let rw = RW {};
+ let (a, b) = BiLock::new(rw);
+ let mut rx = ReadHalf { handle: a };
+
+ let mut buf = BytesMut::with_capacity(64);
+
+ // First read is uncontended, should go through.
+ assert!(rx.read_buf(&mut buf).unwrap().is_ready());
+
+ // Take lock from write side.
+ let lock = b.poll_lock();
+
+ // Second read should be NotReady.
+ assert!(!rx.read_buf(&mut buf).unwrap().is_ready());
+
+ drop(lock);
+
+ // Back to uncontended.
+ assert!(rx.read_buf(&mut buf).unwrap().is_ready());
+
+ ok::<(), ()>(())
+ }))
+ .unwrap();
+ }
+
+ #[test]
+ fn split_writehalf_translate_wouldblock_to_not_ready() {
+ tokio_current_thread::block_on_all(lazy(move || {
+ let rw = RW {};
+ let (a, b) = BiLock::new(rw);
+ let mut tx = WriteHalf { handle: a };
+
+ let bufmut = BytesMut::with_capacity(64);
+ let mut buf = bufmut.into_buf();
+
+ // First write is uncontended, should go through.
+ assert!(tx.write_buf(&mut buf).unwrap().is_ready());
+
+ // Take lock from read side.
+ let lock = b.poll_lock();
+
+ // Second write should be NotReady.
+ assert!(!tx.write_buf(&mut buf).unwrap().is_ready());
+
+ drop(lock);
+
+ // Back to uncontended.
+ assert!(tx.write_buf(&mut buf).unwrap().is_ready());
+
+ ok::<(), ()>(())
+ }))
+ .unwrap();
+ }
+
+ #[test]
+ fn unsplit_ok() {
+ let (r, w) = RW.split();
+ r.unsplit(w);
+
+ let (r, w) = RW.split();
+ w.unsplit(r);
+ }
+
+ #[test]
+ #[should_panic]
+ fn unsplit_err1() {
+ let (r, _) = RW.split();
+ let (_, w) = RW.split();
+ r.unsplit(w);
+ }
+
+ #[test]
+ #[should_panic]
+ fn unsplit_err2() {
+ let (_, w) = RW.split();
+ let (r, _) = RW.split();
+ r.unsplit(w);
+ }
+
+ #[test]
+ #[should_panic]
+ fn unsplit_err3() {
+ let (_, w) = RW.split();
+ let (r, _) = RW.split();
+ w.unsplit(r);
+ }
+
+ #[test]
+ #[should_panic]
+ fn unsplit_err4() {
+ let (r, _) = RW.split();
+ let (_, w) = RW.split();
+ w.unsplit(r);
+ }
+}
diff --git a/third_party/rust/tokio-io/src/window.rs b/third_party/rust/tokio-io/src/window.rs
new file mode 100644
index 0000000000..4ded9ad403
--- /dev/null
+++ b/third_party/rust/tokio-io/src/window.rs
@@ -0,0 +1,117 @@
+use std::ops;
+
+/// A owned window around an underlying buffer.
+///
+/// Normally slices work great for considering sub-portions of a buffer, but
+/// unfortunately a slice is a *borrowed* type in Rust which has an associated
+/// lifetime. When working with future and async I/O these lifetimes are not
+/// always appropriate, and are sometimes difficult to store in tasks. This
+/// type strives to fill this gap by providing an "owned slice" around an
+/// underlying buffer of bytes.
+///
+/// A `Window<T>` wraps an underlying buffer, `T`, and has configurable
+/// start/end indexes to alter the behavior of the `AsRef<[u8]>` implementation
+/// that this type carries.
+///
+/// This type can be particularly useful when working with the `write_all`
+/// combinator in this crate. Data can be sliced via `Window`, consumed by
+/// `write_all`, and then earned back once the write operation finishes through
+/// the `into_inner` method on this type.
+#[derive(Debug)]
+pub struct Window<T> {
+ inner: T,
+ range: ops::Range<usize>,
+}
+
+impl<T: AsRef<[u8]>> Window<T> {
+ /// Creates a new window around the buffer `t` defaulting to the entire
+ /// slice.
+ ///
+ /// Further methods can be called on the returned `Window<T>` to alter the
+ /// window into the data provided.
+ pub fn new(t: T) -> Window<T> {
+ Window {
+ range: 0..t.as_ref().len(),
+ inner: t,
+ }
+ }
+
+ /// Gets a shared reference to the underlying buffer inside of this
+ /// `Window`.
+ pub fn get_ref(&self) -> &T {
+ &self.inner
+ }
+
+ /// Gets a mutable reference to the underlying buffer inside of this
+ /// `Window`.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner
+ }
+
+ /// Consumes this `Window`, returning the underlying buffer.
+ pub fn into_inner(self) -> T {
+ self.inner
+ }
+
+ /// Returns the starting index of this window into the underlying buffer
+ /// `T`.
+ pub fn start(&self) -> usize {
+ self.range.start
+ }
+
+ /// Returns the end index of this window into the underlying buffer
+ /// `T`.
+ pub fn end(&self) -> usize {
+ self.range.end
+ }
+
+ /// Changes the starting index of this window to the index specified.
+ ///
+ /// Returns the windows back to chain multiple calls to this method.
+ ///
+ /// # Panics
+ ///
+ /// This method will panic if `start` is out of bounds for the underlying
+ /// slice or if it comes after the `end` configured in this window.
+ pub fn set_start(&mut self, start: usize) -> &mut Window<T> {
+ assert!(start <= self.inner.as_ref().len());
+ assert!(start <= self.range.end);
+ self.range.start = start;
+ self
+ }
+
+ /// Changes the end index of this window to the index specified.
+ ///
+ /// Returns the windows back to chain multiple calls to this method.
+ ///
+ /// # Panics
+ ///
+ /// This method will panic if `end` is out of bounds for the underlying
+ /// slice or if it comes before the `start` configured in this window.
+ pub fn set_end(&mut self, end: usize) -> &mut Window<T> {
+ assert!(end <= self.inner.as_ref().len());
+ assert!(self.range.start <= end);
+ self.range.end = end;
+ self
+ }
+
+ // TODO: how about a generic set() method along the lines of:
+ //
+ // buffer.set(..3)
+ // .set(0..2)
+ // .set(4..)
+ //
+ // etc.
+}
+
+impl<T: AsRef<[u8]>> AsRef<[u8]> for Window<T> {
+ fn as_ref(&self) -> &[u8] {
+ &self.inner.as_ref()[self.range.start..self.range.end]
+ }
+}
+
+impl<T: AsMut<[u8]>> AsMut<[u8]> for Window<T> {
+ fn as_mut(&mut self) -> &mut [u8] {
+ &mut self.inner.as_mut()[self.range.start..self.range.end]
+ }
+}