diff options
Diffstat (limited to '')
31 files changed, 4681 insertions, 0 deletions
diff --git a/third_party/rust/tokio-io/src/_tokio_codec/decoder.rs b/third_party/rust/tokio-io/src/_tokio_codec/decoder.rs new file mode 100644 index 0000000000..edc0ecc0d4 --- /dev/null +++ b/third_party/rust/tokio-io/src/_tokio_codec/decoder.rs @@ -0,0 +1,3 @@ +// For now, we need to keep the implementation of Encoder in tokio_io. + +pub use codec::Decoder; diff --git a/third_party/rust/tokio-io/src/_tokio_codec/encoder.rs b/third_party/rust/tokio-io/src/_tokio_codec/encoder.rs new file mode 100644 index 0000000000..20b9e375e0 --- /dev/null +++ b/third_party/rust/tokio-io/src/_tokio_codec/encoder.rs @@ -0,0 +1,3 @@ +// For now, we need to keep the implementation of Encoder in tokio_io. + +pub use codec::Encoder; diff --git a/third_party/rust/tokio-io/src/_tokio_codec/framed.rs b/third_party/rust/tokio-io/src/_tokio_codec/framed.rs new file mode 100644 index 0000000000..d290575e79 --- /dev/null +++ b/third_party/rust/tokio-io/src/_tokio_codec/framed.rs @@ -0,0 +1,283 @@ +#![allow(deprecated)] + +use std::fmt; +use std::io::{self, Read, Write}; + +use super::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2}; +use super::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2}; +use codec::{Decoder, Encoder}; +use {AsyncRead, AsyncWrite}; + +use bytes::BytesMut; +use futures::{Poll, Sink, StartSend, Stream}; + +/// A unified `Stream` and `Sink` interface to an underlying I/O object, using +/// the `Encoder` and `Decoder` traits to encode and decode frames. +/// +/// You can create a `Framed` instance by using the `AsyncRead::framed` adapter. +pub struct Framed<T, U> { + inner: FramedRead2<FramedWrite2<Fuse<T, U>>>, +} + +pub struct Fuse<T, U>(pub T, pub U); + +impl<T, U> Framed<T, U> +where + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, +{ + /// Provides a `Stream` and `Sink` interface for reading and writing to this + /// `Io` object, using `Decode` and `Encode` to read and write the raw data. + /// + /// Raw I/O objects work with byte sequences, but higher-level code usually + /// wants to batch these into meaningful chunks, called "frames". This + /// method layers framing on top of an I/O object, by using the `Codec` + /// traits to handle encoding and decoding of messages frames. Note that + /// the incoming and outgoing frame types may be distinct. + /// + /// This function returns a *single* object that is both `Stream` and + /// `Sink`; grouping this into a single object is often useful for layering + /// things like gzip or TLS, which require both read and write access to the + /// underlying object. + /// + /// If you want to work more directly with the streams and sink, consider + /// calling `split` on the `Framed` returned by this method, which will + /// break them into separate objects, allowing them to interact more easily. + pub fn new(inner: T, codec: U) -> Framed<T, U> { + Framed { + inner: framed_read2(framed_write2(Fuse(inner, codec))), + } + } +} + +impl<T, U> Framed<T, U> { + /// Provides a `Stream` and `Sink` interface for reading and writing to this + /// `Io` object, using `Decode` and `Encode` to read and write the raw data. + /// + /// Raw I/O objects work with byte sequences, but higher-level code usually + /// wants to batch these into meaningful chunks, called "frames". This + /// method layers framing on top of an I/O object, by using the `Codec` + /// traits to handle encoding and decoding of messages frames. Note that + /// the incoming and outgoing frame types may be distinct. + /// + /// This function returns a *single* object that is both `Stream` and + /// `Sink`; grouping this into a single object is often useful for layering + /// things like gzip or TLS, which require both read and write access to the + /// underlying object. + /// + /// This objects takes a stream and a readbuffer and a writebuffer. These field + /// can be obtained from an existing `Framed` with the `into_parts` method. + /// + /// If you want to work more directly with the streams and sink, consider + /// calling `split` on the `Framed` returned by this method, which will + /// break them into separate objects, allowing them to interact more easily. + pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> { + Framed { + inner: framed_read2_with_buffer( + framed_write2_with_buffer(Fuse(parts.io, parts.codec), parts.write_buf), + parts.read_buf, + ), + } + } + + /// Returns a reference to the underlying I/O stream wrapped by + /// `Frame`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_ref(&self) -> &T { + &self.inner.get_ref().get_ref().0 + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `Frame`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner.get_mut().get_mut().0 + } + + /// Returns a reference to the underlying codec wrapped by + /// `Frame`. + /// + /// Note that care should be taken to not tamper with the underlying codec + /// as it may corrupt the stream of frames otherwise being worked with. + pub fn codec(&self) -> &U { + &self.inner.get_ref().get_ref().1 + } + + /// Returns a mutable reference to the underlying codec wrapped by + /// `Frame`. + /// + /// Note that care should be taken to not tamper with the underlying codec + /// as it may corrupt the stream of frames otherwise being worked with. + pub fn codec_mut(&mut self) -> &mut U { + &mut self.inner.get_mut().get_mut().1 + } + + /// Consumes the `Frame`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn into_inner(self) -> T { + self.inner.into_inner().into_inner().0 + } + + /// Consumes the `Frame`, returning its underlying I/O stream, the buffer + /// with unprocessed data, and the codec. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn into_parts(self) -> FramedParts<T, U> { + let (inner, read_buf) = self.inner.into_parts(); + let (inner, write_buf) = inner.into_parts(); + + FramedParts { + io: inner.0, + codec: inner.1, + read_buf: read_buf, + write_buf: write_buf, + _priv: (), + } + } +} + +impl<T, U> Stream for Framed<T, U> +where + T: AsyncRead, + U: Decoder, +{ + type Item = U::Item; + type Error = U::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + self.inner.poll() + } +} + +impl<T, U> Sink for Framed<T, U> +where + T: AsyncWrite, + U: Encoder, + U::Error: From<io::Error>, +{ + type SinkItem = U::Item; + type SinkError = U::Error; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { + self.inner.get_mut().start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + self.inner.get_mut().poll_complete() + } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + self.inner.get_mut().close() + } +} + +impl<T, U> fmt::Debug for Framed<T, U> +where + T: fmt::Debug, + U: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Framed") + .field("io", &self.inner.get_ref().get_ref().0) + .field("codec", &self.inner.get_ref().get_ref().1) + .finish() + } +} + +// ===== impl Fuse ===== + +impl<T: Read, U> Read for Fuse<T, U> { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + self.0.read(dst) + } +} + +impl<T: AsyncRead, U> AsyncRead for Fuse<T, U> { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.0.prepare_uninitialized_buffer(buf) + } +} + +impl<T: Write, U> Write for Fuse<T, U> { + fn write(&mut self, src: &[u8]) -> io::Result<usize> { + self.0.write(src) + } + + fn flush(&mut self) -> io::Result<()> { + self.0.flush() + } +} + +impl<T: AsyncWrite, U> AsyncWrite for Fuse<T, U> { + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.0.shutdown() + } +} + +impl<T, U: Decoder> Decoder for Fuse<T, U> { + type Item = U::Item; + type Error = U::Error; + + fn decode(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { + self.1.decode(buffer) + } + + fn decode_eof(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { + self.1.decode_eof(buffer) + } +} + +impl<T, U: Encoder> Encoder for Fuse<T, U> { + type Item = U::Item; + type Error = U::Error; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + self.1.encode(item, dst) + } +} + +/// `FramedParts` contains an export of the data of a Framed transport. +/// It can be used to construct a new `Framed` with a different codec. +/// It contains all current buffers and the inner transport. +#[derive(Debug)] +pub struct FramedParts<T, U> { + /// The inner transport used to read bytes to and write bytes to + pub io: T, + + /// The codec + pub codec: U, + + /// The buffer with read but unprocessed data. + pub read_buf: BytesMut, + + /// A buffer with unprocessed data which are not written yet. + pub write_buf: BytesMut, + + /// This private field allows us to add additional fields in the future in a + /// backwards compatible way. + _priv: (), +} + +impl<T, U> FramedParts<T, U> { + /// Create a new, default, `FramedParts` + pub fn new(io: T, codec: U) -> FramedParts<T, U> { + FramedParts { + io, + codec, + read_buf: BytesMut::new(), + write_buf: BytesMut::new(), + _priv: (), + } + } +} diff --git a/third_party/rust/tokio-io/src/_tokio_codec/framed_read.rs b/third_party/rust/tokio-io/src/_tokio_codec/framed_read.rs new file mode 100644 index 0000000000..ea7550877c --- /dev/null +++ b/third_party/rust/tokio-io/src/_tokio_codec/framed_read.rs @@ -0,0 +1,216 @@ +#![allow(deprecated)] + +use std::fmt; + +use super::framed::Fuse; +use codec::Decoder; +use AsyncRead; + +use bytes::BytesMut; +use futures::{Async, Poll, Sink, StartSend, Stream}; + +/// A `Stream` of messages decoded from an `AsyncRead`. +pub struct FramedRead<T, D> { + inner: FramedRead2<Fuse<T, D>>, +} + +pub struct FramedRead2<T> { + inner: T, + eof: bool, + is_readable: bool, + buffer: BytesMut, +} + +const INITIAL_CAPACITY: usize = 8 * 1024; + +// ===== impl FramedRead ===== + +impl<T, D> FramedRead<T, D> +where + T: AsyncRead, + D: Decoder, +{ + /// Creates a new `FramedRead` with the given `decoder`. + pub fn new(inner: T, decoder: D) -> FramedRead<T, D> { + FramedRead { + inner: framed_read2(Fuse(inner, decoder)), + } + } +} + +impl<T, D> FramedRead<T, D> { + /// Returns a reference to the underlying I/O stream wrapped by + /// `FramedRead`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_ref(&self) -> &T { + &self.inner.inner.0 + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `FramedRead`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner.inner.0 + } + + /// Consumes the `FramedRead`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn into_inner(self) -> T { + self.inner.inner.0 + } + + /// Returns a reference to the underlying decoder. + pub fn decoder(&self) -> &D { + &self.inner.inner.1 + } + + /// Returns a mutable reference to the underlying decoder. + pub fn decoder_mut(&mut self) -> &mut D { + &mut self.inner.inner.1 + } +} + +impl<T, D> Stream for FramedRead<T, D> +where + T: AsyncRead, + D: Decoder, +{ + type Item = D::Item; + type Error = D::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + self.inner.poll() + } +} + +impl<T, D> Sink for FramedRead<T, D> +where + T: Sink, +{ + type SinkItem = T::SinkItem; + type SinkError = T::SinkError; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { + self.inner.inner.0.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + self.inner.inner.0.poll_complete() + } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + self.inner.inner.0.close() + } +} + +impl<T, D> fmt::Debug for FramedRead<T, D> +where + T: fmt::Debug, + D: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("FramedRead") + .field("inner", &self.inner.inner.0) + .field("decoder", &self.inner.inner.1) + .field("eof", &self.inner.eof) + .field("is_readable", &self.inner.is_readable) + .field("buffer", &self.inner.buffer) + .finish() + } +} + +// ===== impl FramedRead2 ===== + +pub fn framed_read2<T>(inner: T) -> FramedRead2<T> { + FramedRead2 { + inner: inner, + eof: false, + is_readable: false, + buffer: BytesMut::with_capacity(INITIAL_CAPACITY), + } +} + +pub fn framed_read2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedRead2<T> { + if buf.capacity() < INITIAL_CAPACITY { + let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity(); + buf.reserve(bytes_to_reserve); + } + FramedRead2 { + inner: inner, + eof: false, + is_readable: buf.len() > 0, + buffer: buf, + } +} + +impl<T> FramedRead2<T> { + pub fn get_ref(&self) -> &T { + &self.inner + } + + pub fn into_inner(self) -> T { + self.inner + } + + pub fn into_parts(self) -> (T, BytesMut) { + (self.inner, self.buffer) + } + + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } +} + +impl<T> Stream for FramedRead2<T> +where + T: AsyncRead + Decoder, +{ + type Item = T::Item; + type Error = T::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + loop { + // Repeatedly call `decode` or `decode_eof` as long as it is + // "readable". Readable is defined as not having returned `None`. If + // the upstream has returned EOF, and the decoder is no longer + // readable, it can be assumed that the decoder will never become + // readable again, at which point the stream is terminated. + if self.is_readable { + if self.eof { + let frame = self.inner.decode_eof(&mut self.buffer)?; + return Ok(Async::Ready(frame)); + } + + trace!("attempting to decode a frame"); + + if let Some(frame) = self.inner.decode(&mut self.buffer)? { + trace!("frame decoded from buffer"); + return Ok(Async::Ready(Some(frame))); + } + + self.is_readable = false; + } + + assert!(!self.eof); + + // Otherwise, try to read more data and try again. Make sure we've + // got room for at least one byte to read to ensure that we don't + // get a spurious 0 that looks like EOF + self.buffer.reserve(1); + if 0 == try_ready!(self.inner.read_buf(&mut self.buffer)) { + self.eof = true; + } + + self.is_readable = true; + } + } +} diff --git a/third_party/rust/tokio-io/src/_tokio_codec/framed_write.rs b/third_party/rust/tokio-io/src/_tokio_codec/framed_write.rs new file mode 100644 index 0000000000..7541b1730d --- /dev/null +++ b/third_party/rust/tokio-io/src/_tokio_codec/framed_write.rs @@ -0,0 +1,246 @@ +#![allow(deprecated)] + +use std::fmt; +use std::io::{self, Read}; + +use super::framed::Fuse; +use codec::{Decoder, Encoder}; +use {AsyncRead, AsyncWrite}; + +use bytes::BytesMut; +use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream}; + +/// A `Sink` of frames encoded to an `AsyncWrite`. +pub struct FramedWrite<T, E> { + inner: FramedWrite2<Fuse<T, E>>, +} + +pub struct FramedWrite2<T> { + inner: T, + buffer: BytesMut, +} + +const INITIAL_CAPACITY: usize = 8 * 1024; +const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY; + +impl<T, E> FramedWrite<T, E> +where + T: AsyncWrite, + E: Encoder, +{ + /// Creates a new `FramedWrite` with the given `encoder`. + pub fn new(inner: T, encoder: E) -> FramedWrite<T, E> { + FramedWrite { + inner: framed_write2(Fuse(inner, encoder)), + } + } +} + +impl<T, E> FramedWrite<T, E> { + /// Returns a reference to the underlying I/O stream wrapped by + /// `FramedWrite`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_ref(&self) -> &T { + &self.inner.inner.0 + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `FramedWrite`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner.inner.0 + } + + /// Consumes the `FramedWrite`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn into_inner(self) -> T { + self.inner.inner.0 + } + + /// Returns a reference to the underlying decoder. + pub fn encoder(&self) -> &E { + &self.inner.inner.1 + } + + /// Returns a mutable reference to the underlying decoder. + pub fn encoder_mut(&mut self) -> &mut E { + &mut self.inner.inner.1 + } +} + +impl<T, E> Sink for FramedWrite<T, E> +where + T: AsyncWrite, + E: Encoder, +{ + type SinkItem = E::Item; + type SinkError = E::Error; + + fn start_send(&mut self, item: E::Item) -> StartSend<E::Item, E::Error> { + self.inner.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + self.inner.poll_complete() + } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + Ok(self.inner.close()?) + } +} + +impl<T, D> Stream for FramedWrite<T, D> +where + T: Stream, +{ + type Item = T::Item; + type Error = T::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + self.inner.inner.0.poll() + } +} + +impl<T, U> fmt::Debug for FramedWrite<T, U> +where + T: fmt::Debug, + U: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("FramedWrite") + .field("inner", &self.inner.get_ref().0) + .field("encoder", &self.inner.get_ref().1) + .field("buffer", &self.inner.buffer) + .finish() + } +} + +// ===== impl FramedWrite2 ===== + +pub fn framed_write2<T>(inner: T) -> FramedWrite2<T> { + FramedWrite2 { + inner: inner, + buffer: BytesMut::with_capacity(INITIAL_CAPACITY), + } +} + +pub fn framed_write2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedWrite2<T> { + if buf.capacity() < INITIAL_CAPACITY { + let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity(); + buf.reserve(bytes_to_reserve); + } + FramedWrite2 { + inner: inner, + buffer: buf, + } +} + +impl<T> FramedWrite2<T> { + pub fn get_ref(&self) -> &T { + &self.inner + } + + pub fn into_inner(self) -> T { + self.inner + } + + pub fn into_parts(self) -> (T, BytesMut) { + (self.inner, self.buffer) + } + + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } +} + +impl<T> Sink for FramedWrite2<T> +where + T: AsyncWrite + Encoder, +{ + type SinkItem = T::Item; + type SinkError = T::Error; + + fn start_send(&mut self, item: T::Item) -> StartSend<T::Item, T::Error> { + // If the buffer is already over 8KiB, then attempt to flush it. If after flushing it's + // *still* over 8KiB, then apply backpressure (reject the send). + if self.buffer.len() >= BACKPRESSURE_BOUNDARY { + self.poll_complete()?; + + if self.buffer.len() >= BACKPRESSURE_BOUNDARY { + return Ok(AsyncSink::NotReady(item)); + } + } + + self.inner.encode(item, &mut self.buffer)?; + + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + trace!("flushing framed transport"); + + while !self.buffer.is_empty() { + trace!("writing; remaining={}", self.buffer.len()); + + let n = try_ready!(self.inner.poll_write(&self.buffer)); + + if n == 0 { + return Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to \ + write frame to transport", + ) + .into()); + } + + // TODO: Add a way to `bytes` to do this w/o returning the drained + // data. + let _ = self.buffer.split_to(n); + } + + // Try flushing the underlying IO + try_ready!(self.inner.poll_flush()); + + trace!("framed transport flushed"); + return Ok(Async::Ready(())); + } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + try_ready!(self.poll_complete()); + Ok(self.inner.shutdown()?) + } +} + +impl<T: Decoder> Decoder for FramedWrite2<T> { + type Item = T::Item; + type Error = T::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result<Option<T::Item>, T::Error> { + self.inner.decode(src) + } + + fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<T::Item>, T::Error> { + self.inner.decode_eof(src) + } +} + +impl<T: Read> Read for FramedWrite2<T> { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + self.inner.read(dst) + } +} + +impl<T: AsyncRead> AsyncRead for FramedWrite2<T> { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.inner.prepare_uninitialized_buffer(buf) + } +} diff --git a/third_party/rust/tokio-io/src/_tokio_codec/mod.rs b/third_party/rust/tokio-io/src/_tokio_codec/mod.rs new file mode 100644 index 0000000000..5d7eb21890 --- /dev/null +++ b/third_party/rust/tokio-io/src/_tokio_codec/mod.rs @@ -0,0 +1,36 @@ +//! Utilities for encoding and decoding frames. +//! +//! Contains adapters to go from streams of bytes, [`AsyncRead`] and +//! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`]. +//! Framed streams are also known as [transports]. +//! +//! [`AsyncRead`]: # +//! [`AsyncWrite`]: # +//! [`Sink`]: # +//! [`Stream`]: # +//! [transports]: # + +#![deny(missing_docs, missing_debug_implementations)] +#![doc(hidden, html_root_url = "https://docs.rs/tokio-codec/0.1.0")] + +// _tokio_codec are the items that belong in the `tokio_codec` crate. However, because we need to +// maintain backward compatibility until the next major breaking change, they are defined here. +// When the next breaking change comes, they should be moved to the `tokio_codec` crate and become +// independent. +// +// The primary reason we can't move these to `tokio-codec` now is because, again for backward +// compatibility reasons, we need to keep `Decoder` and `Encoder` in tokio_io::codec. And `Decoder` +// and `Encoder` needs to reference `Framed`. So they all still need to still be in the same +// module. + +mod decoder; +mod encoder; +mod framed; +mod framed_read; +mod framed_write; + +pub use self::decoder::Decoder; +pub use self::encoder::Encoder; +pub use self::framed::{Framed, FramedParts}; +pub use self::framed_read::FramedRead; +pub use self::framed_write::FramedWrite; diff --git a/third_party/rust/tokio-io/src/allow_std.rs b/third_party/rust/tokio-io/src/allow_std.rs new file mode 100644 index 0000000000..af39ac2197 --- /dev/null +++ b/third_party/rust/tokio-io/src/allow_std.rs @@ -0,0 +1,93 @@ +use futures::{Async, Poll}; +use std::{fmt, io}; +use {AsyncRead, AsyncWrite}; + +/// A simple wrapper type which allows types that only implement +/// `std::io::Read` or `std::io::Write` to be used in contexts which expect +/// an `AsyncRead` or `AsyncWrite`. +/// +/// If these types issue an error with the kind `io::ErrorKind::WouldBlock`, +/// it is expected that they will notify the current task on readiness. +/// Synchronous `std` types should not issue errors of this kind and +/// are safe to use in this context. However, using these types with +/// `AllowStdIo` will cause the event loop to block, so they should be used +/// with care. +#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub struct AllowStdIo<T>(T); + +impl<T> AllowStdIo<T> { + /// Creates a new `AllowStdIo` from an existing IO object. + pub fn new(io: T) -> Self { + AllowStdIo(io) + } + + /// Returns a reference to the contained IO object. + pub fn get_ref(&self) -> &T { + &self.0 + } + + /// Returns a mutable reference to the contained IO object. + pub fn get_mut(&mut self) -> &mut T { + &mut self.0 + } + + /// Consumes self and returns the contained IO object. + pub fn into_inner(self) -> T { + self.0 + } +} + +impl<T> io::Write for AllowStdIo<T> +where + T: io::Write, +{ + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.0.write(buf) + } + fn flush(&mut self) -> io::Result<()> { + self.0.flush() + } + fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { + self.0.write_all(buf) + } + fn write_fmt(&mut self, fmt: fmt::Arguments) -> io::Result<()> { + self.0.write_fmt(fmt) + } +} + +impl<T> AsyncWrite for AllowStdIo<T> +where + T: io::Write, +{ + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(Async::Ready(())) + } +} + +impl<T> io::Read for AllowStdIo<T> +where + T: io::Read, +{ + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.0.read(buf) + } + // TODO: implement the `initializer` fn when it stabilizes. + // See rust-lang/rust #42788 + fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> { + self.0.read_to_end(buf) + } + fn read_to_string(&mut self, buf: &mut String) -> io::Result<usize> { + self.0.read_to_string(buf) + } + fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> { + self.0.read_exact(buf) + } +} + +impl<T> AsyncRead for AllowStdIo<T> +where + T: io::Read, +{ + // TODO: override prepare_uninitialized_buffer once `Read::initializer` is stable. + // See rust-lang/rust #42788 +} diff --git a/third_party/rust/tokio-io/src/async_read.rs b/third_party/rust/tokio-io/src/async_read.rs new file mode 100644 index 0000000000..d82aa71160 --- /dev/null +++ b/third_party/rust/tokio-io/src/async_read.rs @@ -0,0 +1,173 @@ +use bytes::BufMut; +use futures::{Async, Poll}; +use std::io as std_io; + +#[allow(deprecated)] +use codec::{Decoder, Encoder, Framed}; +use split::{ReadHalf, WriteHalf}; +use {framed, split, AsyncWrite}; + +/// Read bytes asynchronously. +/// +/// This trait inherits from `std::io::Read` and indicates that an I/O object is +/// **non-blocking**. All non-blocking I/O objects must return an error when +/// bytes are unavailable instead of blocking the current thread. +/// +/// Specifically, this means that the `poll_read` function will return one of +/// the following: +/// +/// * `Ok(Async::Ready(n))` means that `n` bytes of data was immediately read +/// and placed into the output buffer, where `n` == 0 implies that EOF has +/// been reached. +/// +/// * `Ok(Async::NotReady)` means that no data was read into the buffer +/// provided. The I/O object is not currently readable but may become readable +/// in the future. Most importantly, **the current future's task is scheduled +/// to get unparked when the object is readable**. This means that like +/// `Future::poll` you'll receive a notification when the I/O object is +/// readable again. +/// +/// * `Err(e)` for other errors are standard I/O errors coming from the +/// underlying object. +/// +/// This trait importantly means that the `read` method only works in the +/// context of a future's task. The object may panic if used outside of a task. +pub trait AsyncRead: std_io::Read { + /// Prepares an uninitialized buffer to be safe to pass to `read`. Returns + /// `true` if the supplied buffer was zeroed out. + /// + /// While it would be highly unusual, implementations of [`io::Read`] are + /// able to read data from the buffer passed as an argument. Because of + /// this, the buffer passed to [`io::Read`] must be initialized memory. In + /// situations where large numbers of buffers are used, constantly having to + /// zero out buffers can be expensive. + /// + /// This function does any necessary work to prepare an uninitialized buffer + /// to be safe to pass to `read`. If `read` guarantees to never attempt to + /// read data out of the supplied buffer, then `prepare_uninitialized_buffer` + /// doesn't need to do any work. + /// + /// If this function returns `true`, then the memory has been zeroed out. + /// This allows implementations of `AsyncRead` which are composed of + /// multiple subimplementations to efficiently implement + /// `prepare_uninitialized_buffer`. + /// + /// This function isn't actually `unsafe` to call but `unsafe` to implement. + /// The implementer must ensure that either the whole `buf` has been zeroed + /// or `read_buf()` overwrites the buffer without reading it and returns + /// correct value. + /// + /// This function is called from [`read_buf`]. + /// + /// [`io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html + /// [`read_buf`]: #method.read_buf + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + for i in 0..buf.len() { + buf[i] = 0; + } + + true + } + + /// Attempt to read from the `AsyncRead` into `buf`. + /// + /// On success, returns `Ok(Async::Ready(num_bytes_read))`. + /// + /// If no data is available for reading, the method returns + /// `Ok(Async::NotReady)` and arranges for the current task (via + /// `cx.waker()`) to receive a notification when the object becomes + /// readable or is closed. + fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, std_io::Error> { + match self.read(buf) { + Ok(t) => Ok(Async::Ready(t)), + Err(ref e) if e.kind() == std_io::ErrorKind::WouldBlock => return Ok(Async::NotReady), + Err(e) => return Err(e.into()), + } + } + + /// Pull some bytes from this source into the specified `BufMut`, returning + /// how many bytes were read. + /// + /// The `buf` provided will have bytes read into it and the internal cursor + /// will be advanced if any bytes were read. Note that this method typically + /// will not reallocate the buffer provided. + fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, std_io::Error> + where + Self: Sized, + { + if !buf.has_remaining_mut() { + return Ok(Async::Ready(0)); + } + + unsafe { + let n = { + let b = buf.bytes_mut(); + + self.prepare_uninitialized_buffer(b); + + try_ready!(self.poll_read(b)) + }; + + buf.advance_mut(n); + Ok(Async::Ready(n)) + } + } + + /// Provides a `Stream` and `Sink` interface for reading and writing to this + /// I/O object, using `Decode` and `Encode` to read and write the raw data. + /// + /// Raw I/O objects work with byte sequences, but higher-level code usually + /// wants to batch these into meaningful chunks, called "frames". This + /// method layers framing on top of an I/O object, by using the `Codec` + /// traits to handle encoding and decoding of messages frames. Note that + /// the incoming and outgoing frame types may be distinct. + /// + /// This function returns a *single* object that is both `Stream` and + /// `Sink`; grouping this into a single object is often useful for layering + /// things like gzip or TLS, which require both read and write access to the + /// underlying object. + /// + /// If you want to work more directly with the streams and sink, consider + /// calling `split` on the `Framed` returned by this method, which will + /// break them into separate objects, allowing them to interact more easily. + #[deprecated(since = "0.1.7", note = "Use tokio_codec::Decoder::framed instead")] + #[allow(deprecated)] + fn framed<T: Encoder + Decoder>(self, codec: T) -> Framed<Self, T> + where + Self: AsyncWrite + Sized, + { + framed::framed(self, codec) + } + + /// Helper method for splitting this read/write object into two halves. + /// + /// The two halves returned implement the `Read` and `Write` traits, + /// respectively. + /// + /// To restore this read/write object from its `ReadHalf` and `WriteHalf` + /// use `unsplit`. + fn split(self) -> (ReadHalf<Self>, WriteHalf<Self>) + where + Self: AsyncWrite + Sized, + { + split::split(self) + } +} + +impl<T: ?Sized + AsyncRead> AsyncRead for Box<T> { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + (**self).prepare_uninitialized_buffer(buf) + } +} + +impl<'a, T: ?Sized + AsyncRead> AsyncRead for &'a mut T { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + (**self).prepare_uninitialized_buffer(buf) + } +} + +impl<'a> AsyncRead for &'a [u8] { + unsafe fn prepare_uninitialized_buffer(&self, _buf: &mut [u8]) -> bool { + false + } +} diff --git a/third_party/rust/tokio-io/src/async_write.rs b/third_party/rust/tokio-io/src/async_write.rs new file mode 100644 index 0000000000..0a09480e82 --- /dev/null +++ b/third_party/rust/tokio-io/src/async_write.rs @@ -0,0 +1,222 @@ +use bytes::Buf; +use futures::{Async, Poll}; +use std::io as std_io; + +use AsyncRead; + +/// Writes bytes asynchronously. +/// +/// The trait inherits from `std::io::Write` and indicates that an I/O object is +/// **nonblocking**. All non-blocking I/O objects must return an error when +/// bytes cannot be written instead of blocking the current thread. +/// +/// Specifically, this means that the `poll_write` function will return one of +/// the following: +/// +/// * `Ok(Async::Ready(n))` means that `n` bytes of data was immediately +/// written. +/// +/// * `Ok(Async::NotReady)` means that no data was written from the buffer +/// provided. The I/O object is not currently writable but may become writable +/// in the future. Most importantly, **the current future's task is scheduled +/// to get unparked when the object is writable**. This means that like +/// `Future::poll` you'll receive a notification when the I/O object is +/// writable again. +/// +/// * `Err(e)` for other errors are standard I/O errors coming from the +/// underlying object. +/// +/// This trait importantly means that the `write` method only works in the +/// context of a future's task. The object may panic if used outside of a task. +/// +/// Note that this trait also represents that the `Write::flush` method works +/// very similarly to the `write` method, notably that `Ok(())` means that the +/// writer has successfully been flushed, a "would block" error means that the +/// current task is ready to receive a notification when flushing can make more +/// progress, and otherwise normal errors can happen as well. +pub trait AsyncWrite: std_io::Write { + /// Attempt to write bytes from `buf` into the object. + /// + /// On success, returns `Ok(Async::Ready(num_bytes_written))`. + /// + /// If the object is not ready for writing, the method returns + /// `Ok(Async::NotReady)` and arranges for the current task (via + /// `cx.waker()`) to receive a notification when the object becomes + /// readable or is closed. + fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, std_io::Error> { + match self.write(buf) { + Ok(t) => Ok(Async::Ready(t)), + Err(ref e) if e.kind() == std_io::ErrorKind::WouldBlock => return Ok(Async::NotReady), + Err(e) => return Err(e.into()), + } + } + + /// Attempt to flush the object, ensuring that any buffered data reach + /// their destination. + /// + /// On success, returns `Ok(Async::Ready(()))`. + /// + /// If flushing cannot immediately complete, this method returns + /// `Ok(Async::NotReady)` and arranges for the current task (via + /// `cx.waker()`) to receive a notification when the object can make + /// progress towards flushing. + fn poll_flush(&mut self) -> Poll<(), std_io::Error> { + match self.flush() { + Ok(t) => Ok(Async::Ready(t)), + Err(ref e) if e.kind() == std_io::ErrorKind::WouldBlock => return Ok(Async::NotReady), + Err(e) => return Err(e.into()), + } + } + + /// Initiates or attempts to shut down this writer, returning success when + /// the I/O connection has completely shut down. + /// + /// This method is intended to be used for asynchronous shutdown of I/O + /// connections. For example this is suitable for implementing shutdown of a + /// TLS connection or calling `TcpStream::shutdown` on a proxied connection. + /// Protocols sometimes need to flush out final pieces of data or otherwise + /// perform a graceful shutdown handshake, reading/writing more data as + /// appropriate. This method is the hook for such protocols to implement the + /// graceful shutdown logic. + /// + /// This `shutdown` method is required by implementers of the + /// `AsyncWrite` trait. Wrappers typically just want to proxy this call + /// through to the wrapped type, and base types will typically implement + /// shutdown logic here or just return `Ok(().into())`. Note that if you're + /// wrapping an underlying `AsyncWrite` a call to `shutdown` implies that + /// transitively the entire stream has been shut down. After your wrapper's + /// shutdown logic has been executed you should shut down the underlying + /// stream. + /// + /// Invocation of a `shutdown` implies an invocation of `flush`. Once this + /// method returns `Ready` it implies that a flush successfully happened + /// before the shutdown happened. That is, callers don't need to call + /// `flush` before calling `shutdown`. They can rely that by calling + /// `shutdown` any pending buffered data will be written out. + /// + /// # Return value + /// + /// This function returns a `Poll<(), io::Error>` classified as such: + /// + /// * `Ok(Async::Ready(()))` - indicates that the connection was + /// successfully shut down and is now safe to deallocate/drop/close + /// resources associated with it. This method means that the current task + /// will no longer receive any notifications due to this method and the + /// I/O object itself is likely no longer usable. + /// + /// * `Ok(Async::NotReady)` - indicates that shutdown is initiated but could + /// not complete just yet. This may mean that more I/O needs to happen to + /// continue this shutdown operation. The current task is scheduled to + /// receive a notification when it's otherwise ready to continue the + /// shutdown operation. When woken up this method should be called again. + /// + /// * `Err(e)` - indicates a fatal error has happened with shutdown, + /// indicating that the shutdown operation did not complete successfully. + /// This typically means that the I/O object is no longer usable. + /// + /// # Errors + /// + /// This function can return normal I/O errors through `Err`, described + /// above. Additionally this method may also render the underlying + /// `Write::write` method no longer usable (e.g. will return errors in the + /// future). It's recommended that once `shutdown` is called the + /// `write` method is no longer called. + /// + /// # Panics + /// + /// This function will panic if not called within the context of a future's + /// task. + fn shutdown(&mut self) -> Poll<(), std_io::Error>; + + /// Write a `Buf` into this value, returning how many bytes were written. + /// + /// Note that this method will advance the `buf` provided automatically by + /// the number of bytes written. + fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, std_io::Error> + where + Self: Sized, + { + if !buf.has_remaining() { + return Ok(Async::Ready(0)); + } + + let n = try_ready!(self.poll_write(buf.bytes())); + buf.advance(n); + Ok(Async::Ready(n)) + } +} + +impl<T: ?Sized + AsyncWrite> AsyncWrite for Box<T> { + fn shutdown(&mut self) -> Poll<(), std_io::Error> { + (**self).shutdown() + } +} +impl<'a, T: ?Sized + AsyncWrite> AsyncWrite for &'a mut T { + fn shutdown(&mut self) -> Poll<(), std_io::Error> { + (**self).shutdown() + } +} + +impl AsyncRead for std_io::Repeat { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { + false + } +} + +impl AsyncWrite for std_io::Sink { + fn shutdown(&mut self) -> Poll<(), std_io::Error> { + Ok(().into()) + } +} + +impl<T: AsyncRead> AsyncRead for std_io::Take<T> { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.get_ref().prepare_uninitialized_buffer(buf) + } +} + +impl<T, U> AsyncRead for std_io::Chain<T, U> +where + T: AsyncRead, + U: AsyncRead, +{ + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + let (t, u) = self.get_ref(); + // We don't need to execute the second initializer if the first one + // already zeroed the buffer out. + t.prepare_uninitialized_buffer(buf) || u.prepare_uninitialized_buffer(buf) + } +} + +impl<T: AsyncWrite> AsyncWrite for std_io::BufWriter<T> { + fn shutdown(&mut self) -> Poll<(), std_io::Error> { + try_ready!(self.poll_flush()); + self.get_mut().shutdown() + } +} + +impl<T: AsyncRead> AsyncRead for std_io::BufReader<T> { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.get_ref().prepare_uninitialized_buffer(buf) + } +} + +impl<T: AsRef<[u8]>> AsyncRead for std_io::Cursor<T> {} + +impl<'a> AsyncWrite for std_io::Cursor<&'a mut [u8]> { + fn shutdown(&mut self) -> Poll<(), std_io::Error> { + Ok(().into()) + } +} + +impl AsyncWrite for std_io::Cursor<Vec<u8>> { + fn shutdown(&mut self) -> Poll<(), std_io::Error> { + Ok(().into()) + } +} + +impl AsyncWrite for std_io::Cursor<Box<[u8]>> { + fn shutdown(&mut self) -> Poll<(), std_io::Error> { + Ok(().into()) + } +} diff --git a/third_party/rust/tokio-io/src/codec/bytes_codec.rs b/third_party/rust/tokio-io/src/codec/bytes_codec.rs new file mode 100644 index 0000000000..ecfd15ab99 --- /dev/null +++ b/third_party/rust/tokio-io/src/codec/bytes_codec.rs @@ -0,0 +1,42 @@ +#![allow(deprecated)] + +use bytes::{BufMut, Bytes, BytesMut}; +use codec::{Decoder, Encoder}; +use std::io; + +/// A simple `Codec` implementation that just ships bytes around. +#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] +#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")] +pub struct BytesCodec(()); + +impl BytesCodec { + /// Creates a new `BytesCodec` for shipping around raw bytes. + pub fn new() -> BytesCodec { + BytesCodec(()) + } +} + +impl Decoder for BytesCodec { + type Item = BytesMut; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> { + if buf.len() > 0 { + let len = buf.len(); + Ok(Some(buf.split_to(len))) + } else { + Ok(None) + } + } +} + +impl Encoder for BytesCodec { + type Item = Bytes; + type Error = io::Error; + + fn encode(&mut self, data: Bytes, buf: &mut BytesMut) -> Result<(), io::Error> { + buf.reserve(data.len()); + buf.put(data); + Ok(()) + } +} diff --git a/third_party/rust/tokio-io/src/codec/decoder.rs b/third_party/rust/tokio-io/src/codec/decoder.rs new file mode 100644 index 0000000000..cd5777dea8 --- /dev/null +++ b/third_party/rust/tokio-io/src/codec/decoder.rs @@ -0,0 +1,117 @@ +use bytes::BytesMut; +use std::io; + +use super::encoder::Encoder; +use {AsyncRead, AsyncWrite}; + +use _tokio_codec::Framed; + +/// Decoding of frames via buffers. +/// +/// This trait is used when constructing an instance of `Framed` or +/// `FramedRead`. An implementation of `Decoder` takes a byte stream that has +/// already been buffered in `src` and decodes the data into a stream of +/// `Self::Item` frames. +/// +/// Implementations are able to track state on `self`, which enables +/// implementing stateful streaming parsers. In many cases, though, this type +/// will simply be a unit struct (e.g. `struct HttpDecoder`). + +// Note: We can't deprecate this trait, because the deprecation carries through to tokio-codec, and +// there doesn't seem to be a way to un-deprecate the re-export. +pub trait Decoder { + /// The type of decoded frames. + type Item; + + /// The type of unrecoverable frame decoding errors. + /// + /// If an individual message is ill-formed but can be ignored without + /// interfering with the processing of future messages, it may be more + /// useful to report the failure as an `Item`. + /// + /// `From<io::Error>` is required in the interest of making `Error` suitable + /// for returning directly from a `FramedRead`, and to enable the default + /// implementation of `decode_eof` to yield an `io::Error` when the decoder + /// fails to consume all available data. + /// + /// Note that implementors of this trait can simply indicate `type Error = + /// io::Error` to use I/O errors as this type. + type Error: From<io::Error>; + + /// Attempts to decode a frame from the provided buffer of bytes. + /// + /// This method is called by `FramedRead` whenever bytes are ready to be + /// parsed. The provided buffer of bytes is what's been read so far, and + /// this instance of `Decode` can determine whether an entire frame is in + /// the buffer and is ready to be returned. + /// + /// If an entire frame is available, then this instance will remove those + /// bytes from the buffer provided and return them as a decoded + /// frame. Note that removing bytes from the provided buffer doesn't always + /// necessarily copy the bytes, so this should be an efficient operation in + /// most circumstances. + /// + /// If the bytes look valid, but a frame isn't fully available yet, then + /// `Ok(None)` is returned. This indicates to the `Framed` instance that + /// it needs to read some more bytes before calling this method again. + /// + /// Note that the bytes provided may be empty. If a previous call to + /// `decode` consumed all the bytes in the buffer then `decode` will be + /// called again until it returns `Ok(None)`, indicating that more bytes need to + /// be read. + /// + /// Finally, if the bytes in the buffer are malformed then an error is + /// returned indicating why. This informs `Framed` that the stream is now + /// corrupt and should be terminated. + fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>; + + /// A default method available to be called when there are no more bytes + /// available to be read from the underlying I/O. + /// + /// This method defaults to calling `decode` and returns an error if + /// `Ok(None)` is returned while there is unconsumed data in `buf`. + /// Typically this doesn't need to be implemented unless the framing + /// protocol differs near the end of the stream. + /// + /// Note that the `buf` argument may be empty. If a previous call to + /// `decode_eof` consumed all the bytes in the buffer, `decode_eof` will be + /// called again until it returns `None`, indicating that there are no more + /// frames to yield. This behavior enables returning finalization frames + /// that may not be based on inbound data. + fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { + match self.decode(buf)? { + Some(frame) => Ok(Some(frame)), + None => { + if buf.is_empty() { + Ok(None) + } else { + Err(io::Error::new(io::ErrorKind::Other, "bytes remaining on stream").into()) + } + } + } + } + + /// Provides a `Stream` and `Sink` interface for reading and writing to this + /// `Io` object, using `Decode` and `Encode` to read and write the raw data. + /// + /// Raw I/O objects work with byte sequences, but higher-level code usually + /// wants to batch these into meaningful chunks, called "frames". This + /// method layers framing on top of an I/O object, by using the `Codec` + /// traits to handle encoding and decoding of messages frames. Note that + /// the incoming and outgoing frame types may be distinct. + /// + /// This function returns a *single* object that is both `Stream` and + /// `Sink`; grouping this into a single object is often useful for layering + /// things like gzip or TLS, which require both read and write access to the + /// underlying object. + /// + /// If you want to work more directly with the streams and sink, consider + /// calling `split` on the `Framed` returned by this method, which will + /// break them into separate objects, allowing them to interact more easily. + fn framed<T: AsyncRead + AsyncWrite + Sized>(self, io: T) -> Framed<T, Self> + where + Self: Encoder + Sized, + { + Framed::new(io, self) + } +} diff --git a/third_party/rust/tokio-io/src/codec/encoder.rs b/third_party/rust/tokio-io/src/codec/encoder.rs new file mode 100644 index 0000000000..5065080324 --- /dev/null +++ b/third_party/rust/tokio-io/src/codec/encoder.rs @@ -0,0 +1,25 @@ +use bytes::BytesMut; +use std::io; + +/// Trait of helper objects to write out messages as bytes, for use with +/// `FramedWrite`. + +// Note: We can't deprecate this trait, because the deprecation carries through to tokio-codec, and +// there doesn't seem to be a way to un-deprecate the re-export. +pub trait Encoder { + /// The type of items consumed by the `Encoder` + type Item; + + /// The type of encoding errors. + /// + /// `FramedWrite` requires `Encoder`s errors to implement `From<io::Error>` + /// in the interest letting it return `Error`s directly. + type Error: From<io::Error>; + + /// Encodes a frame into the buffer provided. + /// + /// This method will encode `item` into the byte buffer provided by `dst`. + /// The `dst` provided is an internal buffer of the `Framed` instance and + /// will be written out when possible. + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error>; +} diff --git a/third_party/rust/tokio-io/src/codec/lines_codec.rs b/third_party/rust/tokio-io/src/codec/lines_codec.rs new file mode 100644 index 0000000000..818397fa50 --- /dev/null +++ b/third_party/rust/tokio-io/src/codec/lines_codec.rs @@ -0,0 +1,88 @@ +#![allow(deprecated)] + +use bytes::{BufMut, BytesMut}; +use codec::{Decoder, Encoder}; +use std::{io, str}; + +/// A simple `Codec` implementation that splits up data into lines. +#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] +#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")] +pub struct LinesCodec { + // Stored index of the next index to examine for a `\n` character. + // This is used to optimize searching. + // For example, if `decode` was called with `abc`, it would hold `3`, + // because that is the next index to examine. + // The next time `decode` is called with `abcde\n`, the method will + // only look at `de\n` before returning. + next_index: usize, +} + +impl LinesCodec { + /// Returns a `LinesCodec` for splitting up data into lines. + pub fn new() -> LinesCodec { + LinesCodec { next_index: 0 } + } +} + +fn utf8(buf: &[u8]) -> Result<&str, io::Error> { + str::from_utf8(buf) + .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Unable to decode input as UTF8")) +} + +fn without_carriage_return(s: &[u8]) -> &[u8] { + if let Some(&b'\r') = s.last() { + &s[..s.len() - 1] + } else { + s + } +} + +impl Decoder for LinesCodec { + type Item = String; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> { + if let Some(newline_offset) = buf[self.next_index..].iter().position(|b| *b == b'\n') { + let newline_index = newline_offset + self.next_index; + let line = buf.split_to(newline_index + 1); + let line = &line[..line.len() - 1]; + let line = without_carriage_return(line); + let line = utf8(line)?; + self.next_index = 0; + Ok(Some(line.to_string())) + } else { + self.next_index = buf.len(); + Ok(None) + } + } + + fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> { + Ok(match self.decode(buf)? { + Some(frame) => Some(frame), + None => { + // No terminating newline - return remaining data, if any + if buf.is_empty() || buf == &b"\r"[..] { + None + } else { + let line = buf.take(); + let line = without_carriage_return(&line); + let line = utf8(line)?; + self.next_index = 0; + Some(line.to_string()) + } + } + }) + } +} + +impl Encoder for LinesCodec { + type Item = String; + type Error = io::Error; + + fn encode(&mut self, line: String, buf: &mut BytesMut) -> Result<(), io::Error> { + buf.reserve(line.len() + 1); + buf.put(line); + buf.put_u8(b'\n'); + Ok(()) + } +} diff --git a/third_party/rust/tokio-io/src/codec/mod.rs b/third_party/rust/tokio-io/src/codec/mod.rs new file mode 100644 index 0000000000..6636821750 --- /dev/null +++ b/third_party/rust/tokio-io/src/codec/mod.rs @@ -0,0 +1,378 @@ +//! Utilities for encoding and decoding frames. +//! +//! Contains adapters to go from streams of bytes, [`AsyncRead`] and +//! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`]. +//! Framed streams are also known as [transports]. +//! +//! [`AsyncRead`]: # +//! [`AsyncWrite`]: # +//! [`Sink`]: # +//! [`Stream`]: # +//! [transports]: # + +// tokio_io::codec originally held all codec-related helpers. This is now intended to be in +// tokio_codec instead. However, for backward compatibility, this remains here. When the next major +// breaking change comes, `Encoder` and `Decoder` need to be moved to `tokio_codec`, and the rest +// of this module should be removed. + +#![doc(hidden)] +#![allow(deprecated)] + +mod bytes_codec; +mod decoder; +mod encoder; +mod lines_codec; + +pub use self::bytes_codec::BytesCodec; +pub use self::decoder::Decoder; +pub use self::encoder::Encoder; +pub use self::lines_codec::LinesCodec; + +pub use framed::{Framed, FramedParts}; +pub use framed_read::FramedRead; +pub use framed_write::FramedWrite; + +#[deprecated(since = "0.1.8", note = "Moved to tokio-codec")] +#[doc(hidden)] +pub mod length_delimited { + //! Frame a stream of bytes based on a length prefix + //! + //! Many protocols delimit their frames by prefacing frame data with a + //! frame head that specifies the length of the frame. The + //! `length_delimited` module provides utilities for handling the length + //! based framing. This allows the consumer to work with entire frames + //! without having to worry about buffering or other framing logic. + //! + //! # Getting started + //! + //! If implementing a protocol from scratch, using length delimited framing + //! is an easy way to get started. [`Framed::new()`](length_delimited::Framed::new) will adapt a + //! full-duplex byte stream with a length delimited framer using default + //! configuration values. + //! + //! ``` + //! use tokio_io::{AsyncRead, AsyncWrite}; + //! use tokio_io::codec::length_delimited; + //! + //! fn bind_transport<T: AsyncRead + AsyncWrite>(io: T) + //! -> length_delimited::Framed<T> + //! { + //! length_delimited::Framed::new(io) + //! } + //! ``` + //! + //! The returned transport implements `Sink + Stream` for `BytesMut`. It + //! encodes the frame with a big-endian `u32` header denoting the frame + //! payload length: + //! + //! ```text + //! +----------+--------------------------------+ + //! | len: u32 | frame payload | + //! +----------+--------------------------------+ + //! ``` + //! + //! Specifically, given the following: + //! + //! ``` + //! # extern crate tokio_io; + //! # extern crate bytes; + //! # extern crate futures; + //! # + //! use tokio_io::{AsyncRead, AsyncWrite}; + //! use tokio_io::codec::length_delimited; + //! use bytes::BytesMut; + //! use futures::{Sink, Future}; + //! + //! fn write_frame<T: AsyncRead + AsyncWrite>(io: T) { + //! let mut transport = length_delimited::Framed::new(io); + //! let frame = BytesMut::from("hello world"); + //! + //! transport.send(frame).wait().unwrap(); + //! } + //! # + //! # pub fn main() {} + //! ``` + //! + //! The encoded frame will look like this: + //! + //! ```text + //! +---- len: u32 ----+---- data ----+ + //! | \x00\x00\x00\x0b | hello world | + //! +------------------+--------------+ + //! ``` + //! + //! # Decoding + //! + //! [`FramedRead`] adapts an [`AsyncRead`] into a `Stream` of [`BytesMut`], + //! such that each yielded [`BytesMut`] value contains the contents of an + //! entire frame. There are many configuration parameters enabling + //! [`FramedRead`] to handle a wide range of protocols. Here are some + //! examples that will cover the various options at a high level. + //! + //! ## Example 1 + //! + //! The following will parse a `u16` length field at offset 0, including the + //! frame head in the yielded `BytesMut`. + //! + //! ``` + //! # use tokio_io::AsyncRead; + //! # use tokio_io::codec::length_delimited; + //! # fn bind_read<T: AsyncRead>(io: T) { + //! length_delimited::Builder::new() + //! .length_field_offset(0) // default value + //! .length_field_length(2) + //! .length_adjustment(0) // default value + //! .num_skip(0) // Do not strip frame header + //! .new_read(io); + //! # } + //! ``` + //! + //! The following frame will be decoded as such: + //! + //! ```text + //! INPUT DECODED + //! +-- len ---+--- Payload ---+ +-- len ---+--- Payload ---+ + //! | \x00\x0B | Hello world | --> | \x00\x0B | Hello world | + //! +----------+---------------+ +----------+---------------+ + //! ``` + //! + //! The value of the length field is 11 (`\x0B`) which represents the length + //! of the payload, `hello world`. By default, [`FramedRead`] assumes that + //! the length field represents the number of bytes that **follows** the + //! length field. Thus, the entire frame has a length of 13: 2 bytes for the + //! frame head + 11 bytes for the payload. + //! + //! ## Example 2 + //! + //! The following will parse a `u16` length field at offset 0, omitting the + //! frame head in the yielded `BytesMut`. + //! + //! ``` + //! # use tokio_io::AsyncRead; + //! # use tokio_io::codec::length_delimited; + //! # fn bind_read<T: AsyncRead>(io: T) { + //! length_delimited::Builder::new() + //! .length_field_offset(0) // default value + //! .length_field_length(2) + //! .length_adjustment(0) // default value + //! // `num_skip` is not needed, the default is to skip + //! .new_read(io); + //! # } + //! ``` + //! + //! The following frame will be decoded as such: + //! + //! ```text + //! INPUT DECODED + //! +-- len ---+--- Payload ---+ +--- Payload ---+ + //! | \x00\x0B | Hello world | --> | Hello world | + //! +----------+---------------+ +---------------+ + //! ``` + //! + //! This is similar to the first example, the only difference is that the + //! frame head is **not** included in the yielded `BytesMut` value. + //! + //! ## Example 3 + //! + //! The following will parse a `u16` length field at offset 0, including the + //! frame head in the yielded `BytesMut`. In this case, the length field + //! **includes** the frame head length. + //! + //! ``` + //! # use tokio_io::AsyncRead; + //! # use tokio_io::codec::length_delimited; + //! # fn bind_read<T: AsyncRead>(io: T) { + //! length_delimited::Builder::new() + //! .length_field_offset(0) // default value + //! .length_field_length(2) + //! .length_adjustment(-2) // size of head + //! .num_skip(0) + //! .new_read(io); + //! # } + //! ``` + //! + //! The following frame will be decoded as such: + //! + //! ```text + //! INPUT DECODED + //! +-- len ---+--- Payload ---+ +-- len ---+--- Payload ---+ + //! | \x00\x0D | Hello world | --> | \x00\x0D | Hello world | + //! +----------+---------------+ +----------+---------------+ + //! ``` + //! + //! In most cases, the length field represents the length of the payload + //! only, as shown in the previous examples. However, in some protocols the + //! length field represents the length of the whole frame, including the + //! head. In such cases, we specify a negative `length_adjustment` to adjust + //! the value provided in the frame head to represent the payload length. + //! + //! ## Example 4 + //! + //! The following will parse a 3 byte length field at offset 0 in a 5 byte + //! frame head, including the frame head in the yielded `BytesMut`. + //! + //! ``` + //! # use tokio_io::AsyncRead; + //! # use tokio_io::codec::length_delimited; + //! # fn bind_read<T: AsyncRead>(io: T) { + //! length_delimited::Builder::new() + //! .length_field_offset(0) // default value + //! .length_field_length(3) + //! .length_adjustment(2) // remaining head + //! .num_skip(0) + //! .new_read(io); + //! # } + //! ``` + //! + //! The following frame will be decoded as such: + //! + //! ```text + //! INPUT + //! +---- len -----+- head -+--- Payload ---+ + //! | \x00\x00\x0B | \xCAFE | Hello world | + //! +--------------+--------+---------------+ + //! + //! DECODED + //! +---- len -----+- head -+--- Payload ---+ + //! | \x00\x00\x0B | \xCAFE | Hello world | + //! +--------------+--------+---------------+ + //! ``` + //! + //! A more advanced example that shows a case where there is extra frame + //! head data between the length field and the payload. In such cases, it is + //! usually desirable to include the frame head as part of the yielded + //! `BytesMut`. This lets consumers of the length delimited framer to + //! process the frame head as needed. + //! + //! The positive `length_adjustment` value lets `FramedRead` factor in the + //! additional head into the frame length calculation. + //! + //! ## Example 5 + //! + //! The following will parse a `u16` length field at offset 1 of a 4 byte + //! frame head. The first byte and the length field will be omitted from the + //! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be + //! included. + //! + //! ``` + //! # use tokio_io::AsyncRead; + //! # use tokio_io::codec::length_delimited; + //! # fn bind_read<T: AsyncRead>(io: T) { + //! length_delimited::Builder::new() + //! .length_field_offset(1) // length of hdr1 + //! .length_field_length(2) + //! .length_adjustment(1) // length of hdr2 + //! .num_skip(3) // length of hdr1 + LEN + //! .new_read(io); + //! # } + //! ``` + //! + //! The following frame will be decoded as such: + //! + //! ```text + //! INPUT + //! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+ + //! | \xCA | \x00\x0B | \xFE | Hello world | + //! +--------+----------+--------+---------------+ + //! + //! DECODED + //! +- hdr2 -+--- Payload ---+ + //! | \xFE | Hello world | + //! +--------+---------------+ + //! ``` + //! + //! The length field is situated in the middle of the frame head. In this + //! case, the first byte in the frame head could be a version or some other + //! identifier that is not needed for processing. On the other hand, the + //! second half of the head is needed. + //! + //! `length_field_offset` indicates how many bytes to skip before starting + //! to read the length field. `length_adjustment` is the number of bytes to + //! skip starting at the end of the length field. In this case, it is the + //! second half of the head. + //! + //! ## Example 6 + //! + //! The following will parse a `u16` length field at offset 1 of a 4 byte + //! frame head. The first byte and the length field will be omitted from the + //! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be + //! included. In this case, the length field **includes** the frame head + //! length. + //! + //! ``` + //! # use tokio_io::AsyncRead; + //! # use tokio_io::codec::length_delimited; + //! # fn bind_read<T: AsyncRead>(io: T) { + //! length_delimited::Builder::new() + //! .length_field_offset(1) // length of hdr1 + //! .length_field_length(2) + //! .length_adjustment(-3) // length of hdr1 + LEN, negative + //! .num_skip(3) + //! .new_read(io); + //! # } + //! ``` + //! + //! The following frame will be decoded as such: + //! + //! ```text + //! INPUT + //! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+ + //! | \xCA | \x00\x0F | \xFE | Hello world | + //! +--------+----------+--------+---------------+ + //! + //! DECODED + //! +- hdr2 -+--- Payload ---+ + //! | \xFE | Hello world | + //! +--------+---------------+ + //! ``` + //! + //! Similar to the example above, the difference is that the length field + //! represents the length of the entire frame instead of just the payload. + //! The length of `hdr1` and `len` must be counted in `length_adjustment`. + //! Note that the length of `hdr2` does **not** need to be explicitly set + //! anywhere because it already is factored into the total frame length that + //! is read from the byte stream. + //! + //! # Encoding + //! + //! [`FramedWrite`] adapts an [`AsyncWrite`] into a `Sink` of [`BytesMut`], + //! such that each submitted [`BytesMut`] is prefaced by a length field. + //! There are fewer configuration options than [`FramedRead`]. Given + //! protocols that have more complex frame heads, an encoder should probably + //! be written by hand using [`Encoder`]. + //! + //! Here is a simple example, given a `FramedWrite` with the following + //! configuration: + //! + //! ``` + //! # extern crate tokio_io; + //! # extern crate bytes; + //! # use tokio_io::AsyncWrite; + //! # use tokio_io::codec::length_delimited; + //! # use bytes::BytesMut; + //! # fn write_frame<T: AsyncWrite>(io: T) { + //! # let _: length_delimited::FramedWrite<T, BytesMut> = + //! length_delimited::Builder::new() + //! .length_field_length(2) + //! .new_write(io); + //! # } + //! # pub fn main() {} + //! ``` + //! + //! A payload of `hello world` will be encoded as: + //! + //! ```text + //! +- len: u16 -+---- data ----+ + //! | \x00\x0b | hello world | + //! +------------+--------------+ + //! ``` + //! + //! [`FramedRead`]: struct.FramedRead.html + //! [`FramedWrite`]: struct.FramedWrite.html + //! [`AsyncRead`]: ../../trait.AsyncRead.html + //! [`AsyncWrite`]: ../../trait.AsyncWrite.html + //! [`Encoder`]: ../trait.Encoder.html + //! [`BytesMut`]: https://docs.rs/bytes/0.4/bytes/struct.BytesMut.html + + pub use length_delimited::*; +} diff --git a/third_party/rust/tokio-io/src/framed.rs b/third_party/rust/tokio-io/src/framed.rs new file mode 100644 index 0000000000..aea7b5468c --- /dev/null +++ b/third_party/rust/tokio-io/src/framed.rs @@ -0,0 +1,248 @@ +#![allow(deprecated)] + +use std::fmt; +use std::io::{self, Read, Write}; + +use codec::{Decoder, Encoder}; +use framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2}; +use framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2}; +use {AsyncRead, AsyncWrite}; + +use bytes::BytesMut; +use futures::{Poll, Sink, StartSend, Stream}; + +/// A unified `Stream` and `Sink` interface to an underlying I/O object, using +/// the `Encoder` and `Decoder` traits to encode and decode frames. +/// +/// You can create a `Framed` instance by using the `AsyncRead::framed` adapter. +#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")] +#[doc(hidden)] +pub struct Framed<T, U> { + inner: FramedRead2<FramedWrite2<Fuse<T, U>>>, +} + +#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")] +#[doc(hidden)] +pub struct Fuse<T, U>(pub T, pub U); + +pub fn framed<T, U>(inner: T, codec: U) -> Framed<T, U> +where + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, +{ + Framed { + inner: framed_read2(framed_write2(Fuse(inner, codec))), + } +} + +impl<T, U> Framed<T, U> { + /// Provides a `Stream` and `Sink` interface for reading and writing to this + /// `Io` object, using `Decode` and `Encode` to read and write the raw data. + /// + /// Raw I/O objects work with byte sequences, but higher-level code usually + /// wants to batch these into meaningful chunks, called "frames". This + /// method layers framing on top of an I/O object, by using the `Codec` + /// traits to handle encoding and decoding of messages frames. Note that + /// the incoming and outgoing frame types may be distinct. + /// + /// This function returns a *single* object that is both `Stream` and + /// `Sink`; grouping this into a single object is often useful for layering + /// things like gzip or TLS, which require both read and write access to the + /// underlying object. + /// + /// This objects takes a stream and a readbuffer and a writebuffer. These field + /// can be obtained from an existing `Framed` with the `into_parts` method. + /// + /// If you want to work more directly with the streams and sink, consider + /// calling `split` on the `Framed` returned by this method, which will + /// break them into separate objects, allowing them to interact more easily. + pub fn from_parts(parts: FramedParts<T>, codec: U) -> Framed<T, U> { + Framed { + inner: framed_read2_with_buffer( + framed_write2_with_buffer(Fuse(parts.inner, codec), parts.writebuf), + parts.readbuf, + ), + } + } + + /// Returns a reference to the underlying I/O stream wrapped by + /// `Frame`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_ref(&self) -> &T { + &self.inner.get_ref().get_ref().0 + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `Frame`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner.get_mut().get_mut().0 + } + + /// Consumes the `Frame`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn into_inner(self) -> T { + self.inner.into_inner().into_inner().0 + } + + /// Consumes the `Frame`, returning its underlying I/O stream and the buffer + /// with unprocessed data. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn into_parts(self) -> FramedParts<T> { + let (inner, readbuf) = self.inner.into_parts(); + let (inner, writebuf) = inner.into_parts(); + FramedParts { + inner: inner.0, + readbuf: readbuf, + writebuf: writebuf, + } + } + + /// Consumes the `Frame`, returning its underlying I/O stream and the buffer + /// with unprocessed data, and also the current codec state. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + /// + /// Note that this function will be removed once the codec has been + /// integrated into `FramedParts` in a new version (see + /// [#53](https://github.com/tokio-rs/tokio-io/pull/53)). + pub fn into_parts_and_codec(self) -> (FramedParts<T>, U) { + let (inner, readbuf) = self.inner.into_parts(); + let (inner, writebuf) = inner.into_parts(); + ( + FramedParts { + inner: inner.0, + readbuf: readbuf, + writebuf: writebuf, + }, + inner.1, + ) + } +} + +impl<T, U> Stream for Framed<T, U> +where + T: AsyncRead, + U: Decoder, +{ + type Item = U::Item; + type Error = U::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + self.inner.poll() + } +} + +impl<T, U> Sink for Framed<T, U> +where + T: AsyncWrite, + U: Encoder, + U::Error: From<io::Error>, +{ + type SinkItem = U::Item; + type SinkError = U::Error; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { + self.inner.get_mut().start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + self.inner.get_mut().poll_complete() + } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + self.inner.get_mut().close() + } +} + +impl<T, U> fmt::Debug for Framed<T, U> +where + T: fmt::Debug, + U: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Framed") + .field("io", &self.inner.get_ref().get_ref().0) + .field("codec", &self.inner.get_ref().get_ref().1) + .finish() + } +} + +// ===== impl Fuse ===== + +impl<T: Read, U> Read for Fuse<T, U> { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + self.0.read(dst) + } +} + +impl<T: AsyncRead, U> AsyncRead for Fuse<T, U> { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.0.prepare_uninitialized_buffer(buf) + } +} + +impl<T: Write, U> Write for Fuse<T, U> { + fn write(&mut self, src: &[u8]) -> io::Result<usize> { + self.0.write(src) + } + + fn flush(&mut self) -> io::Result<()> { + self.0.flush() + } +} + +impl<T: AsyncWrite, U> AsyncWrite for Fuse<T, U> { + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.0.shutdown() + } +} + +impl<T, U: Decoder> Decoder for Fuse<T, U> { + type Item = U::Item; + type Error = U::Error; + + fn decode(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { + self.1.decode(buffer) + } + + fn decode_eof(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { + self.1.decode_eof(buffer) + } +} + +impl<T, U: Encoder> Encoder for Fuse<T, U> { + type Item = U::Item; + type Error = U::Error; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + self.1.encode(item, dst) + } +} + +/// `FramedParts` contains an export of the data of a Framed transport. +/// It can be used to construct a new `Framed` with a different codec. +/// It contains all current buffers and the inner transport. +#[derive(Debug)] +pub struct FramedParts<T> { + /// The inner transport used to read bytes to and write bytes to + pub inner: T, + /// The buffer with read but unprocessed data. + pub readbuf: BytesMut, + /// A buffer with unprocessed data which are not written yet. + pub writebuf: BytesMut, +} diff --git a/third_party/rust/tokio-io/src/framed_read.rs b/third_party/rust/tokio-io/src/framed_read.rs new file mode 100644 index 0000000000..d675084b01 --- /dev/null +++ b/third_party/rust/tokio-io/src/framed_read.rs @@ -0,0 +1,220 @@ +#![allow(deprecated)] + +use std::fmt; + +use codec::Decoder; +use framed::Fuse; +use AsyncRead; + +use bytes::BytesMut; +use futures::{Async, Poll, Sink, StartSend, Stream}; + +/// A `Stream` of messages decoded from an `AsyncRead`. +#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")] +#[doc(hidden)] +pub struct FramedRead<T, D> { + inner: FramedRead2<Fuse<T, D>>, +} + +#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")] +#[doc(hidden)] +pub struct FramedRead2<T> { + inner: T, + eof: bool, + is_readable: bool, + buffer: BytesMut, +} + +const INITIAL_CAPACITY: usize = 8 * 1024; + +// ===== impl FramedRead ===== + +impl<T, D> FramedRead<T, D> +where + T: AsyncRead, + D: Decoder, +{ + /// Creates a new `FramedRead` with the given `decoder`. + pub fn new(inner: T, decoder: D) -> FramedRead<T, D> { + FramedRead { + inner: framed_read2(Fuse(inner, decoder)), + } + } +} + +impl<T, D> FramedRead<T, D> { + /// Returns a reference to the underlying I/O stream wrapped by + /// `FramedRead`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_ref(&self) -> &T { + &self.inner.inner.0 + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `FramedRead`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner.inner.0 + } + + /// Consumes the `FramedRead`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn into_inner(self) -> T { + self.inner.inner.0 + } + + /// Returns a reference to the underlying decoder. + pub fn decoder(&self) -> &D { + &self.inner.inner.1 + } + + /// Returns a mutable reference to the underlying decoder. + pub fn decoder_mut(&mut self) -> &mut D { + &mut self.inner.inner.1 + } +} + +impl<T, D> Stream for FramedRead<T, D> +where + T: AsyncRead, + D: Decoder, +{ + type Item = D::Item; + type Error = D::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + self.inner.poll() + } +} + +impl<T, D> Sink for FramedRead<T, D> +where + T: Sink, +{ + type SinkItem = T::SinkItem; + type SinkError = T::SinkError; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { + self.inner.inner.0.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + self.inner.inner.0.poll_complete() + } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + self.inner.inner.0.close() + } +} + +impl<T, D> fmt::Debug for FramedRead<T, D> +where + T: fmt::Debug, + D: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("FramedRead") + .field("inner", &self.inner.inner.0) + .field("decoder", &self.inner.inner.1) + .field("eof", &self.inner.eof) + .field("is_readable", &self.inner.is_readable) + .field("buffer", &self.inner.buffer) + .finish() + } +} + +// ===== impl FramedRead2 ===== + +pub fn framed_read2<T>(inner: T) -> FramedRead2<T> { + FramedRead2 { + inner: inner, + eof: false, + is_readable: false, + buffer: BytesMut::with_capacity(INITIAL_CAPACITY), + } +} + +pub fn framed_read2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedRead2<T> { + if buf.capacity() < INITIAL_CAPACITY { + let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity(); + buf.reserve(bytes_to_reserve); + } + FramedRead2 { + inner: inner, + eof: false, + is_readable: buf.len() > 0, + buffer: buf, + } +} + +impl<T> FramedRead2<T> { + pub fn get_ref(&self) -> &T { + &self.inner + } + + pub fn into_inner(self) -> T { + self.inner + } + + pub fn into_parts(self) -> (T, BytesMut) { + (self.inner, self.buffer) + } + + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } +} + +impl<T> Stream for FramedRead2<T> +where + T: AsyncRead + Decoder, +{ + type Item = T::Item; + type Error = T::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + loop { + // Repeatedly call `decode` or `decode_eof` as long as it is + // "readable". Readable is defined as not having returned `None`. If + // the upstream has returned EOF, and the decoder is no longer + // readable, it can be assumed that the decoder will never become + // readable again, at which point the stream is terminated. + if self.is_readable { + if self.eof { + let frame = self.inner.decode_eof(&mut self.buffer)?; + return Ok(Async::Ready(frame)); + } + + trace!("attempting to decode a frame"); + + if let Some(frame) = self.inner.decode(&mut self.buffer)? { + trace!("frame decoded from buffer"); + return Ok(Async::Ready(Some(frame))); + } + + self.is_readable = false; + } + + assert!(!self.eof); + + // Otherwise, try to read more data and try again. Make sure we've + // got room for at least one byte to read to ensure that we don't + // get a spurious 0 that looks like EOF + self.buffer.reserve(1); + if 0 == try_ready!(self.inner.read_buf(&mut self.buffer)) { + self.eof = true; + } + + self.is_readable = true; + } + } +} diff --git a/third_party/rust/tokio-io/src/framed_write.rs b/third_party/rust/tokio-io/src/framed_write.rs new file mode 100644 index 0000000000..483b9fac8d --- /dev/null +++ b/third_party/rust/tokio-io/src/framed_write.rs @@ -0,0 +1,250 @@ +#![allow(deprecated)] + +use std::fmt; +use std::io::{self, Read}; + +use codec::{Decoder, Encoder}; +use framed::Fuse; +use {AsyncRead, AsyncWrite}; + +use bytes::BytesMut; +use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream}; + +/// A `Sink` of frames encoded to an `AsyncWrite`. +#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")] +#[doc(hidden)] +pub struct FramedWrite<T, E> { + inner: FramedWrite2<Fuse<T, E>>, +} + +#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")] +#[doc(hidden)] +pub struct FramedWrite2<T> { + inner: T, + buffer: BytesMut, +} + +const INITIAL_CAPACITY: usize = 8 * 1024; +const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY; + +impl<T, E> FramedWrite<T, E> +where + T: AsyncWrite, + E: Encoder, +{ + /// Creates a new `FramedWrite` with the given `encoder`. + pub fn new(inner: T, encoder: E) -> FramedWrite<T, E> { + FramedWrite { + inner: framed_write2(Fuse(inner, encoder)), + } + } +} + +impl<T, E> FramedWrite<T, E> { + /// Returns a reference to the underlying I/O stream wrapped by + /// `FramedWrite`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_ref(&self) -> &T { + &self.inner.inner.0 + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `FramedWrite`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner.inner.0 + } + + /// Consumes the `FramedWrite`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn into_inner(self) -> T { + self.inner.inner.0 + } + + /// Returns a reference to the underlying decoder. + pub fn encoder(&self) -> &E { + &self.inner.inner.1 + } + + /// Returns a mutable reference to the underlying decoder. + pub fn encoder_mut(&mut self) -> &mut E { + &mut self.inner.inner.1 + } +} + +impl<T, E> Sink for FramedWrite<T, E> +where + T: AsyncWrite, + E: Encoder, +{ + type SinkItem = E::Item; + type SinkError = E::Error; + + fn start_send(&mut self, item: E::Item) -> StartSend<E::Item, E::Error> { + self.inner.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + self.inner.poll_complete() + } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + Ok(self.inner.close()?) + } +} + +impl<T, D> Stream for FramedWrite<T, D> +where + T: Stream, +{ + type Item = T::Item; + type Error = T::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + self.inner.inner.0.poll() + } +} + +impl<T, U> fmt::Debug for FramedWrite<T, U> +where + T: fmt::Debug, + U: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("FramedWrite") + .field("inner", &self.inner.get_ref().0) + .field("encoder", &self.inner.get_ref().1) + .field("buffer", &self.inner.buffer) + .finish() + } +} + +// ===== impl FramedWrite2 ===== + +pub fn framed_write2<T>(inner: T) -> FramedWrite2<T> { + FramedWrite2 { + inner: inner, + buffer: BytesMut::with_capacity(INITIAL_CAPACITY), + } +} + +pub fn framed_write2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedWrite2<T> { + if buf.capacity() < INITIAL_CAPACITY { + let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity(); + buf.reserve(bytes_to_reserve); + } + FramedWrite2 { + inner: inner, + buffer: buf, + } +} + +impl<T> FramedWrite2<T> { + pub fn get_ref(&self) -> &T { + &self.inner + } + + pub fn into_inner(self) -> T { + self.inner + } + + pub fn into_parts(self) -> (T, BytesMut) { + (self.inner, self.buffer) + } + + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } +} + +impl<T> Sink for FramedWrite2<T> +where + T: AsyncWrite + Encoder, +{ + type SinkItem = T::Item; + type SinkError = T::Error; + + fn start_send(&mut self, item: T::Item) -> StartSend<T::Item, T::Error> { + // If the buffer is already over 8KiB, then attempt to flush it. If after flushing it's + // *still* over 8KiB, then apply backpressure (reject the send). + if self.buffer.len() >= BACKPRESSURE_BOUNDARY { + self.poll_complete()?; + + if self.buffer.len() >= BACKPRESSURE_BOUNDARY { + return Ok(AsyncSink::NotReady(item)); + } + } + + self.inner.encode(item, &mut self.buffer)?; + + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + trace!("flushing framed transport"); + + while !self.buffer.is_empty() { + trace!("writing; remaining={}", self.buffer.len()); + + let n = try_ready!(self.inner.poll_write(&self.buffer)); + + if n == 0 { + return Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to + write frame to transport", + ) + .into()); + } + + // TODO: Add a way to `bytes` to do this w/o returning the drained + // data. + let _ = self.buffer.split_to(n); + } + + // Try flushing the underlying IO + try_ready!(self.inner.poll_flush()); + + trace!("framed transport flushed"); + return Ok(Async::Ready(())); + } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + try_ready!(self.poll_complete()); + Ok(self.inner.shutdown()?) + } +} + +impl<T: Decoder> Decoder for FramedWrite2<T> { + type Item = T::Item; + type Error = T::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result<Option<T::Item>, T::Error> { + self.inner.decode(src) + } + + fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<T::Item>, T::Error> { + self.inner.decode_eof(src) + } +} + +impl<T: Read> Read for FramedWrite2<T> { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + self.inner.read(dst) + } +} + +impl<T: AsyncRead> AsyncRead for FramedWrite2<T> { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.inner.prepare_uninitialized_buffer(buf) + } +} diff --git a/third_party/rust/tokio-io/src/io/copy.rs b/third_party/rust/tokio-io/src/io/copy.rs new file mode 100644 index 0000000000..e8a1dac95d --- /dev/null +++ b/third_party/rust/tokio-io/src/io/copy.rs @@ -0,0 +1,100 @@ +use std::io; + +use futures::{Future, Poll}; + +use {AsyncRead, AsyncWrite}; + +/// A future which will copy all data from a reader into a writer. +/// +/// Created by the [`copy`] function, this future will resolve to the number of +/// bytes copied or an error if one happens. +/// +/// [`copy`]: fn.copy.html +#[derive(Debug)] +pub struct Copy<R, W> { + reader: Option<R>, + read_done: bool, + writer: Option<W>, + pos: usize, + cap: usize, + amt: u64, + buf: Box<[u8]>, +} + +/// Creates a future which represents copying all the bytes from one object to +/// another. +/// +/// The returned future will copy all the bytes read from `reader` into the +/// `writer` specified. This future will only complete once the `reader` has hit +/// EOF and all bytes have been written to and flushed from the `writer` +/// provided. +/// +/// On success the number of bytes is returned and the `reader` and `writer` are +/// consumed. On error the error is returned and the I/O objects are consumed as +/// well. +pub fn copy<R, W>(reader: R, writer: W) -> Copy<R, W> +where + R: AsyncRead, + W: AsyncWrite, +{ + Copy { + reader: Some(reader), + read_done: false, + writer: Some(writer), + amt: 0, + pos: 0, + cap: 0, + buf: Box::new([0; 2048]), + } +} + +impl<R, W> Future for Copy<R, W> +where + R: AsyncRead, + W: AsyncWrite, +{ + type Item = (u64, R, W); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(u64, R, W), io::Error> { + loop { + // If our buffer is empty, then we need to read some data to + // continue. + if self.pos == self.cap && !self.read_done { + let reader = self.reader.as_mut().unwrap(); + let n = try_ready!(reader.poll_read(&mut self.buf)); + if n == 0 { + self.read_done = true; + } else { + self.pos = 0; + self.cap = n; + } + } + + // If our buffer has some data, let's write it out! + while self.pos < self.cap { + let writer = self.writer.as_mut().unwrap(); + let i = try_ready!(writer.poll_write(&self.buf[self.pos..self.cap])); + if i == 0 { + return Err(io::Error::new( + io::ErrorKind::WriteZero, + "write zero byte into writer", + )); + } else { + self.pos += i; + self.amt += i as u64; + } + } + + // If we've written al the data and we've seen EOF, flush out the + // data and finish the transfer. + // done with the entire transfer. + if self.pos == self.cap && self.read_done { + try_ready!(self.writer.as_mut().unwrap().poll_flush()); + let reader = self.reader.take().unwrap(); + let writer = self.writer.take().unwrap(); + return Ok((self.amt, reader, writer).into()); + } + } + } +} diff --git a/third_party/rust/tokio-io/src/io/flush.rs b/third_party/rust/tokio-io/src/io/flush.rs new file mode 100644 index 0000000000..febc7ee1b1 --- /dev/null +++ b/third_party/rust/tokio-io/src/io/flush.rs @@ -0,0 +1,43 @@ +use std::io; + +use futures::{Async, Future, Poll}; + +use AsyncWrite; + +/// A future used to fully flush an I/O object. +/// +/// Resolves to the underlying I/O object once the flush operation is complete. +/// +/// Created by the [`flush`] function. +/// +/// [`flush`]: fn.flush.html +#[derive(Debug)] +pub struct Flush<A> { + a: Option<A>, +} + +/// Creates a future which will entirely flush an I/O object and then yield the +/// object itself. +/// +/// This function will consume the object provided if an error happens, and +/// otherwise it will repeatedly call `flush` until it sees `Ok(())`, scheduling +/// a retry if `WouldBlock` is seen along the way. +pub fn flush<A>(a: A) -> Flush<A> +where + A: AsyncWrite, +{ + Flush { a: Some(a) } +} + +impl<A> Future for Flush<A> +where + A: AsyncWrite, +{ + type Item = A; + type Error = io::Error; + + fn poll(&mut self) -> Poll<A, io::Error> { + try_ready!(self.a.as_mut().unwrap().poll_flush()); + Ok(Async::Ready(self.a.take().unwrap())) + } +} diff --git a/third_party/rust/tokio-io/src/io/mod.rs b/third_party/rust/tokio-io/src/io/mod.rs new file mode 100644 index 0000000000..763cfaee7f --- /dev/null +++ b/third_party/rust/tokio-io/src/io/mod.rs @@ -0,0 +1,32 @@ +//! I/O conveniences when working with primitives in `tokio-core` +//! +//! Contains various combinators to work with I/O objects and type definitions +//! as well. +//! +//! A description of the high-level I/O combinators can be [found online] in +//! addition to a description of the [low level details]. +//! +//! [found online]: https://tokio.rs/docs/getting-started/core/ +//! [low level details]: https://tokio.rs/docs/going-deeper-tokio/core-low-level/ + +mod copy; +mod flush; +mod read; +mod read_exact; +mod read_to_end; +mod read_until; +mod shutdown; +mod write_all; + +pub use self::copy::{copy, Copy}; +pub use self::flush::{flush, Flush}; +pub use self::read::{read, Read}; +pub use self::read_exact::{read_exact, ReadExact}; +pub use self::read_to_end::{read_to_end, ReadToEnd}; +pub use self::read_until::{read_until, ReadUntil}; +pub use self::shutdown::{shutdown, Shutdown}; +pub use self::write_all::{write_all, WriteAll}; +pub use allow_std::AllowStdIo; +pub use lines::{lines, Lines}; +pub use split::{ReadHalf, WriteHalf}; +pub use window::Window; diff --git a/third_party/rust/tokio-io/src/io/read.rs b/third_party/rust/tokio-io/src/io/read.rs new file mode 100644 index 0000000000..632cef4d28 --- /dev/null +++ b/third_party/rust/tokio-io/src/io/read.rs @@ -0,0 +1,60 @@ +use std::io; +use std::mem; + +use futures::{Future, Poll}; + +use AsyncRead; + +#[derive(Debug)] +enum State<R, T> { + Pending { rd: R, buf: T }, + Empty, +} + +/// Tries to read some bytes directly into the given `buf` in asynchronous +/// manner, returning a future type. +/// +/// The returned future will resolve to both the I/O stream and the buffer +/// as well as the number of bytes read once the read operation is completed. +pub fn read<R, T>(rd: R, buf: T) -> Read<R, T> +where + R: AsyncRead, + T: AsMut<[u8]>, +{ + Read { + state: State::Pending { rd: rd, buf: buf }, + } +} + +/// A future which can be used to easily read available number of bytes to fill +/// a buffer. +/// +/// Created by the [`read`] function. +#[derive(Debug)] +pub struct Read<R, T> { + state: State<R, T>, +} + +impl<R, T> Future for Read<R, T> +where + R: AsyncRead, + T: AsMut<[u8]>, +{ + type Item = (R, T, usize); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(R, T, usize), io::Error> { + let nread = match self.state { + State::Pending { + ref mut rd, + ref mut buf, + } => try_ready!(rd.poll_read(&mut buf.as_mut()[..])), + State::Empty => panic!("poll a Read after it's done"), + }; + + match mem::replace(&mut self.state, State::Empty) { + State::Pending { rd, buf } => Ok((rd, buf, nread).into()), + State::Empty => panic!("invalid internal state"), + } + } +} diff --git a/third_party/rust/tokio-io/src/io/read_exact.rs b/third_party/rust/tokio-io/src/io/read_exact.rs new file mode 100644 index 0000000000..3b98621ae4 --- /dev/null +++ b/third_party/rust/tokio-io/src/io/read_exact.rs @@ -0,0 +1,85 @@ +use std::io; +use std::mem; + +use futures::{Future, Poll}; + +use AsyncRead; + +/// A future which can be used to easily read exactly enough bytes to fill +/// a buffer. +/// +/// Created by the [`read_exact`] function. +/// +/// [`read_exact`]: fn.read_exact.html +#[derive(Debug)] +pub struct ReadExact<A, T> { + state: State<A, T>, +} + +#[derive(Debug)] +enum State<A, T> { + Reading { a: A, buf: T, pos: usize }, + Empty, +} + +/// Creates a future which will read exactly enough bytes to fill `buf`, +/// returning an error if EOF is hit sooner. +/// +/// The returned future will resolve to both the I/O stream as well as the +/// buffer once the read operation is completed. +/// +/// In the case of an error the buffer and the object will be discarded, with +/// the error yielded. In the case of success the object will be destroyed and +/// the buffer will be returned, with all data read from the stream appended to +/// the buffer. +pub fn read_exact<A, T>(a: A, buf: T) -> ReadExact<A, T> +where + A: AsyncRead, + T: AsMut<[u8]>, +{ + ReadExact { + state: State::Reading { + a: a, + buf: buf, + pos: 0, + }, + } +} + +fn eof() -> io::Error { + io::Error::new(io::ErrorKind::UnexpectedEof, "early eof") +} + +impl<A, T> Future for ReadExact<A, T> +where + A: AsyncRead, + T: AsMut<[u8]>, +{ + type Item = (A, T); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(A, T), io::Error> { + match self.state { + State::Reading { + ref mut a, + ref mut buf, + ref mut pos, + } => { + let buf = buf.as_mut(); + while *pos < buf.len() { + let n = try_ready!(a.poll_read(&mut buf[*pos..])); + *pos += n; + if n == 0 { + return Err(eof()); + } + } + } + State::Empty => panic!("poll a ReadExact after it's done"), + } + + match mem::replace(&mut self.state, State::Empty) { + State::Reading { a, buf, .. } => Ok((a, buf).into()), + State::Empty => panic!(), + } + } +} diff --git a/third_party/rust/tokio-io/src/io/read_to_end.rs b/third_party/rust/tokio-io/src/io/read_to_end.rs new file mode 100644 index 0000000000..296af6e304 --- /dev/null +++ b/third_party/rust/tokio-io/src/io/read_to_end.rs @@ -0,0 +1,66 @@ +use std::io; +use std::mem; + +use futures::{Future, Poll}; + +use AsyncRead; + +/// A future which can be used to easily read the entire contents of a stream +/// into a vector. +/// +/// Created by the [`read_to_end`] function. +/// +/// [`read_to_end`]: fn.read_to_end.html +#[derive(Debug)] +pub struct ReadToEnd<A> { + state: State<A>, +} + +#[derive(Debug)] +enum State<A> { + Reading { a: A, buf: Vec<u8> }, + Empty, +} + +/// Creates a future which will read all the bytes associated with the I/O +/// object `A` into the buffer provided. +/// +/// In the case of an error the buffer and the object will be discarded, with +/// the error yielded. In the case of success both the object and the buffer +/// will be returned, with all data read from the stream appended to the buffer. +pub fn read_to_end<A>(a: A, buf: Vec<u8>) -> ReadToEnd<A> +where + A: AsyncRead, +{ + ReadToEnd { + state: State::Reading { a: a, buf: buf }, + } +} + +impl<A> Future for ReadToEnd<A> +where + A: AsyncRead, +{ + type Item = (A, Vec<u8>); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(A, Vec<u8>), io::Error> { + match self.state { + State::Reading { + ref mut a, + ref mut buf, + } => { + // If we get `Ok`, then we know the stream hit EOF and we're done. If we + // hit "would block" then all the read data so far is in our buffer, and + // otherwise we propagate errors + try_nb!(a.read_to_end(buf)); + } + State::Empty => panic!("poll ReadToEnd after it's done"), + } + + match mem::replace(&mut self.state, State::Empty) { + State::Reading { a, buf } => Ok((a, buf).into()), + State::Empty => unreachable!(), + } + } +} diff --git a/third_party/rust/tokio-io/src/io/read_until.rs b/third_party/rust/tokio-io/src/io/read_until.rs new file mode 100644 index 0000000000..d0be4c9448 --- /dev/null +++ b/third_party/rust/tokio-io/src/io/read_until.rs @@ -0,0 +1,76 @@ +use std::io::{self, BufRead}; +use std::mem; + +use futures::{Future, Poll}; + +use AsyncRead; + +/// A future which can be used to easily read the contents of a stream into a +/// vector until the delimiter is reached. +/// +/// Created by the [`read_until`] function. +/// +/// [`read_until`]: fn.read_until.html +#[derive(Debug)] +pub struct ReadUntil<A> { + state: State<A>, +} + +#[derive(Debug)] +enum State<A> { + Reading { a: A, byte: u8, buf: Vec<u8> }, + Empty, +} + +/// Creates a future which will read all the bytes associated with the I/O +/// object `A` into the buffer provided until the delimiter `byte` is reached. +/// This method is the async equivalent to [`BufRead::read_until`]. +/// +/// In case of an error the buffer and the object will be discarded, with +/// the error yielded. In the case of success the object will be destroyed and +/// the buffer will be returned, with all bytes up to, and including, the delimiter +/// (if found). +/// +/// [`BufRead::read_until`]: https://doc.rust-lang.org/std/io/trait.BufRead.html#method.read_until +pub fn read_until<A>(a: A, byte: u8, buf: Vec<u8>) -> ReadUntil<A> +where + A: AsyncRead + BufRead, +{ + ReadUntil { + state: State::Reading { + a: a, + byte: byte, + buf: buf, + }, + } +} + +impl<A> Future for ReadUntil<A> +where + A: AsyncRead + BufRead, +{ + type Item = (A, Vec<u8>); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(A, Vec<u8>), io::Error> { + match self.state { + State::Reading { + ref mut a, + byte, + ref mut buf, + } => { + // If we get `Ok(n)`, then we know the stream hit EOF or the delimiter. + // and just return it, as we are finished. + // If we hit "would block" then all the read data so far + // is in our buffer, and otherwise we propagate errors. + try_nb!(a.read_until(byte, buf)); + } + State::Empty => panic!("poll ReadUntil after it's done"), + } + + match mem::replace(&mut self.state, State::Empty) { + State::Reading { a, byte: _, buf } => Ok((a, buf).into()), + State::Empty => unreachable!(), + } + } +} diff --git a/third_party/rust/tokio-io/src/io/shutdown.rs b/third_party/rust/tokio-io/src/io/shutdown.rs new file mode 100644 index 0000000000..d963a813ad --- /dev/null +++ b/third_party/rust/tokio-io/src/io/shutdown.rs @@ -0,0 +1,44 @@ +use std::io; + +use futures::{Async, Future, Poll}; + +use AsyncWrite; + +/// A future used to fully shutdown an I/O object. +/// +/// Resolves to the underlying I/O object once the shutdown operation is +/// complete. +/// +/// Created by the [`shutdown`] function. +/// +/// [`shutdown`]: fn.shutdown.html +#[derive(Debug)] +pub struct Shutdown<A> { + a: Option<A>, +} + +/// Creates a future which will entirely shutdown an I/O object and then yield +/// the object itself. +/// +/// This function will consume the object provided if an error happens, and +/// otherwise it will repeatedly call `shutdown` until it sees `Ok(())`, +/// scheduling a retry if `WouldBlock` is seen along the way. +pub fn shutdown<A>(a: A) -> Shutdown<A> +where + A: AsyncWrite, +{ + Shutdown { a: Some(a) } +} + +impl<A> Future for Shutdown<A> +where + A: AsyncWrite, +{ + type Item = A; + type Error = io::Error; + + fn poll(&mut self) -> Poll<A, io::Error> { + try_ready!(self.a.as_mut().unwrap().shutdown()); + Ok(Async::Ready(self.a.take().unwrap())) + } +} diff --git a/third_party/rust/tokio-io/src/io/write_all.rs b/third_party/rust/tokio-io/src/io/write_all.rs new file mode 100644 index 0000000000..ba8af4a222 --- /dev/null +++ b/third_party/rust/tokio-io/src/io/write_all.rs @@ -0,0 +1,88 @@ +use std::io; +use std::mem; + +use futures::{Future, Poll}; + +use AsyncWrite; + +/// A future used to write the entire contents of some data to a stream. +/// +/// This is created by the [`write_all`] top-level method. +/// +/// [`write_all`]: fn.write_all.html +#[derive(Debug)] +pub struct WriteAll<A, T> { + state: State<A, T>, +} + +#[derive(Debug)] +enum State<A, T> { + Writing { a: A, buf: T, pos: usize }, + Empty, +} + +/// Creates a future that will write the entire contents of the buffer `buf` to +/// the stream `a` provided. +/// +/// The returned future will not return until all the data has been written, and +/// the future will resolve to the stream as well as the buffer (for reuse if +/// needed). +/// +/// Any error which happens during writing will cause both the stream and the +/// buffer to get destroyed. +/// +/// The `buf` parameter here only requires the `AsRef<[u8]>` trait, which should +/// be broadly applicable to accepting data which can be converted to a slice. +/// The `Window` struct is also available in this crate to provide a different +/// window into a slice if necessary. +pub fn write_all<A, T>(a: A, buf: T) -> WriteAll<A, T> +where + A: AsyncWrite, + T: AsRef<[u8]>, +{ + WriteAll { + state: State::Writing { + a: a, + buf: buf, + pos: 0, + }, + } +} + +fn zero_write() -> io::Error { + io::Error::new(io::ErrorKind::WriteZero, "zero-length write") +} + +impl<A, T> Future for WriteAll<A, T> +where + A: AsyncWrite, + T: AsRef<[u8]>, +{ + type Item = (A, T); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(A, T), io::Error> { + match self.state { + State::Writing { + ref mut a, + ref buf, + ref mut pos, + } => { + let buf = buf.as_ref(); + while *pos < buf.len() { + let n = try_ready!(a.poll_write(&buf[*pos..])); + *pos += n; + if n == 0 { + return Err(zero_write()); + } + } + } + State::Empty => panic!("poll a WriteAll after it's done"), + } + + match mem::replace(&mut self.state, State::Empty) { + State::Writing { a, buf, .. } => Ok((a, buf).into()), + State::Empty => panic!(), + } + } +} diff --git a/third_party/rust/tokio-io/src/length_delimited.rs b/third_party/rust/tokio-io/src/length_delimited.rs new file mode 100644 index 0000000000..c211c95356 --- /dev/null +++ b/third_party/rust/tokio-io/src/length_delimited.rs @@ -0,0 +1,943 @@ +#![allow(deprecated)] + +use {codec, AsyncRead, AsyncWrite}; + +use bytes::buf::Chain; +use bytes::{Buf, BufMut, BytesMut, IntoBuf}; + +use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream}; + +use std::error::Error as StdError; +use std::io::{self, Cursor}; +use std::{cmp, fmt}; + +/// Configure length delimited `FramedRead`, `FramedWrite`, and `Framed` values. +/// +/// `Builder` enables constructing configured length delimited framers. Note +/// that not all configuration settings apply to both encoding and decoding. See +/// the documentation for specific methods for more detail. +#[deprecated(since = "0.1.8", note = "Moved to tokio-codec")] +#[doc(hidden)] +#[derive(Debug, Clone, Copy)] +pub struct Builder { + // Maximum frame length + max_frame_len: usize, + + // Number of bytes representing the field length + length_field_len: usize, + + // Number of bytes in the header before the length field + length_field_offset: usize, + + // Adjust the length specified in the header field by this amount + length_adjustment: isize, + + // Total number of bytes to skip before reading the payload, if not set, + // `length_field_len + length_field_offset` + num_skip: Option<usize>, + + // Length field byte order (little or big endian) + length_field_is_big_endian: bool, +} + +/// Adapts a byte stream into a unified `Stream` and `Sink` that works over +/// entire frame values. +/// +/// See [module level] documentation for more detail. +/// +/// [module level]: index.html +#[deprecated(since = "0.1.8", note = "Moved to tokio-codec")] +#[doc(hidden)] +pub struct Framed<T, B: IntoBuf = BytesMut> { + inner: FramedRead<FramedWrite<T, B>>, +} + +/// Adapts a byte stream to a `Stream` yielding entire frame values. +/// +/// See [module level] documentation for more detail. +/// +/// [module level]: index.html +#[deprecated(since = "0.1.8", note = "Moved to tokio-codec")] +#[doc(hidden)] +#[derive(Debug)] +pub struct FramedRead<T> { + inner: codec::FramedRead<T, Decoder>, +} + +/// An error when the number of bytes read is more than max frame length. +#[deprecated(since = "0.1.8", note = "Moved to tokio-codec")] +#[doc(hidden)] +pub struct FrameTooBig { + _priv: (), +} + +#[derive(Debug)] +struct Decoder { + // Configuration values + builder: Builder, + + // Read state + state: DecodeState, +} + +#[derive(Debug, Clone, Copy)] +enum DecodeState { + Head, + Data(usize), +} + +/// Adapts a byte stream to a `Sink` accepting entire frame values. +/// +/// See [module level] documentation for more detail. +/// +/// [module level]: index.html +#[deprecated(since = "0.1.8", note = "Moved to tokio-codec")] +#[doc(hidden)] +pub struct FramedWrite<T, B: IntoBuf = BytesMut> { + // I/O type + inner: T, + + // Configuration values + builder: Builder, + + // Current frame being written + frame: Option<Chain<Cursor<BytesMut>, B::Buf>>, +} + +// ===== impl Framed ===== + +impl<T: AsyncRead + AsyncWrite, B: IntoBuf> Framed<T, B> { + /// Creates a new `Framed` with default configuration values. + pub fn new(inner: T) -> Framed<T, B> { + Builder::new().new_framed(inner) + } +} + +impl<T, B: IntoBuf> Framed<T, B> { + /// Returns a reference to the underlying I/O stream wrapped by `Framed`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_ref(&self) -> &T { + self.inner.get_ref().get_ref() + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `Framed`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise being + /// worked with. + pub fn get_mut(&mut self) -> &mut T { + self.inner.get_mut().get_mut() + } + + /// Consumes the `Framed`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise being + /// worked with. + pub fn into_inner(self) -> T { + self.inner.into_inner().into_inner() + } +} + +impl<T: AsyncRead, B: IntoBuf> Stream for Framed<T, B> { + type Item = BytesMut; + type Error = io::Error; + + fn poll(&mut self) -> Poll<Option<BytesMut>, io::Error> { + self.inner.poll() + } +} + +impl<T: AsyncWrite, B: IntoBuf> Sink for Framed<T, B> { + type SinkItem = B; + type SinkError = io::Error; + + fn start_send(&mut self, item: B) -> StartSend<B, io::Error> { + self.inner.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), io::Error> { + self.inner.poll_complete() + } + + fn close(&mut self) -> Poll<(), io::Error> { + self.inner.close() + } +} + +impl<T, B: IntoBuf> fmt::Debug for Framed<T, B> +where + T: fmt::Debug, + B::Buf: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Framed") + .field("inner", &self.inner) + .finish() + } +} + +// ===== impl FramedRead ===== + +impl<T: AsyncRead> FramedRead<T> { + /// Creates a new `FramedRead` with default configuration values. + pub fn new(inner: T) -> FramedRead<T> { + Builder::new().new_read(inner) + } +} + +impl<T> FramedRead<T> { + /// Returns the current max frame setting + /// + /// This is the largest size this codec will accept from the wire. Larger + /// frames will be rejected. + pub fn max_frame_length(&self) -> usize { + self.inner.decoder().builder.max_frame_len + } + + /// Updates the max frame setting. + /// + /// The change takes effect the next time a frame is decoded. In other + /// words, if a frame is currently in process of being decoded with a frame + /// size greater than `val` but less than the max frame length in effect + /// before calling this function, then the frame will be allowed. + pub fn set_max_frame_length(&mut self, val: usize) { + self.inner.decoder_mut().builder.max_frame_length(val); + } + + /// Returns a reference to the underlying I/O stream wrapped by `FramedRead`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_ref(&self) -> &T { + self.inner.get_ref() + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `FramedRead`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise being + /// worked with. + pub fn get_mut(&mut self) -> &mut T { + self.inner.get_mut() + } + + /// Consumes the `FramedRead`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise being + /// worked with. + pub fn into_inner(self) -> T { + self.inner.into_inner() + } +} + +impl<T: AsyncRead> Stream for FramedRead<T> { + type Item = BytesMut; + type Error = io::Error; + + fn poll(&mut self) -> Poll<Option<BytesMut>, io::Error> { + self.inner.poll() + } +} + +impl<T: Sink> Sink for FramedRead<T> { + type SinkItem = T::SinkItem; + type SinkError = T::SinkError; + + fn start_send(&mut self, item: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> { + self.inner.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), T::SinkError> { + self.inner.poll_complete() + } + + fn close(&mut self) -> Poll<(), T::SinkError> { + self.inner.close() + } +} + +impl<T: io::Write> io::Write for FramedRead<T> { + fn write(&mut self, src: &[u8]) -> io::Result<usize> { + self.inner.get_mut().write(src) + } + + fn flush(&mut self) -> io::Result<()> { + self.inner.get_mut().flush() + } +} + +impl<T: AsyncWrite> AsyncWrite for FramedRead<T> { + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.inner.get_mut().shutdown() + } + + fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { + self.inner.get_mut().write_buf(buf) + } +} + +// ===== impl Decoder ====== + +impl Decoder { + fn decode_head(&mut self, src: &mut BytesMut) -> io::Result<Option<usize>> { + let head_len = self.builder.num_head_bytes(); + let field_len = self.builder.length_field_len; + + if src.len() < head_len { + // Not enough data + return Ok(None); + } + + let n = { + let mut src = Cursor::new(&mut *src); + + // Skip the required bytes + src.advance(self.builder.length_field_offset); + + // match endianess + let n = if self.builder.length_field_is_big_endian { + src.get_uint_be(field_len) + } else { + src.get_uint_le(field_len) + }; + + if n > self.builder.max_frame_len as u64 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + FrameTooBig { _priv: () }, + )); + } + + // The check above ensures there is no overflow + let n = n as usize; + + // Adjust `n` with bounds checking + let n = if self.builder.length_adjustment < 0 { + n.checked_sub(-self.builder.length_adjustment as usize) + } else { + n.checked_add(self.builder.length_adjustment as usize) + }; + + // Error handling + match n { + Some(n) => n, + None => { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "provided length would overflow after adjustment", + )); + } + } + }; + + let num_skip = self.builder.get_num_skip(); + + if num_skip > 0 { + let _ = src.split_to(num_skip); + } + + // Ensure that the buffer has enough space to read the incoming + // payload + src.reserve(n); + + return Ok(Some(n)); + } + + fn decode_data(&self, n: usize, src: &mut BytesMut) -> io::Result<Option<BytesMut>> { + // At this point, the buffer has already had the required capacity + // reserved. All there is to do is read. + if src.len() < n { + return Ok(None); + } + + Ok(Some(src.split_to(n))) + } +} + +impl codec::Decoder for Decoder { + type Item = BytesMut; + type Error = io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<BytesMut>> { + let n = match self.state { + DecodeState::Head => match self.decode_head(src)? { + Some(n) => { + self.state = DecodeState::Data(n); + n + } + None => return Ok(None), + }, + DecodeState::Data(n) => n, + }; + + match self.decode_data(n, src)? { + Some(data) => { + // Update the decode state + self.state = DecodeState::Head; + + // Make sure the buffer has enough space to read the next head + src.reserve(self.builder.num_head_bytes()); + + Ok(Some(data)) + } + None => Ok(None), + } + } +} + +// ===== impl FramedWrite ===== + +impl<T: AsyncWrite, B: IntoBuf> FramedWrite<T, B> { + /// Creates a new `FramedWrite` with default configuration values. + pub fn new(inner: T) -> FramedWrite<T, B> { + Builder::new().new_write(inner) + } +} + +impl<T, B: IntoBuf> FramedWrite<T, B> { + /// Returns the current max frame setting + /// + /// This is the largest size this codec will write to the wire. Larger + /// frames will be rejected. + pub fn max_frame_length(&self) -> usize { + self.builder.max_frame_len + } + + /// Updates the max frame setting. + /// + /// The change takes effect the next time a frame is encoded. In other + /// words, if a frame is currently in process of being encoded with a frame + /// size greater than `val` but less than the max frame length in effect + /// before calling this function, then the frame will be allowed. + pub fn set_max_frame_length(&mut self, val: usize) { + self.builder.max_frame_length(val); + } + + /// Returns a reference to the underlying I/O stream wrapped by + /// `FramedWrite`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise + /// being worked with. + pub fn get_ref(&self) -> &T { + &self.inner + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `FramedWrite`. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise being + /// worked with. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } + + /// Consumes the `FramedWrite`, returning its underlying I/O stream. + /// + /// Note that care should be taken to not tamper with the underlying stream + /// of data coming in as it may corrupt the stream of frames otherwise being + /// worked with. + pub fn into_inner(self) -> T { + self.inner + } +} + +impl<T: AsyncWrite, B: IntoBuf> FramedWrite<T, B> { + // If there is a buffered frame, try to write it to `T` + fn do_write(&mut self) -> Poll<(), io::Error> { + if self.frame.is_none() { + return Ok(Async::Ready(())); + } + + loop { + let frame = self.frame.as_mut().unwrap(); + if try_ready!(self.inner.write_buf(frame)) == 0 { + return Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to write frame to transport", + )); + } + + if !frame.has_remaining() { + break; + } + } + + self.frame = None; + + Ok(Async::Ready(())) + } + + fn set_frame(&mut self, buf: B::Buf) -> io::Result<()> { + let mut head = BytesMut::with_capacity(8); + let n = buf.remaining(); + + if n > self.builder.max_frame_len { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + FrameTooBig { _priv: () }, + )); + } + + // Adjust `n` with bounds checking + let n = if self.builder.length_adjustment < 0 { + n.checked_add(-self.builder.length_adjustment as usize) + } else { + n.checked_sub(self.builder.length_adjustment as usize) + }; + + // Error handling + let n = match n { + Some(n) => n, + None => { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "provided length would overflow after adjustment", + )); + } + }; + + if self.builder.length_field_is_big_endian { + head.put_uint_be(n as u64, self.builder.length_field_len); + } else { + head.put_uint_le(n as u64, self.builder.length_field_len); + } + + debug_assert!(self.frame.is_none()); + + self.frame = Some(head.into_buf().chain(buf)); + + Ok(()) + } +} + +impl<T: AsyncWrite, B: IntoBuf> Sink for FramedWrite<T, B> { + type SinkItem = B; + type SinkError = io::Error; + + fn start_send(&mut self, item: B) -> StartSend<B, io::Error> { + if !self.do_write()?.is_ready() { + return Ok(AsyncSink::NotReady(item)); + } + + self.set_frame(item.into_buf())?; + + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), io::Error> { + // Write any buffered frame to T + try_ready!(self.do_write()); + + // Try flushing the underlying IO + try_ready!(self.inner.poll_flush()); + + return Ok(Async::Ready(())); + } + + fn close(&mut self) -> Poll<(), io::Error> { + try_ready!(self.poll_complete()); + self.inner.shutdown() + } +} + +impl<T: Stream, B: IntoBuf> Stream for FramedWrite<T, B> { + type Item = T::Item; + type Error = T::Error; + + fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> { + self.inner.poll() + } +} + +impl<T: io::Read, B: IntoBuf> io::Read for FramedWrite<T, B> { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + self.get_mut().read(dst) + } +} + +impl<T: AsyncRead, U: IntoBuf> AsyncRead for FramedWrite<T, U> { + fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { + self.get_mut().read_buf(buf) + } + + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.get_ref().prepare_uninitialized_buffer(buf) + } +} + +impl<T, B: IntoBuf> fmt::Debug for FramedWrite<T, B> +where + T: fmt::Debug, + B::Buf: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("FramedWrite") + .field("inner", &self.inner) + .field("builder", &self.builder) + .field("frame", &self.frame) + .finish() + } +} + +// ===== impl Builder ===== + +impl Builder { + /// Creates a new length delimited framer builder with default configuration + /// values. + /// + /// # Examples + /// + /// ``` + /// # use tokio_io::AsyncRead; + /// use tokio_io::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .length_field_offset(0) + /// .length_field_length(2) + /// .length_adjustment(0) + /// .num_skip(0) + /// .new_read(io); + /// # } + /// ``` + pub fn new() -> Builder { + Builder { + // Default max frame length of 8MB + max_frame_len: 8 * 1_024 * 1_024, + + // Default byte length of 4 + length_field_len: 4, + + // Default to the header field being at the start of the header. + length_field_offset: 0, + + length_adjustment: 0, + + // Total number of bytes to skip before reading the payload, if not set, + // `length_field_len + length_field_offset` + num_skip: None, + + // Default to reading the length field in network (big) endian. + length_field_is_big_endian: true, + } + } + + /// Read the length field as a big endian integer + /// + /// This is the default setting. + /// + /// This configuration option applies to both encoding and decoding. + /// + /// # Examples + /// + /// ``` + /// # use tokio_io::AsyncRead; + /// use tokio_io::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .big_endian() + /// .new_read(io); + /// # } + /// ``` + pub fn big_endian(&mut self) -> &mut Self { + self.length_field_is_big_endian = true; + self + } + + /// Read the length field as a little endian integer + /// + /// The default setting is big endian. + /// + /// This configuration option applies to both encoding and decoding. + /// + /// # Examples + /// + /// ``` + /// # use tokio_io::AsyncRead; + /// use tokio_io::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .little_endian() + /// .new_read(io); + /// # } + /// ``` + pub fn little_endian(&mut self) -> &mut Self { + self.length_field_is_big_endian = false; + self + } + + /// Read the length field as a native endian integer + /// + /// The default setting is big endian. + /// + /// This configuration option applies to both encoding and decoding. + /// + /// # Examples + /// + /// ``` + /// # use tokio_io::AsyncRead; + /// use tokio_io::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .native_endian() + /// .new_read(io); + /// # } + /// ``` + pub fn native_endian(&mut self) -> &mut Self { + if cfg!(target_endian = "big") { + self.big_endian() + } else { + self.little_endian() + } + } + + /// Sets the max frame length + /// + /// This configuration option applies to both encoding and decoding. The + /// default value is 8MB. + /// + /// When decoding, the length field read from the byte stream is checked + /// against this setting **before** any adjustments are applied. When + /// encoding, the length of the submitted payload is checked against this + /// setting. + /// + /// When frames exceed the max length, an `io::Error` with the custom value + /// of the `FrameTooBig` type will be returned. + /// + /// # Examples + /// + /// ``` + /// # use tokio_io::AsyncRead; + /// use tokio_io::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .max_frame_length(8 * 1024) + /// .new_read(io); + /// # } + /// ``` + pub fn max_frame_length(&mut self, val: usize) -> &mut Self { + self.max_frame_len = val; + self + } + + /// Sets the number of bytes used to represent the length field + /// + /// The default value is `4`. The max value is `8`. + /// + /// This configuration option applies to both encoding and decoding. + /// + /// # Examples + /// + /// ``` + /// # use tokio_io::AsyncRead; + /// use tokio_io::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .length_field_length(4) + /// .new_read(io); + /// # } + /// ``` + pub fn length_field_length(&mut self, val: usize) -> &mut Self { + assert!(val > 0 && val <= 8, "invalid length field length"); + self.length_field_len = val; + self + } + + /// Sets the number of bytes in the header before the length field + /// + /// This configuration option only applies to decoding. + /// + /// # Examples + /// + /// ``` + /// # use tokio_io::AsyncRead; + /// use tokio_io::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .length_field_offset(1) + /// .new_read(io); + /// # } + /// ``` + pub fn length_field_offset(&mut self, val: usize) -> &mut Self { + self.length_field_offset = val; + self + } + + /// Delta between the payload length specified in the header and the real + /// payload length + /// + /// # Examples + /// + /// ``` + /// # use tokio_io::AsyncRead; + /// use tokio_io::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .length_adjustment(-2) + /// .new_read(io); + /// # } + /// ``` + pub fn length_adjustment(&mut self, val: isize) -> &mut Self { + self.length_adjustment = val; + self + } + + /// Sets the number of bytes to skip before reading the payload + /// + /// Default value is `length_field_len + length_field_offset` + /// + /// This configuration option only applies to decoding + /// + /// # Examples + /// + /// ``` + /// # use tokio_io::AsyncRead; + /// use tokio_io::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .num_skip(4) + /// .new_read(io); + /// # } + /// ``` + pub fn num_skip(&mut self, val: usize) -> &mut Self { + self.num_skip = Some(val); + self + } + + /// Create a configured length delimited `FramedRead` + /// + /// # Examples + /// + /// ``` + /// # use tokio_io::AsyncRead; + /// use tokio_io::codec::length_delimited::Builder; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// Builder::new() + /// .length_field_offset(0) + /// .length_field_length(2) + /// .length_adjustment(0) + /// .num_skip(0) + /// .new_read(io); + /// # } + /// ``` + pub fn new_read<T>(&self, upstream: T) -> FramedRead<T> + where + T: AsyncRead, + { + FramedRead { + inner: codec::FramedRead::new( + upstream, + Decoder { + builder: *self, + state: DecodeState::Head, + }, + ), + } + } + + /// Create a configured length delimited `FramedWrite` + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio_io; + /// # extern crate bytes; + /// # use tokio_io::AsyncWrite; + /// # use tokio_io::codec::length_delimited; + /// # use bytes::BytesMut; + /// # fn write_frame<T: AsyncWrite>(io: T) { + /// # let _: length_delimited::FramedWrite<T, BytesMut> = + /// length_delimited::Builder::new() + /// .length_field_length(2) + /// .new_write(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn new_write<T, B>(&self, inner: T) -> FramedWrite<T, B> + where + T: AsyncWrite, + B: IntoBuf, + { + FramedWrite { + inner: inner, + builder: *self, + frame: None, + } + } + + /// Create a configured length delimited `Framed` + /// + /// # Examples + /// + /// ``` + /// # extern crate tokio_io; + /// # extern crate bytes; + /// # use tokio_io::{AsyncRead, AsyncWrite}; + /// # use tokio_io::codec::length_delimited; + /// # use bytes::BytesMut; + /// # fn write_frame<T: AsyncRead + AsyncWrite>(io: T) { + /// # let _: length_delimited::Framed<T, BytesMut> = + /// length_delimited::Builder::new() + /// .length_field_length(2) + /// .new_framed(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn new_framed<T, B>(&self, inner: T) -> Framed<T, B> + where + T: AsyncRead + AsyncWrite, + B: IntoBuf, + { + let inner = self.new_read(self.new_write(inner)); + Framed { inner: inner } + } + + fn num_head_bytes(&self) -> usize { + let num = self.length_field_offset + self.length_field_len; + cmp::max(num, self.num_skip.unwrap_or(0)) + } + + fn get_num_skip(&self) -> usize { + self.num_skip + .unwrap_or(self.length_field_offset + self.length_field_len) + } +} + +// ===== impl FrameTooBig ===== + +impl fmt::Debug for FrameTooBig { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("FrameTooBig").finish() + } +} + +impl fmt::Display for FrameTooBig { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str(self.description()) + } +} + +impl StdError for FrameTooBig { + fn description(&self) -> &str { + "frame size too big" + } +} diff --git a/third_party/rust/tokio-io/src/lib.rs b/third_party/rust/tokio-io/src/lib.rs new file mode 100644 index 0000000000..6d9fe22927 --- /dev/null +++ b/third_party/rust/tokio-io/src/lib.rs @@ -0,0 +1,75 @@ +#![deny(missing_docs, missing_debug_implementations)] +#![doc(html_root_url = "https://docs.rs/tokio-io/0.1.13")] + +//! Core I/O traits and combinators when working with Tokio. +//! +//! > **Note:** This crate has been **deprecated in tokio 0.2.x** and has been +//! > moved into [`tokio::io`]. +//! +//! [`tokio::io`]: https://docs.rs/tokio/latest/tokio/io/index.html +//! +//! A description of the high-level I/O combinators can be [found online] in +//! addition to a description of the [low level details]. +//! +//! [found online]: https://tokio.rs/docs/getting-started/core/ +//! [low level details]: https://tokio.rs/docs/going-deeper-tokio/core-low-level/ + +#[macro_use] +extern crate log; + +#[macro_use] +extern crate futures; +extern crate bytes; + +use std::io as std_io; + +use futures::{Future, Stream}; + +/// A convenience typedef around a `Future` whose error component is `io::Error` +pub type IoFuture<T> = Box<dyn Future<Item = T, Error = std_io::Error> + Send>; + +/// A convenience typedef around a `Stream` whose error component is `io::Error` +pub type IoStream<T> = Box<dyn Stream<Item = T, Error = std_io::Error> + Send>; + +/// A convenience macro for working with `io::Result<T>` from the `Read` and +/// `Write` traits. +/// +/// This macro takes `io::Result<T>` as input, and returns `T` as the output. If +/// the input type is of the `Err` variant, then `Poll::NotReady` is returned if +/// it indicates `WouldBlock` or otherwise `Err` is returned. +#[macro_export] +macro_rules! try_nb { + ($e:expr) => { + match $e { + Ok(t) => t, + Err(ref e) if e.kind() == ::std::io::ErrorKind::WouldBlock => { + return Ok(::futures::Async::NotReady); + } + Err(e) => return Err(e.into()), + } + }; +} + +pub mod codec; +pub mod io; + +pub mod _tokio_codec; +mod allow_std; +mod async_read; +mod async_write; +mod framed; +mod framed_read; +mod framed_write; +mod length_delimited; +mod lines; +mod split; +mod window; + +pub use self::async_read::AsyncRead; +pub use self::async_write::AsyncWrite; + +fn _assert_objects() { + fn _assert<T>() {} + _assert::<Box<dyn AsyncRead>>(); + _assert::<Box<dyn AsyncWrite>>(); +} diff --git a/third_party/rust/tokio-io/src/lines.rs b/third_party/rust/tokio-io/src/lines.rs new file mode 100644 index 0000000000..8e59ff8fa2 --- /dev/null +++ b/third_party/rust/tokio-io/src/lines.rs @@ -0,0 +1,62 @@ +use std::io::{self, BufRead}; +use std::mem; + +use futures::{Poll, Stream}; + +use AsyncRead; + +/// Combinator created by the top-level `lines` method which is a stream over +/// the lines of text on an I/O object. +#[derive(Debug)] +pub struct Lines<A> { + io: A, + line: String, +} + +/// Creates a new stream from the I/O object given representing the lines of +/// input that are found on `A`. +/// +/// This method takes an asynchronous I/O object, `a`, and returns a `Stream` of +/// lines that the object contains. The returned stream will reach its end once +/// `a` reaches EOF. +pub fn lines<A>(a: A) -> Lines<A> +where + A: AsyncRead + BufRead, +{ + Lines { + io: a, + line: String::new(), + } +} + +impl<A> Lines<A> { + /// Returns the underlying I/O object. + /// + /// Note that this may lose data already read into internal buffers. It's + /// recommended to only call this once the stream has reached its end. + pub fn into_inner(self) -> A { + self.io + } +} + +impl<A> Stream for Lines<A> +where + A: AsyncRead + BufRead, +{ + type Item = String; + type Error = io::Error; + + fn poll(&mut self) -> Poll<Option<String>, io::Error> { + let n = try_nb!(self.io.read_line(&mut self.line)); + if n == 0 && self.line.len() == 0 { + return Ok(None.into()); + } + if self.line.ends_with("\n") { + self.line.pop(); + if self.line.ends_with("\r") { + self.line.pop(); + } + } + Ok(Some(mem::replace(&mut self.line, String::new())).into()) + } +} diff --git a/third_party/rust/tokio-io/src/split.rs b/third_party/rust/tokio-io/src/split.rs new file mode 100644 index 0000000000..ef8f990c8c --- /dev/null +++ b/third_party/rust/tokio-io/src/split.rs @@ -0,0 +1,247 @@ +use std::io::{self, Read, Write}; + +use bytes::{Buf, BufMut}; +use futures::sync::BiLock; +use futures::{Async, Poll}; + +use {AsyncRead, AsyncWrite}; + +/// The readable half of an object returned from `AsyncRead::split`. +#[derive(Debug)] +pub struct ReadHalf<T> { + handle: BiLock<T>, +} + +impl<T: AsyncRead + AsyncWrite> ReadHalf<T> { + /// Reunite with a previously split `WriteHalf`. + /// + /// # Panics + /// + /// If this `ReadHalf` and the given `WriteHalf` do not originate from + /// the same `AsyncRead::split` operation this method will panic. + pub fn unsplit(self, w: WriteHalf<T>) -> T { + if let Ok(x) = self.handle.reunite(w.handle) { + x + } else { + panic!("Unrelated `WriteHalf` passed to `ReadHalf::unsplit`.") + } + } +} + +/// The writable half of an object returned from `AsyncRead::split`. +#[derive(Debug)] +pub struct WriteHalf<T> { + handle: BiLock<T>, +} + +impl<T: AsyncRead + AsyncWrite> WriteHalf<T> { + /// Reunite with a previously split `ReadHalf`. + /// + /// # panics + /// + /// If this `WriteHalf` and the given `ReadHalf` do not originate from + /// the same `AsyncRead::split` operation this method will panic. + pub fn unsplit(self, r: ReadHalf<T>) -> T { + if let Ok(x) = self.handle.reunite(r.handle) { + x + } else { + panic!("Unrelated `ReadHalf` passed to `WriteHalf::unsplit`.") + } + } +} + +pub fn split<T: AsyncRead + AsyncWrite>(t: T) -> (ReadHalf<T>, WriteHalf<T>) { + let (a, b) = BiLock::new(t); + (ReadHalf { handle: a }, WriteHalf { handle: b }) +} + +fn would_block() -> io::Error { + io::Error::new(io::ErrorKind::WouldBlock, "would block") +} + +impl<T: AsyncRead> Read for ReadHalf<T> { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + match self.handle.poll_lock() { + Async::Ready(mut l) => l.read(buf), + Async::NotReady => Err(would_block()), + } + } +} + +impl<T: AsyncRead> AsyncRead for ReadHalf<T> { + fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { + let mut l = try_ready!(wrap_as_io(self.handle.poll_lock())); + l.read_buf(buf) + } +} + +impl<T: AsyncWrite> Write for WriteHalf<T> { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + match self.handle.poll_lock() { + Async::Ready(mut l) => l.write(buf), + Async::NotReady => Err(would_block()), + } + } + + fn flush(&mut self) -> io::Result<()> { + match self.handle.poll_lock() { + Async::Ready(mut l) => l.flush(), + Async::NotReady => Err(would_block()), + } + } +} + +impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> { + fn shutdown(&mut self) -> Poll<(), io::Error> { + let mut l = try_ready!(wrap_as_io(self.handle.poll_lock())); + l.shutdown() + } + + fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> + where + Self: Sized, + { + let mut l = try_ready!(wrap_as_io(self.handle.poll_lock())); + l.write_buf(buf) + } +} + +fn wrap_as_io<T>(t: Async<T>) -> Result<Async<T>, io::Error> { + Ok(t) +} + +#[cfg(test)] +mod tests { + extern crate tokio_current_thread; + + use super::{AsyncRead, AsyncWrite, ReadHalf, WriteHalf}; + use bytes::{BytesMut, IntoBuf}; + use futures::sync::BiLock; + use futures::{future::lazy, future::ok, Async, Poll}; + + use std::io::{self, Read, Write}; + + struct RW; + + impl Read for RW { + fn read(&mut self, _: &mut [u8]) -> io::Result<usize> { + Ok(1) + } + } + + impl AsyncRead for RW {} + + impl Write for RW { + fn write(&mut self, _: &[u8]) -> io::Result<usize> { + Ok(1) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } + } + + impl AsyncWrite for RW { + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(Async::Ready(())) + } + } + + #[test] + fn split_readhalf_translate_wouldblock_to_not_ready() { + tokio_current_thread::block_on_all(lazy(move || { + let rw = RW {}; + let (a, b) = BiLock::new(rw); + let mut rx = ReadHalf { handle: a }; + + let mut buf = BytesMut::with_capacity(64); + + // First read is uncontended, should go through. + assert!(rx.read_buf(&mut buf).unwrap().is_ready()); + + // Take lock from write side. + let lock = b.poll_lock(); + + // Second read should be NotReady. + assert!(!rx.read_buf(&mut buf).unwrap().is_ready()); + + drop(lock); + + // Back to uncontended. + assert!(rx.read_buf(&mut buf).unwrap().is_ready()); + + ok::<(), ()>(()) + })) + .unwrap(); + } + + #[test] + fn split_writehalf_translate_wouldblock_to_not_ready() { + tokio_current_thread::block_on_all(lazy(move || { + let rw = RW {}; + let (a, b) = BiLock::new(rw); + let mut tx = WriteHalf { handle: a }; + + let bufmut = BytesMut::with_capacity(64); + let mut buf = bufmut.into_buf(); + + // First write is uncontended, should go through. + assert!(tx.write_buf(&mut buf).unwrap().is_ready()); + + // Take lock from read side. + let lock = b.poll_lock(); + + // Second write should be NotReady. + assert!(!tx.write_buf(&mut buf).unwrap().is_ready()); + + drop(lock); + + // Back to uncontended. + assert!(tx.write_buf(&mut buf).unwrap().is_ready()); + + ok::<(), ()>(()) + })) + .unwrap(); + } + + #[test] + fn unsplit_ok() { + let (r, w) = RW.split(); + r.unsplit(w); + + let (r, w) = RW.split(); + w.unsplit(r); + } + + #[test] + #[should_panic] + fn unsplit_err1() { + let (r, _) = RW.split(); + let (_, w) = RW.split(); + r.unsplit(w); + } + + #[test] + #[should_panic] + fn unsplit_err2() { + let (_, w) = RW.split(); + let (r, _) = RW.split(); + r.unsplit(w); + } + + #[test] + #[should_panic] + fn unsplit_err3() { + let (_, w) = RW.split(); + let (r, _) = RW.split(); + w.unsplit(r); + } + + #[test] + #[should_panic] + fn unsplit_err4() { + let (r, _) = RW.split(); + let (_, w) = RW.split(); + w.unsplit(r); + } +} diff --git a/third_party/rust/tokio-io/src/window.rs b/third_party/rust/tokio-io/src/window.rs new file mode 100644 index 0000000000..4ded9ad403 --- /dev/null +++ b/third_party/rust/tokio-io/src/window.rs @@ -0,0 +1,117 @@ +use std::ops; + +/// A owned window around an underlying buffer. +/// +/// Normally slices work great for considering sub-portions of a buffer, but +/// unfortunately a slice is a *borrowed* type in Rust which has an associated +/// lifetime. When working with future and async I/O these lifetimes are not +/// always appropriate, and are sometimes difficult to store in tasks. This +/// type strives to fill this gap by providing an "owned slice" around an +/// underlying buffer of bytes. +/// +/// A `Window<T>` wraps an underlying buffer, `T`, and has configurable +/// start/end indexes to alter the behavior of the `AsRef<[u8]>` implementation +/// that this type carries. +/// +/// This type can be particularly useful when working with the `write_all` +/// combinator in this crate. Data can be sliced via `Window`, consumed by +/// `write_all`, and then earned back once the write operation finishes through +/// the `into_inner` method on this type. +#[derive(Debug)] +pub struct Window<T> { + inner: T, + range: ops::Range<usize>, +} + +impl<T: AsRef<[u8]>> Window<T> { + /// Creates a new window around the buffer `t` defaulting to the entire + /// slice. + /// + /// Further methods can be called on the returned `Window<T>` to alter the + /// window into the data provided. + pub fn new(t: T) -> Window<T> { + Window { + range: 0..t.as_ref().len(), + inner: t, + } + } + + /// Gets a shared reference to the underlying buffer inside of this + /// `Window`. + pub fn get_ref(&self) -> &T { + &self.inner + } + + /// Gets a mutable reference to the underlying buffer inside of this + /// `Window`. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } + + /// Consumes this `Window`, returning the underlying buffer. + pub fn into_inner(self) -> T { + self.inner + } + + /// Returns the starting index of this window into the underlying buffer + /// `T`. + pub fn start(&self) -> usize { + self.range.start + } + + /// Returns the end index of this window into the underlying buffer + /// `T`. + pub fn end(&self) -> usize { + self.range.end + } + + /// Changes the starting index of this window to the index specified. + /// + /// Returns the windows back to chain multiple calls to this method. + /// + /// # Panics + /// + /// This method will panic if `start` is out of bounds for the underlying + /// slice or if it comes after the `end` configured in this window. + pub fn set_start(&mut self, start: usize) -> &mut Window<T> { + assert!(start <= self.inner.as_ref().len()); + assert!(start <= self.range.end); + self.range.start = start; + self + } + + /// Changes the end index of this window to the index specified. + /// + /// Returns the windows back to chain multiple calls to this method. + /// + /// # Panics + /// + /// This method will panic if `end` is out of bounds for the underlying + /// slice or if it comes before the `start` configured in this window. + pub fn set_end(&mut self, end: usize) -> &mut Window<T> { + assert!(end <= self.inner.as_ref().len()); + assert!(self.range.start <= end); + self.range.end = end; + self + } + + // TODO: how about a generic set() method along the lines of: + // + // buffer.set(..3) + // .set(0..2) + // .set(4..) + // + // etc. +} + +impl<T: AsRef<[u8]>> AsRef<[u8]> for Window<T> { + fn as_ref(&self) -> &[u8] { + &self.inner.as_ref()[self.range.start..self.range.end] + } +} + +impl<T: AsMut<[u8]>> AsMut<[u8]> for Window<T> { + fn as_mut(&mut self) -> &mut [u8] { + &mut self.inner.as_mut()[self.range.start..self.range.end] + } +} |