summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-util/src/codec
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-util/src/codec')
-rw-r--r--third_party/rust/tokio-util/src/codec/any_delimiter_codec.rs263
-rw-r--r--third_party/rust/tokio-util/src/codec/bytes_codec.rs86
-rw-r--r--third_party/rust/tokio-util/src/codec/decoder.rs184
-rw-r--r--third_party/rust/tokio-util/src/codec/encoder.rs25
-rw-r--r--third_party/rust/tokio-util/src/codec/framed.rs373
-rw-r--r--third_party/rust/tokio-util/src/codec/framed_impl.rs308
-rw-r--r--third_party/rust/tokio-util/src/codec/framed_read.rs199
-rw-r--r--third_party/rust/tokio-util/src/codec/framed_write.rs178
-rw-r--r--third_party/rust/tokio-util/src/codec/length_delimited.rs1047
-rw-r--r--third_party/rust/tokio-util/src/codec/lines_codec.rs230
-rw-r--r--third_party/rust/tokio-util/src/codec/mod.rs290
11 files changed, 3183 insertions, 0 deletions
diff --git a/third_party/rust/tokio-util/src/codec/any_delimiter_codec.rs b/third_party/rust/tokio-util/src/codec/any_delimiter_codec.rs
new file mode 100644
index 0000000000..3dbfd456b0
--- /dev/null
+++ b/third_party/rust/tokio-util/src/codec/any_delimiter_codec.rs
@@ -0,0 +1,263 @@
+use crate::codec::decoder::Decoder;
+use crate::codec::encoder::Encoder;
+
+use bytes::{Buf, BufMut, Bytes, BytesMut};
+use std::{cmp, fmt, io, str, usize};
+
+const DEFAULT_SEEK_DELIMITERS: &[u8] = b",;\n\r";
+const DEFAULT_SEQUENCE_WRITER: &[u8] = b",";
+/// A simple [`Decoder`] and [`Encoder`] implementation that splits up data into chunks based on any character in the given delimiter string.
+///
+/// [`Decoder`]: crate::codec::Decoder
+/// [`Encoder`]: crate::codec::Encoder
+///
+/// # Example
+/// Decode string of bytes containing various different delimiters.
+///
+/// [`BytesMut`]: bytes::BytesMut
+/// [`Error`]: std::io::Error
+///
+/// ```
+/// use tokio_util::codec::{AnyDelimiterCodec, Decoder};
+/// use bytes::{BufMut, BytesMut};
+///
+/// #
+/// # #[tokio::main(flavor = "current_thread")]
+/// # async fn main() -> Result<(), std::io::Error> {
+/// let mut codec = AnyDelimiterCodec::new(b",;\r\n".to_vec(),b";".to_vec());
+/// let buf = &mut BytesMut::new();
+/// buf.reserve(200);
+/// buf.put_slice(b"chunk 1,chunk 2;chunk 3\n\r");
+/// assert_eq!("chunk 1", codec.decode(buf).unwrap().unwrap());
+/// assert_eq!("chunk 2", codec.decode(buf).unwrap().unwrap());
+/// assert_eq!("chunk 3", codec.decode(buf).unwrap().unwrap());
+/// assert_eq!("", codec.decode(buf).unwrap().unwrap());
+/// assert_eq!(None, codec.decode(buf).unwrap());
+/// # Ok(())
+/// # }
+/// ```
+///
+#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
+pub struct AnyDelimiterCodec {
+ // Stored index of the next index to examine for the delimiter character.
+ // This is used to optimize searching.
+ // For example, if `decode` was called with `abc` and the delimiter is '{}', it would hold `3`,
+ // because that is the next index to examine.
+ // The next time `decode` is called with `abcde}`, the method will
+ // only look at `de}` before returning.
+ next_index: usize,
+
+ /// The maximum length for a given chunk. If `usize::MAX`, chunks will be
+ /// read until a delimiter character is reached.
+ max_length: usize,
+
+ /// Are we currently discarding the remainder of a chunk which was over
+ /// the length limit?
+ is_discarding: bool,
+
+ /// The bytes that are using for search during decode
+ seek_delimiters: Vec<u8>,
+
+ /// The bytes that are using for encoding
+ sequence_writer: Vec<u8>,
+}
+
+impl AnyDelimiterCodec {
+ /// Returns a `AnyDelimiterCodec` for splitting up data into chunks.
+ ///
+ /// # Note
+ ///
+ /// The returned `AnyDelimiterCodec` will not have an upper bound on the length
+ /// of a buffered chunk. See the documentation for [`new_with_max_length`]
+ /// for information on why this could be a potential security risk.
+ ///
+ /// [`new_with_max_length`]: crate::codec::AnyDelimiterCodec::new_with_max_length()
+ pub fn new(seek_delimiters: Vec<u8>, sequence_writer: Vec<u8>) -> AnyDelimiterCodec {
+ AnyDelimiterCodec {
+ next_index: 0,
+ max_length: usize::MAX,
+ is_discarding: false,
+ seek_delimiters,
+ sequence_writer,
+ }
+ }
+
+ /// Returns a `AnyDelimiterCodec` with a maximum chunk length limit.
+ ///
+ /// If this is set, calls to `AnyDelimiterCodec::decode` will return a
+ /// [`AnyDelimiterCodecError`] when a chunk exceeds the length limit. Subsequent calls
+ /// will discard up to `limit` bytes from that chunk until a delimiter
+ /// character is reached, returning `None` until the delimiter over the limit
+ /// has been fully discarded. After that point, calls to `decode` will
+ /// function as normal.
+ ///
+ /// # Note
+ ///
+ /// Setting a length limit is highly recommended for any `AnyDelimiterCodec` which
+ /// will be exposed to untrusted input. Otherwise, the size of the buffer
+ /// that holds the chunk currently being read is unbounded. An attacker could
+ /// exploit this unbounded buffer by sending an unbounded amount of input
+ /// without any delimiter characters, causing unbounded memory consumption.
+ ///
+ /// [`AnyDelimiterCodecError`]: crate::codec::AnyDelimiterCodecError
+ pub fn new_with_max_length(
+ seek_delimiters: Vec<u8>,
+ sequence_writer: Vec<u8>,
+ max_length: usize,
+ ) -> Self {
+ AnyDelimiterCodec {
+ max_length,
+ ..AnyDelimiterCodec::new(seek_delimiters, sequence_writer)
+ }
+ }
+
+ /// Returns the maximum chunk length when decoding.
+ ///
+ /// ```
+ /// use std::usize;
+ /// use tokio_util::codec::AnyDelimiterCodec;
+ ///
+ /// let codec = AnyDelimiterCodec::new(b",;\n".to_vec(), b";".to_vec());
+ /// assert_eq!(codec.max_length(), usize::MAX);
+ /// ```
+ /// ```
+ /// use tokio_util::codec::AnyDelimiterCodec;
+ ///
+ /// let codec = AnyDelimiterCodec::new_with_max_length(b",;\n".to_vec(), b";".to_vec(), 256);
+ /// assert_eq!(codec.max_length(), 256);
+ /// ```
+ pub fn max_length(&self) -> usize {
+ self.max_length
+ }
+}
+
+impl Decoder for AnyDelimiterCodec {
+ type Item = Bytes;
+ type Error = AnyDelimiterCodecError;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Bytes>, AnyDelimiterCodecError> {
+ loop {
+ // Determine how far into the buffer we'll search for a delimiter. If
+ // there's no max_length set, we'll read to the end of the buffer.
+ let read_to = cmp::min(self.max_length.saturating_add(1), buf.len());
+
+ let new_chunk_offset = buf[self.next_index..read_to].iter().position(|b| {
+ self.seek_delimiters
+ .iter()
+ .any(|delimiter| *b == *delimiter)
+ });
+
+ match (self.is_discarding, new_chunk_offset) {
+ (true, Some(offset)) => {
+ // If we found a new chunk, discard up to that offset and
+ // then stop discarding. On the next iteration, we'll try
+ // to read a chunk normally.
+ buf.advance(offset + self.next_index + 1);
+ self.is_discarding = false;
+ self.next_index = 0;
+ }
+ (true, None) => {
+ // Otherwise, we didn't find a new chunk, so we'll discard
+ // everything we read. On the next iteration, we'll continue
+ // discarding up to max_len bytes unless we find a new chunk.
+ buf.advance(read_to);
+ self.next_index = 0;
+ if buf.is_empty() {
+ return Ok(None);
+ }
+ }
+ (false, Some(offset)) => {
+ // Found a chunk!
+ let new_chunk_index = offset + self.next_index;
+ self.next_index = 0;
+ let mut chunk = buf.split_to(new_chunk_index + 1);
+ chunk.truncate(chunk.len() - 1);
+ let chunk = chunk.freeze();
+ return Ok(Some(chunk));
+ }
+ (false, None) if buf.len() > self.max_length => {
+ // Reached the maximum length without finding a
+ // new chunk, return an error and start discarding on the
+ // next call.
+ self.is_discarding = true;
+ return Err(AnyDelimiterCodecError::MaxChunkLengthExceeded);
+ }
+ (false, None) => {
+ // We didn't find a chunk or reach the length limit, so the next
+ // call will resume searching at the current offset.
+ self.next_index = read_to;
+ return Ok(None);
+ }
+ }
+ }
+ }
+
+ fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Bytes>, AnyDelimiterCodecError> {
+ Ok(match self.decode(buf)? {
+ Some(frame) => Some(frame),
+ None => {
+ // return remaining data, if any
+ if buf.is_empty() {
+ None
+ } else {
+ let chunk = buf.split_to(buf.len());
+ self.next_index = 0;
+ Some(chunk.freeze())
+ }
+ }
+ })
+ }
+}
+
+impl<T> Encoder<T> for AnyDelimiterCodec
+where
+ T: AsRef<str>,
+{
+ type Error = AnyDelimiterCodecError;
+
+ fn encode(&mut self, chunk: T, buf: &mut BytesMut) -> Result<(), AnyDelimiterCodecError> {
+ let chunk = chunk.as_ref();
+ buf.reserve(chunk.len() + 1);
+ buf.put(chunk.as_bytes());
+ buf.put(self.sequence_writer.as_ref());
+
+ Ok(())
+ }
+}
+
+impl Default for AnyDelimiterCodec {
+ fn default() -> Self {
+ Self::new(
+ DEFAULT_SEEK_DELIMITERS.to_vec(),
+ DEFAULT_SEQUENCE_WRITER.to_vec(),
+ )
+ }
+}
+
+/// An error occurred while encoding or decoding a chunk.
+#[derive(Debug)]
+pub enum AnyDelimiterCodecError {
+ /// The maximum chunk length was exceeded.
+ MaxChunkLengthExceeded,
+ /// An IO error occurred.
+ Io(io::Error),
+}
+
+impl fmt::Display for AnyDelimiterCodecError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ AnyDelimiterCodecError::MaxChunkLengthExceeded => {
+ write!(f, "max chunk length exceeded")
+ }
+ AnyDelimiterCodecError::Io(e) => write!(f, "{}", e),
+ }
+ }
+}
+
+impl From<io::Error> for AnyDelimiterCodecError {
+ fn from(e: io::Error) -> AnyDelimiterCodecError {
+ AnyDelimiterCodecError::Io(e)
+ }
+}
+
+impl std::error::Error for AnyDelimiterCodecError {}
diff --git a/third_party/rust/tokio-util/src/codec/bytes_codec.rs b/third_party/rust/tokio-util/src/codec/bytes_codec.rs
new file mode 100644
index 0000000000..ceab228b94
--- /dev/null
+++ b/third_party/rust/tokio-util/src/codec/bytes_codec.rs
@@ -0,0 +1,86 @@
+use crate::codec::decoder::Decoder;
+use crate::codec::encoder::Encoder;
+
+use bytes::{BufMut, Bytes, BytesMut};
+use std::io;
+
+/// A simple [`Decoder`] and [`Encoder`] implementation that just ships bytes around.
+///
+/// [`Decoder`]: crate::codec::Decoder
+/// [`Encoder`]: crate::codec::Encoder
+///
+/// # Example
+///
+/// Turn an [`AsyncRead`] into a stream of `Result<`[`BytesMut`]`, `[`Error`]`>`.
+///
+/// [`AsyncRead`]: tokio::io::AsyncRead
+/// [`BytesMut`]: bytes::BytesMut
+/// [`Error`]: std::io::Error
+///
+/// ```
+/// # mod hidden {
+/// # #[allow(unused_imports)]
+/// use tokio::fs::File;
+/// # }
+/// use tokio::io::AsyncRead;
+/// use tokio_util::codec::{FramedRead, BytesCodec};
+///
+/// # enum File {}
+/// # impl File {
+/// # async fn open(_name: &str) -> Result<impl AsyncRead, std::io::Error> {
+/// # use std::io::Cursor;
+/// # Ok(Cursor::new(vec![0, 1, 2, 3, 4, 5]))
+/// # }
+/// # }
+/// #
+/// # #[tokio::main(flavor = "current_thread")]
+/// # async fn main() -> Result<(), std::io::Error> {
+/// let my_async_read = File::open("filename.txt").await?;
+/// let my_stream_of_bytes = FramedRead::new(my_async_read, BytesCodec::new());
+/// # Ok(())
+/// # }
+/// ```
+///
+#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Default)]
+pub struct BytesCodec(());
+
+impl BytesCodec {
+ /// Creates a new `BytesCodec` for shipping around raw bytes.
+ pub fn new() -> BytesCodec {
+ BytesCodec(())
+ }
+}
+
+impl Decoder for BytesCodec {
+ type Item = BytesMut;
+ type Error = io::Error;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> {
+ if !buf.is_empty() {
+ let len = buf.len();
+ Ok(Some(buf.split_to(len)))
+ } else {
+ Ok(None)
+ }
+ }
+}
+
+impl Encoder<Bytes> for BytesCodec {
+ type Error = io::Error;
+
+ fn encode(&mut self, data: Bytes, buf: &mut BytesMut) -> Result<(), io::Error> {
+ buf.reserve(data.len());
+ buf.put(data);
+ Ok(())
+ }
+}
+
+impl Encoder<BytesMut> for BytesCodec {
+ type Error = io::Error;
+
+ fn encode(&mut self, data: BytesMut, buf: &mut BytesMut) -> Result<(), io::Error> {
+ buf.reserve(data.len());
+ buf.put(data);
+ Ok(())
+ }
+}
diff --git a/third_party/rust/tokio-util/src/codec/decoder.rs b/third_party/rust/tokio-util/src/codec/decoder.rs
new file mode 100644
index 0000000000..c5927783d1
--- /dev/null
+++ b/third_party/rust/tokio-util/src/codec/decoder.rs
@@ -0,0 +1,184 @@
+use crate::codec::Framed;
+
+use tokio::io::{AsyncRead, AsyncWrite};
+
+use bytes::BytesMut;
+use std::io;
+
+/// Decoding of frames via buffers.
+///
+/// This trait is used when constructing an instance of [`Framed`] or
+/// [`FramedRead`]. An implementation of `Decoder` takes a byte stream that has
+/// already been buffered in `src` and decodes the data into a stream of
+/// `Self::Item` frames.
+///
+/// Implementations are able to track state on `self`, which enables
+/// implementing stateful streaming parsers. In many cases, though, this type
+/// will simply be a unit struct (e.g. `struct HttpDecoder`).
+///
+/// For some underlying data-sources, namely files and FIFOs,
+/// it's possible to temporarily read 0 bytes by reaching EOF.
+///
+/// In these cases `decode_eof` will be called until it signals
+/// fullfillment of all closing frames by returning `Ok(None)`.
+/// After that, repeated attempts to read from the [`Framed`] or [`FramedRead`]
+/// will not invoke `decode` or `decode_eof` again, until data can be read
+/// during a retry.
+///
+/// It is up to the Decoder to keep track of a restart after an EOF,
+/// and to decide how to handle such an event by, for example,
+/// allowing frames to cross EOF boundaries, re-emitting opening frames, or
+/// resetting the entire internal state.
+///
+/// [`Framed`]: crate::codec::Framed
+/// [`FramedRead`]: crate::codec::FramedRead
+pub trait Decoder {
+ /// The type of decoded frames.
+ type Item;
+
+ /// The type of unrecoverable frame decoding errors.
+ ///
+ /// If an individual message is ill-formed but can be ignored without
+ /// interfering with the processing of future messages, it may be more
+ /// useful to report the failure as an `Item`.
+ ///
+ /// `From<io::Error>` is required in the interest of making `Error` suitable
+ /// for returning directly from a [`FramedRead`], and to enable the default
+ /// implementation of `decode_eof` to yield an `io::Error` when the decoder
+ /// fails to consume all available data.
+ ///
+ /// Note that implementors of this trait can simply indicate `type Error =
+ /// io::Error` to use I/O errors as this type.
+ ///
+ /// [`FramedRead`]: crate::codec::FramedRead
+ type Error: From<io::Error>;
+
+ /// Attempts to decode a frame from the provided buffer of bytes.
+ ///
+ /// This method is called by [`FramedRead`] whenever bytes are ready to be
+ /// parsed. The provided buffer of bytes is what's been read so far, and
+ /// this instance of `Decode` can determine whether an entire frame is in
+ /// the buffer and is ready to be returned.
+ ///
+ /// If an entire frame is available, then this instance will remove those
+ /// bytes from the buffer provided and return them as a decoded
+ /// frame. Note that removing bytes from the provided buffer doesn't always
+ /// necessarily copy the bytes, so this should be an efficient operation in
+ /// most circumstances.
+ ///
+ /// If the bytes look valid, but a frame isn't fully available yet, then
+ /// `Ok(None)` is returned. This indicates to the [`Framed`] instance that
+ /// it needs to read some more bytes before calling this method again.
+ ///
+ /// Note that the bytes provided may be empty. If a previous call to
+ /// `decode` consumed all the bytes in the buffer then `decode` will be
+ /// called again until it returns `Ok(None)`, indicating that more bytes need to
+ /// be read.
+ ///
+ /// Finally, if the bytes in the buffer are malformed then an error is
+ /// returned indicating why. This informs [`Framed`] that the stream is now
+ /// corrupt and should be terminated.
+ ///
+ /// [`Framed`]: crate::codec::Framed
+ /// [`FramedRead`]: crate::codec::FramedRead
+ ///
+ /// # Buffer management
+ ///
+ /// Before returning from the function, implementations should ensure that
+ /// the buffer has appropriate capacity in anticipation of future calls to
+ /// `decode`. Failing to do so leads to inefficiency.
+ ///
+ /// For example, if frames have a fixed length, or if the length of the
+ /// current frame is known from a header, a possible buffer management
+ /// strategy is:
+ ///
+ /// ```no_run
+ /// # use std::io;
+ /// #
+ /// # use bytes::BytesMut;
+ /// # use tokio_util::codec::Decoder;
+ /// #
+ /// # struct MyCodec;
+ /// #
+ /// impl Decoder for MyCodec {
+ /// // ...
+ /// # type Item = BytesMut;
+ /// # type Error = io::Error;
+ ///
+ /// fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
+ /// // ...
+ ///
+ /// // Reserve enough to complete decoding of the current frame.
+ /// let current_frame_len: usize = 1000; // Example.
+ /// // And to start decoding the next frame.
+ /// let next_frame_header_len: usize = 10; // Example.
+ /// src.reserve(current_frame_len + next_frame_header_len);
+ ///
+ /// return Ok(None);
+ /// }
+ /// }
+ /// ```
+ ///
+ /// An optimal buffer management strategy minimizes reallocations and
+ /// over-allocations.
+ fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error>;
+
+ /// A default method available to be called when there are no more bytes
+ /// available to be read from the underlying I/O.
+ ///
+ /// This method defaults to calling `decode` and returns an error if
+ /// `Ok(None)` is returned while there is unconsumed data in `buf`.
+ /// Typically this doesn't need to be implemented unless the framing
+ /// protocol differs near the end of the stream, or if you need to construct
+ /// frames _across_ eof boundaries on sources that can be resumed.
+ ///
+ /// Note that the `buf` argument may be empty. If a previous call to
+ /// `decode_eof` consumed all the bytes in the buffer, `decode_eof` will be
+ /// called again until it returns `None`, indicating that there are no more
+ /// frames to yield. This behavior enables returning finalization frames
+ /// that may not be based on inbound data.
+ ///
+ /// Once `None` has been returned, `decode_eof` won't be called again until
+ /// an attempt to resume the stream has been made, where the underlying stream
+ /// actually returned more data.
+ fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
+ match self.decode(buf)? {
+ Some(frame) => Ok(Some(frame)),
+ None => {
+ if buf.is_empty() {
+ Ok(None)
+ } else {
+ Err(io::Error::new(io::ErrorKind::Other, "bytes remaining on stream").into())
+ }
+ }
+ }
+ }
+
+ /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
+ /// `Io` object, using `Decode` and `Encode` to read and write the raw data.
+ ///
+ /// Raw I/O objects work with byte sequences, but higher-level code usually
+ /// wants to batch these into meaningful chunks, called "frames". This
+ /// method layers framing on top of an I/O object, by using the `Codec`
+ /// traits to handle encoding and decoding of messages frames. Note that
+ /// the incoming and outgoing frame types may be distinct.
+ ///
+ /// This function returns a *single* object that is both `Stream` and
+ /// `Sink`; grouping this into a single object is often useful for layering
+ /// things like gzip or TLS, which require both read and write access to the
+ /// underlying object.
+ ///
+ /// If you want to work more directly with the streams and sink, consider
+ /// calling `split` on the [`Framed`] returned by this method, which will
+ /// break them into separate objects, allowing them to interact more easily.
+ ///
+ /// [`Stream`]: futures_core::Stream
+ /// [`Sink`]: futures_sink::Sink
+ /// [`Framed`]: crate::codec::Framed
+ fn framed<T: AsyncRead + AsyncWrite + Sized>(self, io: T) -> Framed<T, Self>
+ where
+ Self: Sized,
+ {
+ Framed::new(io, self)
+ }
+}
diff --git a/third_party/rust/tokio-util/src/codec/encoder.rs b/third_party/rust/tokio-util/src/codec/encoder.rs
new file mode 100644
index 0000000000..770a10fa9b
--- /dev/null
+++ b/third_party/rust/tokio-util/src/codec/encoder.rs
@@ -0,0 +1,25 @@
+use bytes::BytesMut;
+use std::io;
+
+/// Trait of helper objects to write out messages as bytes, for use with
+/// [`FramedWrite`].
+///
+/// [`FramedWrite`]: crate::codec::FramedWrite
+pub trait Encoder<Item> {
+ /// The type of encoding errors.
+ ///
+ /// [`FramedWrite`] requires `Encoder`s errors to implement `From<io::Error>`
+ /// in the interest letting it return `Error`s directly.
+ ///
+ /// [`FramedWrite`]: crate::codec::FramedWrite
+ type Error: From<io::Error>;
+
+ /// Encodes a frame into the buffer provided.
+ ///
+ /// This method will encode `item` into the byte buffer provided by `dst`.
+ /// The `dst` provided is an internal buffer of the [`FramedWrite`] instance and
+ /// will be written out when possible.
+ ///
+ /// [`FramedWrite`]: crate::codec::FramedWrite
+ fn encode(&mut self, item: Item, dst: &mut BytesMut) -> Result<(), Self::Error>;
+}
diff --git a/third_party/rust/tokio-util/src/codec/framed.rs b/third_party/rust/tokio-util/src/codec/framed.rs
new file mode 100644
index 0000000000..d89b8b6dc3
--- /dev/null
+++ b/third_party/rust/tokio-util/src/codec/framed.rs
@@ -0,0 +1,373 @@
+use crate::codec::decoder::Decoder;
+use crate::codec::encoder::Encoder;
+use crate::codec::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame};
+
+use futures_core::Stream;
+use tokio::io::{AsyncRead, AsyncWrite};
+
+use bytes::BytesMut;
+use futures_sink::Sink;
+use pin_project_lite::pin_project;
+use std::fmt;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pin_project! {
+ /// A unified [`Stream`] and [`Sink`] interface to an underlying I/O object, using
+ /// the `Encoder` and `Decoder` traits to encode and decode frames.
+ ///
+ /// You can create a `Framed` instance by using the [`Decoder::framed`] adapter, or
+ /// by using the `new` function seen below.
+ ///
+ /// [`Stream`]: futures_core::Stream
+ /// [`Sink`]: futures_sink::Sink
+ /// [`AsyncRead`]: tokio::io::AsyncRead
+ /// [`Decoder::framed`]: crate::codec::Decoder::framed()
+ pub struct Framed<T, U> {
+ #[pin]
+ inner: FramedImpl<T, U, RWFrames>
+ }
+}
+
+impl<T, U> Framed<T, U>
+where
+ T: AsyncRead + AsyncWrite,
+{
+ /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
+ /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data.
+ ///
+ /// Raw I/O objects work with byte sequences, but higher-level code usually
+ /// wants to batch these into meaningful chunks, called "frames". This
+ /// method layers framing on top of an I/O object, by using the codec
+ /// traits to handle encoding and decoding of messages frames. Note that
+ /// the incoming and outgoing frame types may be distinct.
+ ///
+ /// This function returns a *single* object that is both [`Stream`] and
+ /// [`Sink`]; grouping this into a single object is often useful for layering
+ /// things like gzip or TLS, which require both read and write access to the
+ /// underlying object.
+ ///
+ /// If you want to work more directly with the streams and sink, consider
+ /// calling [`split`] on the `Framed` returned by this method, which will
+ /// break them into separate objects, allowing them to interact more easily.
+ ///
+ /// Note that, for some byte sources, the stream can be resumed after an EOF
+ /// by reading from it, even after it has returned `None`. Repeated attempts
+ /// to do so, without new data available, continue to return `None` without
+ /// creating more (closing) frames.
+ ///
+ /// [`Stream`]: futures_core::Stream
+ /// [`Sink`]: futures_sink::Sink
+ /// [`Decode`]: crate::codec::Decoder
+ /// [`Encoder`]: crate::codec::Encoder
+ /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
+ pub fn new(inner: T, codec: U) -> Framed<T, U> {
+ Framed {
+ inner: FramedImpl {
+ inner,
+ codec,
+ state: Default::default(),
+ },
+ }
+ }
+
+ /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
+ /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data,
+ /// with a specific read buffer initial capacity.
+ ///
+ /// Raw I/O objects work with byte sequences, but higher-level code usually
+ /// wants to batch these into meaningful chunks, called "frames". This
+ /// method layers framing on top of an I/O object, by using the codec
+ /// traits to handle encoding and decoding of messages frames. Note that
+ /// the incoming and outgoing frame types may be distinct.
+ ///
+ /// This function returns a *single* object that is both [`Stream`] and
+ /// [`Sink`]; grouping this into a single object is often useful for layering
+ /// things like gzip or TLS, which require both read and write access to the
+ /// underlying object.
+ ///
+ /// If you want to work more directly with the streams and sink, consider
+ /// calling [`split`] on the `Framed` returned by this method, which will
+ /// break them into separate objects, allowing them to interact more easily.
+ ///
+ /// [`Stream`]: futures_core::Stream
+ /// [`Sink`]: futures_sink::Sink
+ /// [`Decode`]: crate::codec::Decoder
+ /// [`Encoder`]: crate::codec::Encoder
+ /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
+ pub fn with_capacity(inner: T, codec: U, capacity: usize) -> Framed<T, U> {
+ Framed {
+ inner: FramedImpl {
+ inner,
+ codec,
+ state: RWFrames {
+ read: ReadFrame {
+ eof: false,
+ is_readable: false,
+ buffer: BytesMut::with_capacity(capacity),
+ has_errored: false,
+ },
+ write: WriteFrame::default(),
+ },
+ },
+ }
+ }
+}
+
+impl<T, U> Framed<T, U> {
+ /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
+ /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data.
+ ///
+ /// Raw I/O objects work with byte sequences, but higher-level code usually
+ /// wants to batch these into meaningful chunks, called "frames". This
+ /// method layers framing on top of an I/O object, by using the `Codec`
+ /// traits to handle encoding and decoding of messages frames. Note that
+ /// the incoming and outgoing frame types may be distinct.
+ ///
+ /// This function returns a *single* object that is both [`Stream`] and
+ /// [`Sink`]; grouping this into a single object is often useful for layering
+ /// things like gzip or TLS, which require both read and write access to the
+ /// underlying object.
+ ///
+ /// This objects takes a stream and a readbuffer and a writebuffer. These field
+ /// can be obtained from an existing `Framed` with the [`into_parts`] method.
+ ///
+ /// If you want to work more directly with the streams and sink, consider
+ /// calling [`split`] on the `Framed` returned by this method, which will
+ /// break them into separate objects, allowing them to interact more easily.
+ ///
+ /// [`Stream`]: futures_core::Stream
+ /// [`Sink`]: futures_sink::Sink
+ /// [`Decoder`]: crate::codec::Decoder
+ /// [`Encoder`]: crate::codec::Encoder
+ /// [`into_parts`]: crate::codec::Framed::into_parts()
+ /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
+ pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
+ Framed {
+ inner: FramedImpl {
+ inner: parts.io,
+ codec: parts.codec,
+ state: RWFrames {
+ read: parts.read_buf.into(),
+ write: parts.write_buf.into(),
+ },
+ },
+ }
+ }
+
+ /// Returns a reference to the underlying I/O stream wrapped by
+ /// `Framed`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_ref(&self) -> &T {
+ &self.inner.inner
+ }
+
+ /// Returns a mutable reference to the underlying I/O stream wrapped by
+ /// `Framed`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner.inner
+ }
+
+ /// Returns a pinned mutable reference to the underlying I/O stream wrapped by
+ /// `Framed`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
+ self.project().inner.project().inner
+ }
+
+ /// Returns a reference to the underlying codec wrapped by
+ /// `Framed`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying codec
+ /// as it may corrupt the stream of frames otherwise being worked with.
+ pub fn codec(&self) -> &U {
+ &self.inner.codec
+ }
+
+ /// Returns a mutable reference to the underlying codec wrapped by
+ /// `Framed`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying codec
+ /// as it may corrupt the stream of frames otherwise being worked with.
+ pub fn codec_mut(&mut self) -> &mut U {
+ &mut self.inner.codec
+ }
+
+ /// Maps the codec `U` to `C`, preserving the read and write buffers
+ /// wrapped by `Framed`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying codec
+ /// as it may corrupt the stream of frames otherwise being worked with.
+ pub fn map_codec<C, F>(self, map: F) -> Framed<T, C>
+ where
+ F: FnOnce(U) -> C,
+ {
+ // This could be potentially simplified once rust-lang/rust#86555 hits stable
+ let parts = self.into_parts();
+ Framed::from_parts(FramedParts {
+ io: parts.io,
+ codec: map(parts.codec),
+ read_buf: parts.read_buf,
+ write_buf: parts.write_buf,
+ _priv: (),
+ })
+ }
+
+ /// Returns a mutable reference to the underlying codec wrapped by
+ /// `Framed`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying codec
+ /// as it may corrupt the stream of frames otherwise being worked with.
+ pub fn codec_pin_mut(self: Pin<&mut Self>) -> &mut U {
+ self.project().inner.project().codec
+ }
+
+ /// Returns a reference to the read buffer.
+ pub fn read_buffer(&self) -> &BytesMut {
+ &self.inner.state.read.buffer
+ }
+
+ /// Returns a mutable reference to the read buffer.
+ pub fn read_buffer_mut(&mut self) -> &mut BytesMut {
+ &mut self.inner.state.read.buffer
+ }
+
+ /// Returns a reference to the write buffer.
+ pub fn write_buffer(&self) -> &BytesMut {
+ &self.inner.state.write.buffer
+ }
+
+ /// Returns a mutable reference to the write buffer.
+ pub fn write_buffer_mut(&mut self) -> &mut BytesMut {
+ &mut self.inner.state.write.buffer
+ }
+
+ /// Consumes the `Framed`, returning its underlying I/O stream.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn into_inner(self) -> T {
+ self.inner.inner
+ }
+
+ /// Consumes the `Framed`, returning its underlying I/O stream, the buffer
+ /// with unprocessed data, and the codec.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn into_parts(self) -> FramedParts<T, U> {
+ FramedParts {
+ io: self.inner.inner,
+ codec: self.inner.codec,
+ read_buf: self.inner.state.read.buffer,
+ write_buf: self.inner.state.write.buffer,
+ _priv: (),
+ }
+ }
+}
+
+// This impl just defers to the underlying FramedImpl
+impl<T, U> Stream for Framed<T, U>
+where
+ T: AsyncRead,
+ U: Decoder,
+{
+ type Item = Result<U::Item, U::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ self.project().inner.poll_next(cx)
+ }
+}
+
+// This impl just defers to the underlying FramedImpl
+impl<T, I, U> Sink<I> for Framed<T, U>
+where
+ T: AsyncWrite,
+ U: Encoder<I>,
+ U::Error: From<io::Error>,
+{
+ type Error = U::Error;
+
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.project().inner.poll_ready(cx)
+ }
+
+ fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
+ self.project().inner.start_send(item)
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.project().inner.poll_flush(cx)
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.project().inner.poll_close(cx)
+ }
+}
+
+impl<T, U> fmt::Debug for Framed<T, U>
+where
+ T: fmt::Debug,
+ U: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Framed")
+ .field("io", self.get_ref())
+ .field("codec", self.codec())
+ .finish()
+ }
+}
+
+/// `FramedParts` contains an export of the data of a Framed transport.
+/// It can be used to construct a new [`Framed`] with a different codec.
+/// It contains all current buffers and the inner transport.
+///
+/// [`Framed`]: crate::codec::Framed
+#[derive(Debug)]
+#[allow(clippy::manual_non_exhaustive)]
+pub struct FramedParts<T, U> {
+ /// The inner transport used to read bytes to and write bytes to
+ pub io: T,
+
+ /// The codec
+ pub codec: U,
+
+ /// The buffer with read but unprocessed data.
+ pub read_buf: BytesMut,
+
+ /// A buffer with unprocessed data which are not written yet.
+ pub write_buf: BytesMut,
+
+ /// This private field allows us to add additional fields in the future in a
+ /// backwards compatible way.
+ _priv: (),
+}
+
+impl<T, U> FramedParts<T, U> {
+ /// Create a new, default, `FramedParts`
+ pub fn new<I>(io: T, codec: U) -> FramedParts<T, U>
+ where
+ U: Encoder<I>,
+ {
+ FramedParts {
+ io,
+ codec,
+ read_buf: BytesMut::new(),
+ write_buf: BytesMut::new(),
+ _priv: (),
+ }
+ }
+}
diff --git a/third_party/rust/tokio-util/src/codec/framed_impl.rs b/third_party/rust/tokio-util/src/codec/framed_impl.rs
new file mode 100644
index 0000000000..ce1a6db873
--- /dev/null
+++ b/third_party/rust/tokio-util/src/codec/framed_impl.rs
@@ -0,0 +1,308 @@
+use crate::codec::decoder::Decoder;
+use crate::codec::encoder::Encoder;
+
+use futures_core::Stream;
+use tokio::io::{AsyncRead, AsyncWrite};
+
+use bytes::BytesMut;
+use futures_core::ready;
+use futures_sink::Sink;
+use pin_project_lite::pin_project;
+use std::borrow::{Borrow, BorrowMut};
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tracing::trace;
+
+pin_project! {
+ #[derive(Debug)]
+ pub(crate) struct FramedImpl<T, U, State> {
+ #[pin]
+ pub(crate) inner: T,
+ pub(crate) state: State,
+ pub(crate) codec: U,
+ }
+}
+
+const INITIAL_CAPACITY: usize = 8 * 1024;
+const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY;
+
+#[derive(Debug)]
+pub(crate) struct ReadFrame {
+ pub(crate) eof: bool,
+ pub(crate) is_readable: bool,
+ pub(crate) buffer: BytesMut,
+ pub(crate) has_errored: bool,
+}
+
+pub(crate) struct WriteFrame {
+ pub(crate) buffer: BytesMut,
+}
+
+#[derive(Default)]
+pub(crate) struct RWFrames {
+ pub(crate) read: ReadFrame,
+ pub(crate) write: WriteFrame,
+}
+
+impl Default for ReadFrame {
+ fn default() -> Self {
+ Self {
+ eof: false,
+ is_readable: false,
+ buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
+ has_errored: false,
+ }
+ }
+}
+
+impl Default for WriteFrame {
+ fn default() -> Self {
+ Self {
+ buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
+ }
+ }
+}
+
+impl From<BytesMut> for ReadFrame {
+ fn from(mut buffer: BytesMut) -> Self {
+ let size = buffer.capacity();
+ if size < INITIAL_CAPACITY {
+ buffer.reserve(INITIAL_CAPACITY - size);
+ }
+
+ Self {
+ buffer,
+ is_readable: size > 0,
+ eof: false,
+ has_errored: false,
+ }
+ }
+}
+
+impl From<BytesMut> for WriteFrame {
+ fn from(mut buffer: BytesMut) -> Self {
+ let size = buffer.capacity();
+ if size < INITIAL_CAPACITY {
+ buffer.reserve(INITIAL_CAPACITY - size);
+ }
+
+ Self { buffer }
+ }
+}
+
+impl Borrow<ReadFrame> for RWFrames {
+ fn borrow(&self) -> &ReadFrame {
+ &self.read
+ }
+}
+impl BorrowMut<ReadFrame> for RWFrames {
+ fn borrow_mut(&mut self) -> &mut ReadFrame {
+ &mut self.read
+ }
+}
+impl Borrow<WriteFrame> for RWFrames {
+ fn borrow(&self) -> &WriteFrame {
+ &self.write
+ }
+}
+impl BorrowMut<WriteFrame> for RWFrames {
+ fn borrow_mut(&mut self) -> &mut WriteFrame {
+ &mut self.write
+ }
+}
+impl<T, U, R> Stream for FramedImpl<T, U, R>
+where
+ T: AsyncRead,
+ U: Decoder,
+ R: BorrowMut<ReadFrame>,
+{
+ type Item = Result<U::Item, U::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ use crate::util::poll_read_buf;
+
+ let mut pinned = self.project();
+ let state: &mut ReadFrame = pinned.state.borrow_mut();
+ // The following loops implements a state machine with each state corresponding
+ // to a combination of the `is_readable` and `eof` flags. States persist across
+ // loop entries and most state transitions occur with a return.
+ //
+ // The initial state is `reading`.
+ //
+ // | state | eof | is_readable | has_errored |
+ // |---------|-------|-------------|-------------|
+ // | reading | false | false | false |
+ // | framing | false | true | false |
+ // | pausing | true | true | false |
+ // | paused | true | false | false |
+ // | errored | <any> | <any> | true |
+ // `decode_eof` returns Err
+ // ┌────────────────────────────────────────────────────────┐
+ // `decode_eof` returns │ │
+ // `Ok(Some)` │ │
+ // ┌─────┐ │ `decode_eof` returns After returning │
+ // Read 0 bytes ├─────▼──┴┐ `Ok(None)` ┌────────┐ ◄───┐ `None` ┌───▼─────┐
+ // ┌────────────────►│ Pausing ├───────────────────────►│ Paused ├─┐ └───────────┤ Errored │
+ // │ └─────────┘ └─┬──▲───┘ │ └───▲───▲─┘
+ // Pending read │ │ │ │ │ │
+ // ┌──────┐ │ `decode` returns `Some` │ └─────┘ │ │
+ // │ │ │ ┌──────┐ │ Pending │ │
+ // │ ┌────▼──┴─┐ Read n>0 bytes ┌┴──────▼─┐ read n>0 bytes │ read │ │
+ // └─┤ Reading ├───────────────►│ Framing │◄────────────────────────┘ │ │
+ // └──┬─▲────┘ └─────┬──┬┘ │ │
+ // │ │ │ │ `decode` returns Err │ │
+ // │ └───decode` returns `None`──┘ └───────────────────────────────────────────────────────┘ │
+ // │ read returns Err │
+ // └────────────────────────────────────────────────────────────────────────────────────────────┘
+ loop {
+ // Return `None` if we have encountered an error from the underlying decoder
+ // See: https://github.com/tokio-rs/tokio/issues/3976
+ if state.has_errored {
+ // preparing has_errored -> paused
+ trace!("Returning None and setting paused");
+ state.is_readable = false;
+ state.has_errored = false;
+ return Poll::Ready(None);
+ }
+
+ // Repeatedly call `decode` or `decode_eof` while the buffer is "readable",
+ // i.e. it _might_ contain data consumable as a frame or closing frame.
+ // Both signal that there is no such data by returning `None`.
+ //
+ // If `decode` couldn't read a frame and the upstream source has returned eof,
+ // `decode_eof` will attempt to decode the remaining bytes as closing frames.
+ //
+ // If the underlying AsyncRead is resumable, we may continue after an EOF,
+ // but must finish emitting all of it's associated `decode_eof` frames.
+ // Furthermore, we don't want to emit any `decode_eof` frames on retried
+ // reads after an EOF unless we've actually read more data.
+ if state.is_readable {
+ // pausing or framing
+ if state.eof {
+ // pausing
+ let frame = pinned.codec.decode_eof(&mut state.buffer).map_err(|err| {
+ trace!("Got an error, going to errored state");
+ state.has_errored = true;
+ err
+ })?;
+ if frame.is_none() {
+ state.is_readable = false; // prepare pausing -> paused
+ }
+ // implicit pausing -> pausing or pausing -> paused
+ return Poll::Ready(frame.map(Ok));
+ }
+
+ // framing
+ trace!("attempting to decode a frame");
+
+ if let Some(frame) = pinned.codec.decode(&mut state.buffer).map_err(|op| {
+ trace!("Got an error, going to errored state");
+ state.has_errored = true;
+ op
+ })? {
+ trace!("frame decoded from buffer");
+ // implicit framing -> framing
+ return Poll::Ready(Some(Ok(frame)));
+ }
+
+ // framing -> reading
+ state.is_readable = false;
+ }
+ // reading or paused
+ // If we can't build a frame yet, try to read more data and try again.
+ // Make sure we've got room for at least one byte to read to ensure
+ // that we don't get a spurious 0 that looks like EOF.
+ state.buffer.reserve(1);
+ let bytect = match poll_read_buf(pinned.inner.as_mut(), cx, &mut state.buffer).map_err(
+ |err| {
+ trace!("Got an error, going to errored state");
+ state.has_errored = true;
+ err
+ },
+ )? {
+ Poll::Ready(ct) => ct,
+ // implicit reading -> reading or implicit paused -> paused
+ Poll::Pending => return Poll::Pending,
+ };
+ if bytect == 0 {
+ if state.eof {
+ // We're already at an EOF, and since we've reached this path
+ // we're also not readable. This implies that we've already finished
+ // our `decode_eof` handling, so we can simply return `None`.
+ // implicit paused -> paused
+ return Poll::Ready(None);
+ }
+ // prepare reading -> paused
+ state.eof = true;
+ } else {
+ // prepare paused -> framing or noop reading -> framing
+ state.eof = false;
+ }
+
+ // paused -> framing or reading -> framing or reading -> pausing
+ state.is_readable = true;
+ }
+ }
+}
+
+impl<T, I, U, W> Sink<I> for FramedImpl<T, U, W>
+where
+ T: AsyncWrite,
+ U: Encoder<I>,
+ U::Error: From<io::Error>,
+ W: BorrowMut<WriteFrame>,
+{
+ type Error = U::Error;
+
+ fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ if self.state.borrow().buffer.len() >= BACKPRESSURE_BOUNDARY {
+ self.as_mut().poll_flush(cx)
+ } else {
+ Poll::Ready(Ok(()))
+ }
+ }
+
+ fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
+ let pinned = self.project();
+ pinned
+ .codec
+ .encode(item, &mut pinned.state.borrow_mut().buffer)?;
+ Ok(())
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ use crate::util::poll_write_buf;
+ trace!("flushing framed transport");
+ let mut pinned = self.project();
+
+ while !pinned.state.borrow_mut().buffer.is_empty() {
+ let WriteFrame { buffer } = pinned.state.borrow_mut();
+ trace!(remaining = buffer.len(), "writing;");
+
+ let n = ready!(poll_write_buf(pinned.inner.as_mut(), cx, buffer))?;
+
+ if n == 0 {
+ return Poll::Ready(Err(io::Error::new(
+ io::ErrorKind::WriteZero,
+ "failed to \
+ write frame to transport",
+ )
+ .into()));
+ }
+ }
+
+ // Try flushing the underlying IO
+ ready!(pinned.inner.poll_flush(cx))?;
+
+ trace!("framed transport flushed");
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ ready!(self.as_mut().poll_flush(cx))?;
+ ready!(self.project().inner.poll_shutdown(cx))?;
+
+ Poll::Ready(Ok(()))
+ }
+}
diff --git a/third_party/rust/tokio-util/src/codec/framed_read.rs b/third_party/rust/tokio-util/src/codec/framed_read.rs
new file mode 100644
index 0000000000..184c567b49
--- /dev/null
+++ b/third_party/rust/tokio-util/src/codec/framed_read.rs
@@ -0,0 +1,199 @@
+use crate::codec::framed_impl::{FramedImpl, ReadFrame};
+use crate::codec::Decoder;
+
+use futures_core::Stream;
+use tokio::io::AsyncRead;
+
+use bytes::BytesMut;
+use futures_sink::Sink;
+use pin_project_lite::pin_project;
+use std::fmt;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pin_project! {
+ /// A [`Stream`] of messages decoded from an [`AsyncRead`].
+ ///
+ /// [`Stream`]: futures_core::Stream
+ /// [`AsyncRead`]: tokio::io::AsyncRead
+ pub struct FramedRead<T, D> {
+ #[pin]
+ inner: FramedImpl<T, D, ReadFrame>,
+ }
+}
+
+// ===== impl FramedRead =====
+
+impl<T, D> FramedRead<T, D>
+where
+ T: AsyncRead,
+ D: Decoder,
+{
+ /// Creates a new `FramedRead` with the given `decoder`.
+ pub fn new(inner: T, decoder: D) -> FramedRead<T, D> {
+ FramedRead {
+ inner: FramedImpl {
+ inner,
+ codec: decoder,
+ state: Default::default(),
+ },
+ }
+ }
+
+ /// Creates a new `FramedRead` with the given `decoder` and a buffer of `capacity`
+ /// initial size.
+ pub fn with_capacity(inner: T, decoder: D, capacity: usize) -> FramedRead<T, D> {
+ FramedRead {
+ inner: FramedImpl {
+ inner,
+ codec: decoder,
+ state: ReadFrame {
+ eof: false,
+ is_readable: false,
+ buffer: BytesMut::with_capacity(capacity),
+ has_errored: false,
+ },
+ },
+ }
+ }
+}
+
+impl<T, D> FramedRead<T, D> {
+ /// Returns a reference to the underlying I/O stream wrapped by
+ /// `FramedRead`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_ref(&self) -> &T {
+ &self.inner.inner
+ }
+
+ /// Returns a mutable reference to the underlying I/O stream wrapped by
+ /// `FramedRead`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner.inner
+ }
+
+ /// Returns a pinned mutable reference to the underlying I/O stream wrapped by
+ /// `FramedRead`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
+ self.project().inner.project().inner
+ }
+
+ /// Consumes the `FramedRead`, returning its underlying I/O stream.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn into_inner(self) -> T {
+ self.inner.inner
+ }
+
+ /// Returns a reference to the underlying decoder.
+ pub fn decoder(&self) -> &D {
+ &self.inner.codec
+ }
+
+ /// Returns a mutable reference to the underlying decoder.
+ pub fn decoder_mut(&mut self) -> &mut D {
+ &mut self.inner.codec
+ }
+
+ /// Maps the decoder `D` to `C`, preserving the read buffer
+ /// wrapped by `Framed`.
+ pub fn map_decoder<C, F>(self, map: F) -> FramedRead<T, C>
+ where
+ F: FnOnce(D) -> C,
+ {
+ // This could be potentially simplified once rust-lang/rust#86555 hits stable
+ let FramedImpl {
+ inner,
+ state,
+ codec,
+ } = self.inner;
+ FramedRead {
+ inner: FramedImpl {
+ inner,
+ state,
+ codec: map(codec),
+ },
+ }
+ }
+
+ /// Returns a mutable reference to the underlying decoder.
+ pub fn decoder_pin_mut(self: Pin<&mut Self>) -> &mut D {
+ self.project().inner.project().codec
+ }
+
+ /// Returns a reference to the read buffer.
+ pub fn read_buffer(&self) -> &BytesMut {
+ &self.inner.state.buffer
+ }
+
+ /// Returns a mutable reference to the read buffer.
+ pub fn read_buffer_mut(&mut self) -> &mut BytesMut {
+ &mut self.inner.state.buffer
+ }
+}
+
+// This impl just defers to the underlying FramedImpl
+impl<T, D> Stream for FramedRead<T, D>
+where
+ T: AsyncRead,
+ D: Decoder,
+{
+ type Item = Result<D::Item, D::Error>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ self.project().inner.poll_next(cx)
+ }
+}
+
+// This impl just defers to the underlying T: Sink
+impl<T, I, D> Sink<I> for FramedRead<T, D>
+where
+ T: Sink<I>,
+{
+ type Error = T::Error;
+
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.project().inner.project().inner.poll_ready(cx)
+ }
+
+ fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
+ self.project().inner.project().inner.start_send(item)
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.project().inner.project().inner.poll_flush(cx)
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.project().inner.project().inner.poll_close(cx)
+ }
+}
+
+impl<T, D> fmt::Debug for FramedRead<T, D>
+where
+ T: fmt::Debug,
+ D: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("FramedRead")
+ .field("inner", &self.get_ref())
+ .field("decoder", &self.decoder())
+ .field("eof", &self.inner.state.eof)
+ .field("is_readable", &self.inner.state.is_readable)
+ .field("buffer", &self.read_buffer())
+ .finish()
+ }
+}
diff --git a/third_party/rust/tokio-util/src/codec/framed_write.rs b/third_party/rust/tokio-util/src/codec/framed_write.rs
new file mode 100644
index 0000000000..aa4cec9820
--- /dev/null
+++ b/third_party/rust/tokio-util/src/codec/framed_write.rs
@@ -0,0 +1,178 @@
+use crate::codec::encoder::Encoder;
+use crate::codec::framed_impl::{FramedImpl, WriteFrame};
+
+use futures_core::Stream;
+use tokio::io::AsyncWrite;
+
+use bytes::BytesMut;
+use futures_sink::Sink;
+use pin_project_lite::pin_project;
+use std::fmt;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pin_project! {
+ /// A [`Sink`] of frames encoded to an `AsyncWrite`.
+ ///
+ /// [`Sink`]: futures_sink::Sink
+ pub struct FramedWrite<T, E> {
+ #[pin]
+ inner: FramedImpl<T, E, WriteFrame>,
+ }
+}
+
+impl<T, E> FramedWrite<T, E>
+where
+ T: AsyncWrite,
+{
+ /// Creates a new `FramedWrite` with the given `encoder`.
+ pub fn new(inner: T, encoder: E) -> FramedWrite<T, E> {
+ FramedWrite {
+ inner: FramedImpl {
+ inner,
+ codec: encoder,
+ state: WriteFrame::default(),
+ },
+ }
+ }
+}
+
+impl<T, E> FramedWrite<T, E> {
+ /// Returns a reference to the underlying I/O stream wrapped by
+ /// `FramedWrite`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_ref(&self) -> &T {
+ &self.inner.inner
+ }
+
+ /// Returns a mutable reference to the underlying I/O stream wrapped by
+ /// `FramedWrite`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.inner.inner
+ }
+
+ /// Returns a pinned mutable reference to the underlying I/O stream wrapped by
+ /// `FramedWrite`.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
+ self.project().inner.project().inner
+ }
+
+ /// Consumes the `FramedWrite`, returning its underlying I/O stream.
+ ///
+ /// Note that care should be taken to not tamper with the underlying stream
+ /// of data coming in as it may corrupt the stream of frames otherwise
+ /// being worked with.
+ pub fn into_inner(self) -> T {
+ self.inner.inner
+ }
+
+ /// Returns a reference to the underlying encoder.
+ pub fn encoder(&self) -> &E {
+ &self.inner.codec
+ }
+
+ /// Returns a mutable reference to the underlying encoder.
+ pub fn encoder_mut(&mut self) -> &mut E {
+ &mut self.inner.codec
+ }
+
+ /// Maps the encoder `E` to `C`, preserving the write buffer
+ /// wrapped by `Framed`.
+ pub fn map_encoder<C, F>(self, map: F) -> FramedWrite<T, C>
+ where
+ F: FnOnce(E) -> C,
+ {
+ // This could be potentially simplified once rust-lang/rust#86555 hits stable
+ let FramedImpl {
+ inner,
+ state,
+ codec,
+ } = self.inner;
+ FramedWrite {
+ inner: FramedImpl {
+ inner,
+ state,
+ codec: map(codec),
+ },
+ }
+ }
+
+ /// Returns a mutable reference to the underlying encoder.
+ pub fn encoder_pin_mut(self: Pin<&mut Self>) -> &mut E {
+ self.project().inner.project().codec
+ }
+
+ /// Returns a reference to the write buffer.
+ pub fn write_buffer(&self) -> &BytesMut {
+ &self.inner.state.buffer
+ }
+
+ /// Returns a mutable reference to the write buffer.
+ pub fn write_buffer_mut(&mut self) -> &mut BytesMut {
+ &mut self.inner.state.buffer
+ }
+}
+
+// This impl just defers to the underlying FramedImpl
+impl<T, I, E> Sink<I> for FramedWrite<T, E>
+where
+ T: AsyncWrite,
+ E: Encoder<I>,
+ E::Error: From<io::Error>,
+{
+ type Error = E::Error;
+
+ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.project().inner.poll_ready(cx)
+ }
+
+ fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
+ self.project().inner.start_send(item)
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.project().inner.poll_flush(cx)
+ }
+
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+ self.project().inner.poll_close(cx)
+ }
+}
+
+// This impl just defers to the underlying T: Stream
+impl<T, D> Stream for FramedWrite<T, D>
+where
+ T: Stream,
+{
+ type Item = T::Item;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ self.project().inner.project().inner.poll_next(cx)
+ }
+}
+
+impl<T, U> fmt::Debug for FramedWrite<T, U>
+where
+ T: fmt::Debug,
+ U: fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("FramedWrite")
+ .field("inner", &self.get_ref())
+ .field("encoder", &self.encoder())
+ .field("buffer", &self.inner.state.buffer)
+ .finish()
+ }
+}
diff --git a/third_party/rust/tokio-util/src/codec/length_delimited.rs b/third_party/rust/tokio-util/src/codec/length_delimited.rs
new file mode 100644
index 0000000000..93d2f180d0
--- /dev/null
+++ b/third_party/rust/tokio-util/src/codec/length_delimited.rs
@@ -0,0 +1,1047 @@
+//! Frame a stream of bytes based on a length prefix
+//!
+//! Many protocols delimit their frames by prefacing frame data with a
+//! frame head that specifies the length of the frame. The
+//! `length_delimited` module provides utilities for handling the length
+//! based framing. This allows the consumer to work with entire frames
+//! without having to worry about buffering or other framing logic.
+//!
+//! # Getting started
+//!
+//! If implementing a protocol from scratch, using length delimited framing
+//! is an easy way to get started. [`LengthDelimitedCodec::new()`] will
+//! return a length delimited codec using default configuration values.
+//! This can then be used to construct a framer to adapt a full-duplex
+//! byte stream into a stream of frames.
+//!
+//! ```
+//! use tokio::io::{AsyncRead, AsyncWrite};
+//! use tokio_util::codec::{Framed, LengthDelimitedCodec};
+//!
+//! fn bind_transport<T: AsyncRead + AsyncWrite>(io: T)
+//! -> Framed<T, LengthDelimitedCodec>
+//! {
+//! Framed::new(io, LengthDelimitedCodec::new())
+//! }
+//! # pub fn main() {}
+//! ```
+//!
+//! The returned transport implements `Sink + Stream` for `BytesMut`. It
+//! encodes the frame with a big-endian `u32` header denoting the frame
+//! payload length:
+//!
+//! ```text
+//! +----------+--------------------------------+
+//! | len: u32 | frame payload |
+//! +----------+--------------------------------+
+//! ```
+//!
+//! Specifically, given the following:
+//!
+//! ```
+//! use tokio::io::{AsyncRead, AsyncWrite};
+//! use tokio_util::codec::{Framed, LengthDelimitedCodec};
+//!
+//! use futures::SinkExt;
+//! use bytes::Bytes;
+//!
+//! async fn write_frame<T>(io: T) -> Result<(), Box<dyn std::error::Error>>
+//! where
+//! T: AsyncRead + AsyncWrite + Unpin,
+//! {
+//! let mut transport = Framed::new(io, LengthDelimitedCodec::new());
+//! let frame = Bytes::from("hello world");
+//!
+//! transport.send(frame).await?;
+//! Ok(())
+//! }
+//! ```
+//!
+//! The encoded frame will look like this:
+//!
+//! ```text
+//! +---- len: u32 ----+---- data ----+
+//! | \x00\x00\x00\x0b | hello world |
+//! +------------------+--------------+
+//! ```
+//!
+//! # Decoding
+//!
+//! [`FramedRead`] adapts an [`AsyncRead`] into a `Stream` of [`BytesMut`],
+//! such that each yielded [`BytesMut`] value contains the contents of an
+//! entire frame. There are many configuration parameters enabling
+//! [`FramedRead`] to handle a wide range of protocols. Here are some
+//! examples that will cover the various options at a high level.
+//!
+//! ## Example 1
+//!
+//! The following will parse a `u16` length field at offset 0, including the
+//! frame head in the yielded `BytesMut`.
+//!
+//! ```
+//! # use tokio::io::AsyncRead;
+//! # use tokio_util::codec::LengthDelimitedCodec;
+//! # fn bind_read<T: AsyncRead>(io: T) {
+//! LengthDelimitedCodec::builder()
+//! .length_field_offset(0) // default value
+//! .length_field_type::<u16>()
+//! .length_adjustment(0) // default value
+//! .num_skip(0) // Do not strip frame header
+//! .new_read(io);
+//! # }
+//! # pub fn main() {}
+//! ```
+//!
+//! The following frame will be decoded as such:
+//!
+//! ```text
+//! INPUT DECODED
+//! +-- len ---+--- Payload ---+ +-- len ---+--- Payload ---+
+//! | \x00\x0B | Hello world | --> | \x00\x0B | Hello world |
+//! +----------+---------------+ +----------+---------------+
+//! ```
+//!
+//! The value of the length field is 11 (`\x0B`) which represents the length
+//! of the payload, `hello world`. By default, [`FramedRead`] assumes that
+//! the length field represents the number of bytes that **follows** the
+//! length field. Thus, the entire frame has a length of 13: 2 bytes for the
+//! frame head + 11 bytes for the payload.
+//!
+//! ## Example 2
+//!
+//! The following will parse a `u16` length field at offset 0, omitting the
+//! frame head in the yielded `BytesMut`.
+//!
+//! ```
+//! # use tokio::io::AsyncRead;
+//! # use tokio_util::codec::LengthDelimitedCodec;
+//! # fn bind_read<T: AsyncRead>(io: T) {
+//! LengthDelimitedCodec::builder()
+//! .length_field_offset(0) // default value
+//! .length_field_type::<u16>()
+//! .length_adjustment(0) // default value
+//! // `num_skip` is not needed, the default is to skip
+//! .new_read(io);
+//! # }
+//! # pub fn main() {}
+//! ```
+//!
+//! The following frame will be decoded as such:
+//!
+//! ```text
+//! INPUT DECODED
+//! +-- len ---+--- Payload ---+ +--- Payload ---+
+//! | \x00\x0B | Hello world | --> | Hello world |
+//! +----------+---------------+ +---------------+
+//! ```
+//!
+//! This is similar to the first example, the only difference is that the
+//! frame head is **not** included in the yielded `BytesMut` value.
+//!
+//! ## Example 3
+//!
+//! The following will parse a `u16` length field at offset 0, including the
+//! frame head in the yielded `BytesMut`. In this case, the length field
+//! **includes** the frame head length.
+//!
+//! ```
+//! # use tokio::io::AsyncRead;
+//! # use tokio_util::codec::LengthDelimitedCodec;
+//! # fn bind_read<T: AsyncRead>(io: T) {
+//! LengthDelimitedCodec::builder()
+//! .length_field_offset(0) // default value
+//! .length_field_type::<u16>()
+//! .length_adjustment(-2) // size of head
+//! .num_skip(0)
+//! .new_read(io);
+//! # }
+//! # pub fn main() {}
+//! ```
+//!
+//! The following frame will be decoded as such:
+//!
+//! ```text
+//! INPUT DECODED
+//! +-- len ---+--- Payload ---+ +-- len ---+--- Payload ---+
+//! | \x00\x0D | Hello world | --> | \x00\x0D | Hello world |
+//! +----------+---------------+ +----------+---------------+
+//! ```
+//!
+//! In most cases, the length field represents the length of the payload
+//! only, as shown in the previous examples. However, in some protocols the
+//! length field represents the length of the whole frame, including the
+//! head. In such cases, we specify a negative `length_adjustment` to adjust
+//! the value provided in the frame head to represent the payload length.
+//!
+//! ## Example 4
+//!
+//! The following will parse a 3 byte length field at offset 0 in a 5 byte
+//! frame head, including the frame head in the yielded `BytesMut`.
+//!
+//! ```
+//! # use tokio::io::AsyncRead;
+//! # use tokio_util::codec::LengthDelimitedCodec;
+//! # fn bind_read<T: AsyncRead>(io: T) {
+//! LengthDelimitedCodec::builder()
+//! .length_field_offset(0) // default value
+//! .length_field_length(3)
+//! .length_adjustment(2) // remaining head
+//! .num_skip(0)
+//! .new_read(io);
+//! # }
+//! # pub fn main() {}
+//! ```
+//!
+//! The following frame will be decoded as such:
+//!
+//! ```text
+//! INPUT
+//! +---- len -----+- head -+--- Payload ---+
+//! | \x00\x00\x0B | \xCAFE | Hello world |
+//! +--------------+--------+---------------+
+//!
+//! DECODED
+//! +---- len -----+- head -+--- Payload ---+
+//! | \x00\x00\x0B | \xCAFE | Hello world |
+//! +--------------+--------+---------------+
+//! ```
+//!
+//! A more advanced example that shows a case where there is extra frame
+//! head data between the length field and the payload. In such cases, it is
+//! usually desirable to include the frame head as part of the yielded
+//! `BytesMut`. This lets consumers of the length delimited framer to
+//! process the frame head as needed.
+//!
+//! The positive `length_adjustment` value lets `FramedRead` factor in the
+//! additional head into the frame length calculation.
+//!
+//! ## Example 5
+//!
+//! The following will parse a `u16` length field at offset 1 of a 4 byte
+//! frame head. The first byte and the length field will be omitted from the
+//! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be
+//! included.
+//!
+//! ```
+//! # use tokio::io::AsyncRead;
+//! # use tokio_util::codec::LengthDelimitedCodec;
+//! # fn bind_read<T: AsyncRead>(io: T) {
+//! LengthDelimitedCodec::builder()
+//! .length_field_offset(1) // length of hdr1
+//! .length_field_type::<u16>()
+//! .length_adjustment(1) // length of hdr2
+//! .num_skip(3) // length of hdr1 + LEN
+//! .new_read(io);
+//! # }
+//! # pub fn main() {}
+//! ```
+//!
+//! The following frame will be decoded as such:
+//!
+//! ```text
+//! INPUT
+//! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+
+//! | \xCA | \x00\x0B | \xFE | Hello world |
+//! +--------+----------+--------+---------------+
+//!
+//! DECODED
+//! +- hdr2 -+--- Payload ---+
+//! | \xFE | Hello world |
+//! +--------+---------------+
+//! ```
+//!
+//! The length field is situated in the middle of the frame head. In this
+//! case, the first byte in the frame head could be a version or some other
+//! identifier that is not needed for processing. On the other hand, the
+//! second half of the head is needed.
+//!
+//! `length_field_offset` indicates how many bytes to skip before starting
+//! to read the length field. `length_adjustment` is the number of bytes to
+//! skip starting at the end of the length field. In this case, it is the
+//! second half of the head.
+//!
+//! ## Example 6
+//!
+//! The following will parse a `u16` length field at offset 1 of a 4 byte
+//! frame head. The first byte and the length field will be omitted from the
+//! yielded `BytesMut`, but the trailing 2 bytes of the frame head will be
+//! included. In this case, the length field **includes** the frame head
+//! length.
+//!
+//! ```
+//! # use tokio::io::AsyncRead;
+//! # use tokio_util::codec::LengthDelimitedCodec;
+//! # fn bind_read<T: AsyncRead>(io: T) {
+//! LengthDelimitedCodec::builder()
+//! .length_field_offset(1) // length of hdr1
+//! .length_field_type::<u16>()
+//! .length_adjustment(-3) // length of hdr1 + LEN, negative
+//! .num_skip(3)
+//! .new_read(io);
+//! # }
+//! ```
+//!
+//! The following frame will be decoded as such:
+//!
+//! ```text
+//! INPUT
+//! +- hdr1 -+-- len ---+- hdr2 -+--- Payload ---+
+//! | \xCA | \x00\x0F | \xFE | Hello world |
+//! +--------+----------+--------+---------------+
+//!
+//! DECODED
+//! +- hdr2 -+--- Payload ---+
+//! | \xFE | Hello world |
+//! +--------+---------------+
+//! ```
+//!
+//! Similar to the example above, the difference is that the length field
+//! represents the length of the entire frame instead of just the payload.
+//! The length of `hdr1` and `len` must be counted in `length_adjustment`.
+//! Note that the length of `hdr2` does **not** need to be explicitly set
+//! anywhere because it already is factored into the total frame length that
+//! is read from the byte stream.
+//!
+//! ## Example 7
+//!
+//! The following will parse a 3 byte length field at offset 0 in a 4 byte
+//! frame head, excluding the 4th byte from the yielded `BytesMut`.
+//!
+//! ```
+//! # use tokio::io::AsyncRead;
+//! # use tokio_util::codec::LengthDelimitedCodec;
+//! # fn bind_read<T: AsyncRead>(io: T) {
+//! LengthDelimitedCodec::builder()
+//! .length_field_offset(0) // default value
+//! .length_field_length(3)
+//! .length_adjustment(0) // default value
+//! .num_skip(4) // skip the first 4 bytes
+//! .new_read(io);
+//! # }
+//! # pub fn main() {}
+//! ```
+//!
+//! The following frame will be decoded as such:
+//!
+//! ```text
+//! INPUT DECODED
+//! +------- len ------+--- Payload ---+ +--- Payload ---+
+//! | \x00\x00\x0B\xFF | Hello world | => | Hello world |
+//! +------------------+---------------+ +---------------+
+//! ```
+//!
+//! A simple example where there are unused bytes between the length field
+//! and the payload.
+//!
+//! # Encoding
+//!
+//! [`FramedWrite`] adapts an [`AsyncWrite`] into a `Sink` of [`BytesMut`],
+//! such that each submitted [`BytesMut`] is prefaced by a length field.
+//! There are fewer configuration options than [`FramedRead`]. Given
+//! protocols that have more complex frame heads, an encoder should probably
+//! be written by hand using [`Encoder`].
+//!
+//! Here is a simple example, given a `FramedWrite` with the following
+//! configuration:
+//!
+//! ```
+//! # use tokio::io::AsyncWrite;
+//! # use tokio_util::codec::LengthDelimitedCodec;
+//! # fn write_frame<T: AsyncWrite>(io: T) {
+//! # let _ =
+//! LengthDelimitedCodec::builder()
+//! .length_field_type::<u16>()
+//! .new_write(io);
+//! # }
+//! # pub fn main() {}
+//! ```
+//!
+//! A payload of `hello world` will be encoded as:
+//!
+//! ```text
+//! +- len: u16 -+---- data ----+
+//! | \x00\x0b | hello world |
+//! +------------+--------------+
+//! ```
+//!
+//! [`LengthDelimitedCodec::new()`]: method@LengthDelimitedCodec::new
+//! [`FramedRead`]: struct@FramedRead
+//! [`FramedWrite`]: struct@FramedWrite
+//! [`AsyncRead`]: trait@tokio::io::AsyncRead
+//! [`AsyncWrite`]: trait@tokio::io::AsyncWrite
+//! [`Encoder`]: trait@Encoder
+//! [`BytesMut`]: bytes::BytesMut
+
+use crate::codec::{Decoder, Encoder, Framed, FramedRead, FramedWrite};
+
+use tokio::io::{AsyncRead, AsyncWrite};
+
+use bytes::{Buf, BufMut, Bytes, BytesMut};
+use std::error::Error as StdError;
+use std::io::{self, Cursor};
+use std::{cmp, fmt, mem};
+
+/// Configure length delimited `LengthDelimitedCodec`s.
+///
+/// `Builder` enables constructing configured length delimited codecs. Note
+/// that not all configuration settings apply to both encoding and decoding. See
+/// the documentation for specific methods for more detail.
+#[derive(Debug, Clone, Copy)]
+pub struct Builder {
+ // Maximum frame length
+ max_frame_len: usize,
+
+ // Number of bytes representing the field length
+ length_field_len: usize,
+
+ // Number of bytes in the header before the length field
+ length_field_offset: usize,
+
+ // Adjust the length specified in the header field by this amount
+ length_adjustment: isize,
+
+ // Total number of bytes to skip before reading the payload, if not set,
+ // `length_field_len + length_field_offset`
+ num_skip: Option<usize>,
+
+ // Length field byte order (little or big endian)
+ length_field_is_big_endian: bool,
+}
+
+/// An error when the number of bytes read is more than max frame length.
+pub struct LengthDelimitedCodecError {
+ _priv: (),
+}
+
+/// A codec for frames delimited by a frame head specifying their lengths.
+///
+/// This allows the consumer to work with entire frames without having to worry
+/// about buffering or other framing logic.
+///
+/// See [module level] documentation for more detail.
+///
+/// [module level]: index.html
+#[derive(Debug, Clone)]
+pub struct LengthDelimitedCodec {
+ // Configuration values
+ builder: Builder,
+
+ // Read state
+ state: DecodeState,
+}
+
+#[derive(Debug, Clone, Copy)]
+enum DecodeState {
+ Head,
+ Data(usize),
+}
+
+// ===== impl LengthDelimitedCodec ======
+
+impl LengthDelimitedCodec {
+ /// Creates a new `LengthDelimitedCodec` with the default configuration values.
+ pub fn new() -> Self {
+ Self {
+ builder: Builder::new(),
+ state: DecodeState::Head,
+ }
+ }
+
+ /// Creates a new length delimited codec builder with default configuration
+ /// values.
+ pub fn builder() -> Builder {
+ Builder::new()
+ }
+
+ /// Returns the current max frame setting
+ ///
+ /// This is the largest size this codec will accept from the wire. Larger
+ /// frames will be rejected.
+ pub fn max_frame_length(&self) -> usize {
+ self.builder.max_frame_len
+ }
+
+ /// Updates the max frame setting.
+ ///
+ /// The change takes effect the next time a frame is decoded. In other
+ /// words, if a frame is currently in process of being decoded with a frame
+ /// size greater than `val` but less than the max frame length in effect
+ /// before calling this function, then the frame will be allowed.
+ pub fn set_max_frame_length(&mut self, val: usize) {
+ self.builder.max_frame_length(val);
+ }
+
+ fn decode_head(&mut self, src: &mut BytesMut) -> io::Result<Option<usize>> {
+ let head_len = self.builder.num_head_bytes();
+ let field_len = self.builder.length_field_len;
+
+ if src.len() < head_len {
+ // Not enough data
+ return Ok(None);
+ }
+
+ let n = {
+ let mut src = Cursor::new(&mut *src);
+
+ // Skip the required bytes
+ src.advance(self.builder.length_field_offset);
+
+ // match endianness
+ let n = if self.builder.length_field_is_big_endian {
+ src.get_uint(field_len)
+ } else {
+ src.get_uint_le(field_len)
+ };
+
+ if n > self.builder.max_frame_len as u64 {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ LengthDelimitedCodecError { _priv: () },
+ ));
+ }
+
+ // The check above ensures there is no overflow
+ let n = n as usize;
+
+ // Adjust `n` with bounds checking
+ let n = if self.builder.length_adjustment < 0 {
+ n.checked_sub(-self.builder.length_adjustment as usize)
+ } else {
+ n.checked_add(self.builder.length_adjustment as usize)
+ };
+
+ // Error handling
+ match n {
+ Some(n) => n,
+ None => {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "provided length would overflow after adjustment",
+ ));
+ }
+ }
+ };
+
+ let num_skip = self.builder.get_num_skip();
+
+ if num_skip > 0 {
+ src.advance(num_skip);
+ }
+
+ // Ensure that the buffer has enough space to read the incoming
+ // payload
+ src.reserve(n);
+
+ Ok(Some(n))
+ }
+
+ fn decode_data(&self, n: usize, src: &mut BytesMut) -> Option<BytesMut> {
+ // At this point, the buffer has already had the required capacity
+ // reserved. All there is to do is read.
+ if src.len() < n {
+ return None;
+ }
+
+ Some(src.split_to(n))
+ }
+}
+
+impl Decoder for LengthDelimitedCodec {
+ type Item = BytesMut;
+ type Error = io::Error;
+
+ fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<BytesMut>> {
+ let n = match self.state {
+ DecodeState::Head => match self.decode_head(src)? {
+ Some(n) => {
+ self.state = DecodeState::Data(n);
+ n
+ }
+ None => return Ok(None),
+ },
+ DecodeState::Data(n) => n,
+ };
+
+ match self.decode_data(n, src) {
+ Some(data) => {
+ // Update the decode state
+ self.state = DecodeState::Head;
+
+ // Make sure the buffer has enough space to read the next head
+ src.reserve(self.builder.num_head_bytes());
+
+ Ok(Some(data))
+ }
+ None => Ok(None),
+ }
+ }
+}
+
+impl Encoder<Bytes> for LengthDelimitedCodec {
+ type Error = io::Error;
+
+ fn encode(&mut self, data: Bytes, dst: &mut BytesMut) -> Result<(), io::Error> {
+ let n = data.len();
+
+ if n > self.builder.max_frame_len {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ LengthDelimitedCodecError { _priv: () },
+ ));
+ }
+
+ // Adjust `n` with bounds checking
+ let n = if self.builder.length_adjustment < 0 {
+ n.checked_add(-self.builder.length_adjustment as usize)
+ } else {
+ n.checked_sub(self.builder.length_adjustment as usize)
+ };
+
+ let n = n.ok_or_else(|| {
+ io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "provided length would overflow after adjustment",
+ )
+ })?;
+
+ // Reserve capacity in the destination buffer to fit the frame and
+ // length field (plus adjustment).
+ dst.reserve(self.builder.length_field_len + n);
+
+ if self.builder.length_field_is_big_endian {
+ dst.put_uint(n as u64, self.builder.length_field_len);
+ } else {
+ dst.put_uint_le(n as u64, self.builder.length_field_len);
+ }
+
+ // Write the frame to the buffer
+ dst.extend_from_slice(&data[..]);
+
+ Ok(())
+ }
+}
+
+impl Default for LengthDelimitedCodec {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+// ===== impl Builder =====
+
+mod builder {
+ /// Types that can be used with `Builder::length_field_type`.
+ pub trait LengthFieldType {}
+
+ impl LengthFieldType for u8 {}
+ impl LengthFieldType for u16 {}
+ impl LengthFieldType for u32 {}
+ impl LengthFieldType for u64 {}
+
+ #[cfg(any(
+ target_pointer_width = "8",
+ target_pointer_width = "16",
+ target_pointer_width = "32",
+ target_pointer_width = "64",
+ ))]
+ impl LengthFieldType for usize {}
+}
+
+impl Builder {
+ /// Creates a new length delimited codec builder with default configuration
+ /// values.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::io::AsyncRead;
+ /// use tokio_util::codec::LengthDelimitedCodec;
+ ///
+ /// # fn bind_read<T: AsyncRead>(io: T) {
+ /// LengthDelimitedCodec::builder()
+ /// .length_field_offset(0)
+ /// .length_field_type::<u16>()
+ /// .length_adjustment(0)
+ /// .num_skip(0)
+ /// .new_read(io);
+ /// # }
+ /// # pub fn main() {}
+ /// ```
+ pub fn new() -> Builder {
+ Builder {
+ // Default max frame length of 8MB
+ max_frame_len: 8 * 1_024 * 1_024,
+
+ // Default byte length of 4
+ length_field_len: 4,
+
+ // Default to the header field being at the start of the header.
+ length_field_offset: 0,
+
+ length_adjustment: 0,
+
+ // Total number of bytes to skip before reading the payload, if not set,
+ // `length_field_len + length_field_offset`
+ num_skip: None,
+
+ // Default to reading the length field in network (big) endian.
+ length_field_is_big_endian: true,
+ }
+ }
+
+ /// Read the length field as a big endian integer
+ ///
+ /// This is the default setting.
+ ///
+ /// This configuration option applies to both encoding and decoding.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::io::AsyncRead;
+ /// use tokio_util::codec::LengthDelimitedCodec;
+ ///
+ /// # fn bind_read<T: AsyncRead>(io: T) {
+ /// LengthDelimitedCodec::builder()
+ /// .big_endian()
+ /// .new_read(io);
+ /// # }
+ /// # pub fn main() {}
+ /// ```
+ pub fn big_endian(&mut self) -> &mut Self {
+ self.length_field_is_big_endian = true;
+ self
+ }
+
+ /// Read the length field as a little endian integer
+ ///
+ /// The default setting is big endian.
+ ///
+ /// This configuration option applies to both encoding and decoding.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::io::AsyncRead;
+ /// use tokio_util::codec::LengthDelimitedCodec;
+ ///
+ /// # fn bind_read<T: AsyncRead>(io: T) {
+ /// LengthDelimitedCodec::builder()
+ /// .little_endian()
+ /// .new_read(io);
+ /// # }
+ /// # pub fn main() {}
+ /// ```
+ pub fn little_endian(&mut self) -> &mut Self {
+ self.length_field_is_big_endian = false;
+ self
+ }
+
+ /// Read the length field as a native endian integer
+ ///
+ /// The default setting is big endian.
+ ///
+ /// This configuration option applies to both encoding and decoding.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::io::AsyncRead;
+ /// use tokio_util::codec::LengthDelimitedCodec;
+ ///
+ /// # fn bind_read<T: AsyncRead>(io: T) {
+ /// LengthDelimitedCodec::builder()
+ /// .native_endian()
+ /// .new_read(io);
+ /// # }
+ /// # pub fn main() {}
+ /// ```
+ pub fn native_endian(&mut self) -> &mut Self {
+ if cfg!(target_endian = "big") {
+ self.big_endian()
+ } else {
+ self.little_endian()
+ }
+ }
+
+ /// Sets the max frame length in bytes
+ ///
+ /// This configuration option applies to both encoding and decoding. The
+ /// default value is 8MB.
+ ///
+ /// When decoding, the length field read from the byte stream is checked
+ /// against this setting **before** any adjustments are applied. When
+ /// encoding, the length of the submitted payload is checked against this
+ /// setting.
+ ///
+ /// When frames exceed the max length, an `io::Error` with the custom value
+ /// of the `LengthDelimitedCodecError` type will be returned.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::io::AsyncRead;
+ /// use tokio_util::codec::LengthDelimitedCodec;
+ ///
+ /// # fn bind_read<T: AsyncRead>(io: T) {
+ /// LengthDelimitedCodec::builder()
+ /// .max_frame_length(8 * 1024 * 1024)
+ /// .new_read(io);
+ /// # }
+ /// # pub fn main() {}
+ /// ```
+ pub fn max_frame_length(&mut self, val: usize) -> &mut Self {
+ self.max_frame_len = val;
+ self
+ }
+
+ /// Sets the unsigned integer type used to represent the length field.
+ ///
+ /// The default type is [`u32`]. The max type is [`u64`] (or [`usize`] on
+ /// 64-bit targets).
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::io::AsyncRead;
+ /// use tokio_util::codec::LengthDelimitedCodec;
+ ///
+ /// # fn bind_read<T: AsyncRead>(io: T) {
+ /// LengthDelimitedCodec::builder()
+ /// .length_field_type::<u32>()
+ /// .new_read(io);
+ /// # }
+ /// # pub fn main() {}
+ /// ```
+ ///
+ /// Unlike [`Builder::length_field_length`], this does not fail at runtime
+ /// and instead produces a compile error:
+ ///
+ /// ```compile_fail
+ /// # use tokio::io::AsyncRead;
+ /// # use tokio_util::codec::LengthDelimitedCodec;
+ /// # fn bind_read<T: AsyncRead>(io: T) {
+ /// LengthDelimitedCodec::builder()
+ /// .length_field_type::<u128>()
+ /// .new_read(io);
+ /// # }
+ /// # pub fn main() {}
+ /// ```
+ pub fn length_field_type<T: builder::LengthFieldType>(&mut self) -> &mut Self {
+ self.length_field_length(mem::size_of::<T>())
+ }
+
+ /// Sets the number of bytes used to represent the length field
+ ///
+ /// The default value is `4`. The max value is `8`.
+ ///
+ /// This configuration option applies to both encoding and decoding.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::io::AsyncRead;
+ /// use tokio_util::codec::LengthDelimitedCodec;
+ ///
+ /// # fn bind_read<T: AsyncRead>(io: T) {
+ /// LengthDelimitedCodec::builder()
+ /// .length_field_length(4)
+ /// .new_read(io);
+ /// # }
+ /// # pub fn main() {}
+ /// ```
+ pub fn length_field_length(&mut self, val: usize) -> &mut Self {
+ assert!(val > 0 && val <= 8, "invalid length field length");
+ self.length_field_len = val;
+ self
+ }
+
+ /// Sets the number of bytes in the header before the length field
+ ///
+ /// This configuration option only applies to decoding.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::io::AsyncRead;
+ /// use tokio_util::codec::LengthDelimitedCodec;
+ ///
+ /// # fn bind_read<T: AsyncRead>(io: T) {
+ /// LengthDelimitedCodec::builder()
+ /// .length_field_offset(1)
+ /// .new_read(io);
+ /// # }
+ /// # pub fn main() {}
+ /// ```
+ pub fn length_field_offset(&mut self, val: usize) -> &mut Self {
+ self.length_field_offset = val;
+ self
+ }
+
+ /// Delta between the payload length specified in the header and the real
+ /// payload length
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::io::AsyncRead;
+ /// use tokio_util::codec::LengthDelimitedCodec;
+ ///
+ /// # fn bind_read<T: AsyncRead>(io: T) {
+ /// LengthDelimitedCodec::builder()
+ /// .length_adjustment(-2)
+ /// .new_read(io);
+ /// # }
+ /// # pub fn main() {}
+ /// ```
+ pub fn length_adjustment(&mut self, val: isize) -> &mut Self {
+ self.length_adjustment = val;
+ self
+ }
+
+ /// Sets the number of bytes to skip before reading the payload
+ ///
+ /// Default value is `length_field_len + length_field_offset`
+ ///
+ /// This configuration option only applies to decoding
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::io::AsyncRead;
+ /// use tokio_util::codec::LengthDelimitedCodec;
+ ///
+ /// # fn bind_read<T: AsyncRead>(io: T) {
+ /// LengthDelimitedCodec::builder()
+ /// .num_skip(4)
+ /// .new_read(io);
+ /// # }
+ /// # pub fn main() {}
+ /// ```
+ pub fn num_skip(&mut self, val: usize) -> &mut Self {
+ self.num_skip = Some(val);
+ self
+ }
+
+ /// Create a configured length delimited `LengthDelimitedCodec`
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_util::codec::LengthDelimitedCodec;
+ /// # pub fn main() {
+ /// LengthDelimitedCodec::builder()
+ /// .length_field_offset(0)
+ /// .length_field_type::<u16>()
+ /// .length_adjustment(0)
+ /// .num_skip(0)
+ /// .new_codec();
+ /// # }
+ /// ```
+ pub fn new_codec(&self) -> LengthDelimitedCodec {
+ LengthDelimitedCodec {
+ builder: *self,
+ state: DecodeState::Head,
+ }
+ }
+
+ /// Create a configured length delimited `FramedRead`
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::io::AsyncRead;
+ /// use tokio_util::codec::LengthDelimitedCodec;
+ ///
+ /// # fn bind_read<T: AsyncRead>(io: T) {
+ /// LengthDelimitedCodec::builder()
+ /// .length_field_offset(0)
+ /// .length_field_type::<u16>()
+ /// .length_adjustment(0)
+ /// .num_skip(0)
+ /// .new_read(io);
+ /// # }
+ /// # pub fn main() {}
+ /// ```
+ pub fn new_read<T>(&self, upstream: T) -> FramedRead<T, LengthDelimitedCodec>
+ where
+ T: AsyncRead,
+ {
+ FramedRead::new(upstream, self.new_codec())
+ }
+
+ /// Create a configured length delimited `FramedWrite`
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::io::AsyncWrite;
+ /// # use tokio_util::codec::LengthDelimitedCodec;
+ /// # fn write_frame<T: AsyncWrite>(io: T) {
+ /// LengthDelimitedCodec::builder()
+ /// .length_field_type::<u16>()
+ /// .new_write(io);
+ /// # }
+ /// # pub fn main() {}
+ /// ```
+ pub fn new_write<T>(&self, inner: T) -> FramedWrite<T, LengthDelimitedCodec>
+ where
+ T: AsyncWrite,
+ {
+ FramedWrite::new(inner, self.new_codec())
+ }
+
+ /// Create a configured length delimited `Framed`
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::io::{AsyncRead, AsyncWrite};
+ /// # use tokio_util::codec::LengthDelimitedCodec;
+ /// # fn write_frame<T: AsyncRead + AsyncWrite>(io: T) {
+ /// # let _ =
+ /// LengthDelimitedCodec::builder()
+ /// .length_field_type::<u16>()
+ /// .new_framed(io);
+ /// # }
+ /// # pub fn main() {}
+ /// ```
+ pub fn new_framed<T>(&self, inner: T) -> Framed<T, LengthDelimitedCodec>
+ where
+ T: AsyncRead + AsyncWrite,
+ {
+ Framed::new(inner, self.new_codec())
+ }
+
+ fn num_head_bytes(&self) -> usize {
+ let num = self.length_field_offset + self.length_field_len;
+ cmp::max(num, self.num_skip.unwrap_or(0))
+ }
+
+ fn get_num_skip(&self) -> usize {
+ self.num_skip
+ .unwrap_or(self.length_field_offset + self.length_field_len)
+ }
+}
+
+impl Default for Builder {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+// ===== impl LengthDelimitedCodecError =====
+
+impl fmt::Debug for LengthDelimitedCodecError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("LengthDelimitedCodecError").finish()
+ }
+}
+
+impl fmt::Display for LengthDelimitedCodecError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.write_str("frame size too big")
+ }
+}
+
+impl StdError for LengthDelimitedCodecError {}
diff --git a/third_party/rust/tokio-util/src/codec/lines_codec.rs b/third_party/rust/tokio-util/src/codec/lines_codec.rs
new file mode 100644
index 0000000000..7a0a8f0454
--- /dev/null
+++ b/third_party/rust/tokio-util/src/codec/lines_codec.rs
@@ -0,0 +1,230 @@
+use crate::codec::decoder::Decoder;
+use crate::codec::encoder::Encoder;
+
+use bytes::{Buf, BufMut, BytesMut};
+use std::{cmp, fmt, io, str, usize};
+
+/// A simple [`Decoder`] and [`Encoder`] implementation that splits up data into lines.
+///
+/// [`Decoder`]: crate::codec::Decoder
+/// [`Encoder`]: crate::codec::Encoder
+#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
+pub struct LinesCodec {
+ // Stored index of the next index to examine for a `\n` character.
+ // This is used to optimize searching.
+ // For example, if `decode` was called with `abc`, it would hold `3`,
+ // because that is the next index to examine.
+ // The next time `decode` is called with `abcde\n`, the method will
+ // only look at `de\n` before returning.
+ next_index: usize,
+
+ /// The maximum length for a given line. If `usize::MAX`, lines will be
+ /// read until a `\n` character is reached.
+ max_length: usize,
+
+ /// Are we currently discarding the remainder of a line which was over
+ /// the length limit?
+ is_discarding: bool,
+}
+
+impl LinesCodec {
+ /// Returns a `LinesCodec` for splitting up data into lines.
+ ///
+ /// # Note
+ ///
+ /// The returned `LinesCodec` will not have an upper bound on the length
+ /// of a buffered line. See the documentation for [`new_with_max_length`]
+ /// for information on why this could be a potential security risk.
+ ///
+ /// [`new_with_max_length`]: crate::codec::LinesCodec::new_with_max_length()
+ pub fn new() -> LinesCodec {
+ LinesCodec {
+ next_index: 0,
+ max_length: usize::MAX,
+ is_discarding: false,
+ }
+ }
+
+ /// Returns a `LinesCodec` with a maximum line length limit.
+ ///
+ /// If this is set, calls to `LinesCodec::decode` will return a
+ /// [`LinesCodecError`] when a line exceeds the length limit. Subsequent calls
+ /// will discard up to `limit` bytes from that line until a newline
+ /// character is reached, returning `None` until the line over the limit
+ /// has been fully discarded. After that point, calls to `decode` will
+ /// function as normal.
+ ///
+ /// # Note
+ ///
+ /// Setting a length limit is highly recommended for any `LinesCodec` which
+ /// will be exposed to untrusted input. Otherwise, the size of the buffer
+ /// that holds the line currently being read is unbounded. An attacker could
+ /// exploit this unbounded buffer by sending an unbounded amount of input
+ /// without any `\n` characters, causing unbounded memory consumption.
+ ///
+ /// [`LinesCodecError`]: crate::codec::LinesCodecError
+ pub fn new_with_max_length(max_length: usize) -> Self {
+ LinesCodec {
+ max_length,
+ ..LinesCodec::new()
+ }
+ }
+
+ /// Returns the maximum line length when decoding.
+ ///
+ /// ```
+ /// use std::usize;
+ /// use tokio_util::codec::LinesCodec;
+ ///
+ /// let codec = LinesCodec::new();
+ /// assert_eq!(codec.max_length(), usize::MAX);
+ /// ```
+ /// ```
+ /// use tokio_util::codec::LinesCodec;
+ ///
+ /// let codec = LinesCodec::new_with_max_length(256);
+ /// assert_eq!(codec.max_length(), 256);
+ /// ```
+ pub fn max_length(&self) -> usize {
+ self.max_length
+ }
+}
+
+fn utf8(buf: &[u8]) -> Result<&str, io::Error> {
+ str::from_utf8(buf)
+ .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Unable to decode input as UTF8"))
+}
+
+fn without_carriage_return(s: &[u8]) -> &[u8] {
+ if let Some(&b'\r') = s.last() {
+ &s[..s.len() - 1]
+ } else {
+ s
+ }
+}
+
+impl Decoder for LinesCodec {
+ type Item = String;
+ type Error = LinesCodecError;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, LinesCodecError> {
+ loop {
+ // Determine how far into the buffer we'll search for a newline. If
+ // there's no max_length set, we'll read to the end of the buffer.
+ let read_to = cmp::min(self.max_length.saturating_add(1), buf.len());
+
+ let newline_offset = buf[self.next_index..read_to]
+ .iter()
+ .position(|b| *b == b'\n');
+
+ match (self.is_discarding, newline_offset) {
+ (true, Some(offset)) => {
+ // If we found a newline, discard up to that offset and
+ // then stop discarding. On the next iteration, we'll try
+ // to read a line normally.
+ buf.advance(offset + self.next_index + 1);
+ self.is_discarding = false;
+ self.next_index = 0;
+ }
+ (true, None) => {
+ // Otherwise, we didn't find a newline, so we'll discard
+ // everything we read. On the next iteration, we'll continue
+ // discarding up to max_len bytes unless we find a newline.
+ buf.advance(read_to);
+ self.next_index = 0;
+ if buf.is_empty() {
+ return Ok(None);
+ }
+ }
+ (false, Some(offset)) => {
+ // Found a line!
+ let newline_index = offset + self.next_index;
+ self.next_index = 0;
+ let line = buf.split_to(newline_index + 1);
+ let line = &line[..line.len() - 1];
+ let line = without_carriage_return(line);
+ let line = utf8(line)?;
+ return Ok(Some(line.to_string()));
+ }
+ (false, None) if buf.len() > self.max_length => {
+ // Reached the maximum length without finding a
+ // newline, return an error and start discarding on the
+ // next call.
+ self.is_discarding = true;
+ return Err(LinesCodecError::MaxLineLengthExceeded);
+ }
+ (false, None) => {
+ // We didn't find a line or reach the length limit, so the next
+ // call will resume searching at the current offset.
+ self.next_index = read_to;
+ return Ok(None);
+ }
+ }
+ }
+ }
+
+ fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<String>, LinesCodecError> {
+ Ok(match self.decode(buf)? {
+ Some(frame) => Some(frame),
+ None => {
+ // No terminating newline - return remaining data, if any
+ if buf.is_empty() || buf == &b"\r"[..] {
+ None
+ } else {
+ let line = buf.split_to(buf.len());
+ let line = without_carriage_return(&line);
+ let line = utf8(line)?;
+ self.next_index = 0;
+ Some(line.to_string())
+ }
+ }
+ })
+ }
+}
+
+impl<T> Encoder<T> for LinesCodec
+where
+ T: AsRef<str>,
+{
+ type Error = LinesCodecError;
+
+ fn encode(&mut self, line: T, buf: &mut BytesMut) -> Result<(), LinesCodecError> {
+ let line = line.as_ref();
+ buf.reserve(line.len() + 1);
+ buf.put(line.as_bytes());
+ buf.put_u8(b'\n');
+ Ok(())
+ }
+}
+
+impl Default for LinesCodec {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+/// An error occurred while encoding or decoding a line.
+#[derive(Debug)]
+pub enum LinesCodecError {
+ /// The maximum line length was exceeded.
+ MaxLineLengthExceeded,
+ /// An IO error occurred.
+ Io(io::Error),
+}
+
+impl fmt::Display for LinesCodecError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ LinesCodecError::MaxLineLengthExceeded => write!(f, "max line length exceeded"),
+ LinesCodecError::Io(e) => write!(f, "{}", e),
+ }
+ }
+}
+
+impl From<io::Error> for LinesCodecError {
+ fn from(e: io::Error) -> LinesCodecError {
+ LinesCodecError::Io(e)
+ }
+}
+
+impl std::error::Error for LinesCodecError {}
diff --git a/third_party/rust/tokio-util/src/codec/mod.rs b/third_party/rust/tokio-util/src/codec/mod.rs
new file mode 100644
index 0000000000..2295176bdc
--- /dev/null
+++ b/third_party/rust/tokio-util/src/codec/mod.rs
@@ -0,0 +1,290 @@
+//! Adaptors from AsyncRead/AsyncWrite to Stream/Sink
+//!
+//! Raw I/O objects work with byte sequences, but higher-level code usually
+//! wants to batch these into meaningful chunks, called "frames".
+//!
+//! This module contains adapters to go from streams of bytes, [`AsyncRead`] and
+//! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`].
+//! Framed streams are also known as transports.
+//!
+//! # The Decoder trait
+//!
+//! A [`Decoder`] is used together with [`FramedRead`] or [`Framed`] to turn an
+//! [`AsyncRead`] into a [`Stream`]. The job of the decoder trait is to specify
+//! how sequences of bytes are turned into a sequence of frames, and to
+//! determine where the boundaries between frames are. The job of the
+//! `FramedRead` is to repeatedly switch between reading more data from the IO
+//! resource, and asking the decoder whether we have received enough data to
+//! decode another frame of data.
+//!
+//! The main method on the `Decoder` trait is the [`decode`] method. This method
+//! takes as argument the data that has been read so far, and when it is called,
+//! it will be in one of the following situations:
+//!
+//! 1. The buffer contains less than a full frame.
+//! 2. The buffer contains exactly a full frame.
+//! 3. The buffer contains more than a full frame.
+//!
+//! In the first situation, the decoder should return `Ok(None)`.
+//!
+//! In the second situation, the decoder should clear the provided buffer and
+//! return `Ok(Some(the_decoded_frame))`.
+//!
+//! In the third situation, the decoder should use a method such as [`split_to`]
+//! or [`advance`] to modify the buffer such that the frame is removed from the
+//! buffer, but any data in the buffer after that frame should still remain in
+//! the buffer. The decoder should also return `Ok(Some(the_decoded_frame))` in
+//! this case.
+//!
+//! Finally the decoder may return an error if the data is invalid in some way.
+//! The decoder should _not_ return an error just because it has yet to receive
+//! a full frame.
+//!
+//! It is guaranteed that, from one call to `decode` to another, the provided
+//! buffer will contain the exact same data as before, except that if more data
+//! has arrived through the IO resource, that data will have been appended to
+//! the buffer. This means that reading frames from a `FramedRead` is
+//! essentially equivalent to the following loop:
+//!
+//! ```no_run
+//! use tokio::io::AsyncReadExt;
+//! # // This uses async_stream to create an example that compiles.
+//! # fn foo() -> impl futures_core::Stream<Item = std::io::Result<bytes::BytesMut>> { async_stream::try_stream! {
+//! # use tokio_util::codec::Decoder;
+//! # let mut decoder = tokio_util::codec::BytesCodec::new();
+//! # let io_resource = &mut &[0u8, 1, 2, 3][..];
+//!
+//! let mut buf = bytes::BytesMut::new();
+//! loop {
+//! // The read_buf call will append to buf rather than overwrite existing data.
+//! let len = io_resource.read_buf(&mut buf).await?;
+//!
+//! if len == 0 {
+//! while let Some(frame) = decoder.decode_eof(&mut buf)? {
+//! yield frame;
+//! }
+//! break;
+//! }
+//!
+//! while let Some(frame) = decoder.decode(&mut buf)? {
+//! yield frame;
+//! }
+//! }
+//! # }}
+//! ```
+//! The example above uses `yield` whenever the `Stream` produces an item.
+//!
+//! ## Example decoder
+//!
+//! As an example, consider a protocol that can be used to send strings where
+//! each frame is a four byte integer that contains the length of the frame,
+//! followed by that many bytes of string data. The decoder fails with an error
+//! if the string data is not valid utf-8 or too long.
+//!
+//! Such a decoder can be written like this:
+//! ```
+//! use tokio_util::codec::Decoder;
+//! use bytes::{BytesMut, Buf};
+//!
+//! struct MyStringDecoder {}
+//!
+//! const MAX: usize = 8 * 1024 * 1024;
+//!
+//! impl Decoder for MyStringDecoder {
+//! type Item = String;
+//! type Error = std::io::Error;
+//!
+//! fn decode(
+//! &mut self,
+//! src: &mut BytesMut
+//! ) -> Result<Option<Self::Item>, Self::Error> {
+//! if src.len() < 4 {
+//! // Not enough data to read length marker.
+//! return Ok(None);
+//! }
+//!
+//! // Read length marker.
+//! let mut length_bytes = [0u8; 4];
+//! length_bytes.copy_from_slice(&src[..4]);
+//! let length = u32::from_le_bytes(length_bytes) as usize;
+//!
+//! // Check that the length is not too large to avoid a denial of
+//! // service attack where the server runs out of memory.
+//! if length > MAX {
+//! return Err(std::io::Error::new(
+//! std::io::ErrorKind::InvalidData,
+//! format!("Frame of length {} is too large.", length)
+//! ));
+//! }
+//!
+//! if src.len() < 4 + length {
+//! // The full string has not yet arrived.
+//! //
+//! // We reserve more space in the buffer. This is not strictly
+//! // necessary, but is a good idea performance-wise.
+//! src.reserve(4 + length - src.len());
+//!
+//! // We inform the Framed that we need more bytes to form the next
+//! // frame.
+//! return Ok(None);
+//! }
+//!
+//! // Use advance to modify src such that it no longer contains
+//! // this frame.
+//! let data = src[4..4 + length].to_vec();
+//! src.advance(4 + length);
+//!
+//! // Convert the data to a string, or fail if it is not valid utf-8.
+//! match String::from_utf8(data) {
+//! Ok(string) => Ok(Some(string)),
+//! Err(utf8_error) => {
+//! Err(std::io::Error::new(
+//! std::io::ErrorKind::InvalidData,
+//! utf8_error.utf8_error(),
+//! ))
+//! },
+//! }
+//! }
+//! }
+//! ```
+//!
+//! # The Encoder trait
+//!
+//! An [`Encoder`] is used together with [`FramedWrite`] or [`Framed`] to turn
+//! an [`AsyncWrite`] into a [`Sink`]. The job of the encoder trait is to
+//! specify how frames are turned into a sequences of bytes. The job of the
+//! `FramedWrite` is to take the resulting sequence of bytes and write it to the
+//! IO resource.
+//!
+//! The main method on the `Encoder` trait is the [`encode`] method. This method
+//! takes an item that is being written, and a buffer to write the item to. The
+//! buffer may already contain data, and in this case, the encoder should append
+//! the new frame the to buffer rather than overwrite the existing data.
+//!
+//! It is guaranteed that, from one call to `encode` to another, the provided
+//! buffer will contain the exact same data as before, except that some of the
+//! data may have been removed from the front of the buffer. Writing to a
+//! `FramedWrite` is essentially equivalent to the following loop:
+//!
+//! ```no_run
+//! use tokio::io::AsyncWriteExt;
+//! use bytes::Buf; // for advance
+//! # use tokio_util::codec::Encoder;
+//! # async fn next_frame() -> bytes::Bytes { bytes::Bytes::new() }
+//! # async fn no_more_frames() { }
+//! # #[tokio::main] async fn main() -> std::io::Result<()> {
+//! # let mut io_resource = tokio::io::sink();
+//! # let mut encoder = tokio_util::codec::BytesCodec::new();
+//!
+//! const MAX: usize = 8192;
+//!
+//! let mut buf = bytes::BytesMut::new();
+//! loop {
+//! tokio::select! {
+//! num_written = io_resource.write(&buf), if !buf.is_empty() => {
+//! buf.advance(num_written?);
+//! },
+//! frame = next_frame(), if buf.len() < MAX => {
+//! encoder.encode(frame, &mut buf)?;
+//! },
+//! _ = no_more_frames() => {
+//! io_resource.write_all(&buf).await?;
+//! io_resource.shutdown().await?;
+//! return Ok(());
+//! },
+//! }
+//! }
+//! # }
+//! ```
+//! Here the `next_frame` method corresponds to any frames you write to the
+//! `FramedWrite`. The `no_more_frames` method corresponds to closing the
+//! `FramedWrite` with [`SinkExt::close`].
+//!
+//! ## Example encoder
+//!
+//! As an example, consider a protocol that can be used to send strings where
+//! each frame is a four byte integer that contains the length of the frame,
+//! followed by that many bytes of string data. The encoder will fail if the
+//! string is too long.
+//!
+//! Such an encoder can be written like this:
+//! ```
+//! use tokio_util::codec::Encoder;
+//! use bytes::BytesMut;
+//!
+//! struct MyStringEncoder {}
+//!
+//! const MAX: usize = 8 * 1024 * 1024;
+//!
+//! impl Encoder<String> for MyStringEncoder {
+//! type Error = std::io::Error;
+//!
+//! fn encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error> {
+//! // Don't send a string if it is longer than the other end will
+//! // accept.
+//! if item.len() > MAX {
+//! return Err(std::io::Error::new(
+//! std::io::ErrorKind::InvalidData,
+//! format!("Frame of length {} is too large.", item.len())
+//! ));
+//! }
+//!
+//! // Convert the length into a byte array.
+//! // The cast to u32 cannot overflow due to the length check above.
+//! let len_slice = u32::to_le_bytes(item.len() as u32);
+//!
+//! // Reserve space in the buffer.
+//! dst.reserve(4 + item.len());
+//!
+//! // Write the length and string to the buffer.
+//! dst.extend_from_slice(&len_slice);
+//! dst.extend_from_slice(item.as_bytes());
+//! Ok(())
+//! }
+//! }
+//! ```
+//!
+//! [`AsyncRead`]: tokio::io::AsyncRead
+//! [`AsyncWrite`]: tokio::io::AsyncWrite
+//! [`Stream`]: futures_core::Stream
+//! [`Sink`]: futures_sink::Sink
+//! [`SinkExt::close`]: https://docs.rs/futures/0.3/futures/sink/trait.SinkExt.html#method.close
+//! [`FramedRead`]: struct@crate::codec::FramedRead
+//! [`FramedWrite`]: struct@crate::codec::FramedWrite
+//! [`Framed`]: struct@crate::codec::Framed
+//! [`Decoder`]: trait@crate::codec::Decoder
+//! [`decode`]: fn@crate::codec::Decoder::decode
+//! [`encode`]: fn@crate::codec::Encoder::encode
+//! [`split_to`]: fn@bytes::BytesMut::split_to
+//! [`advance`]: fn@bytes::Buf::advance
+
+mod bytes_codec;
+pub use self::bytes_codec::BytesCodec;
+
+mod decoder;
+pub use self::decoder::Decoder;
+
+mod encoder;
+pub use self::encoder::Encoder;
+
+mod framed_impl;
+#[allow(unused_imports)]
+pub(crate) use self::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame};
+
+mod framed;
+pub use self::framed::{Framed, FramedParts};
+
+mod framed_read;
+pub use self::framed_read::FramedRead;
+
+mod framed_write;
+pub use self::framed_write::FramedWrite;
+
+pub mod length_delimited;
+pub use self::length_delimited::{LengthDelimitedCodec, LengthDelimitedCodecError};
+
+mod lines_codec;
+pub use self::lines_codec::{LinesCodec, LinesCodecError};
+
+mod any_delimiter_codec;
+pub use self::any_delimiter_codec::{AnyDelimiterCodec, AnyDelimiterCodecError};