diff options
Diffstat (limited to 'third_party/rust/tokio-util/src/codec/mod.rs')
-rw-r--r-- | third_party/rust/tokio-util/src/codec/mod.rs | 290 |
1 files changed, 290 insertions, 0 deletions
diff --git a/third_party/rust/tokio-util/src/codec/mod.rs b/third_party/rust/tokio-util/src/codec/mod.rs new file mode 100644 index 0000000000..2295176bdc --- /dev/null +++ b/third_party/rust/tokio-util/src/codec/mod.rs @@ -0,0 +1,290 @@ +//! Adaptors from AsyncRead/AsyncWrite to Stream/Sink +//! +//! Raw I/O objects work with byte sequences, but higher-level code usually +//! wants to batch these into meaningful chunks, called "frames". +//! +//! This module contains adapters to go from streams of bytes, [`AsyncRead`] and +//! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`]. +//! Framed streams are also known as transports. +//! +//! # The Decoder trait +//! +//! A [`Decoder`] is used together with [`FramedRead`] or [`Framed`] to turn an +//! [`AsyncRead`] into a [`Stream`]. The job of the decoder trait is to specify +//! how sequences of bytes are turned into a sequence of frames, and to +//! determine where the boundaries between frames are. The job of the +//! `FramedRead` is to repeatedly switch between reading more data from the IO +//! resource, and asking the decoder whether we have received enough data to +//! decode another frame of data. +//! +//! The main method on the `Decoder` trait is the [`decode`] method. This method +//! takes as argument the data that has been read so far, and when it is called, +//! it will be in one of the following situations: +//! +//! 1. The buffer contains less than a full frame. +//! 2. The buffer contains exactly a full frame. +//! 3. The buffer contains more than a full frame. +//! +//! In the first situation, the decoder should return `Ok(None)`. +//! +//! In the second situation, the decoder should clear the provided buffer and +//! return `Ok(Some(the_decoded_frame))`. +//! +//! In the third situation, the decoder should use a method such as [`split_to`] +//! or [`advance`] to modify the buffer such that the frame is removed from the +//! 
buffer, but any data in the buffer after that frame should still remain in +//! the buffer. The decoder should also return `Ok(Some(the_decoded_frame))` in +//! this case. +//! +//! Finally the decoder may return an error if the data is invalid in some way. +//! The decoder should _not_ return an error just because it has yet to receive +//! a full frame. +//! +//! It is guaranteed that, from one call to `decode` to another, the provided +//! buffer will contain the exact same data as before, except that if more data +//! has arrived through the IO resource, that data will have been appended to +//! the buffer. This means that reading frames from a `FramedRead` is +//! essentially equivalent to the following loop: +//! +//! ```no_run +//! use tokio::io::AsyncReadExt; +//! # // This uses async_stream to create an example that compiles. +//! # fn foo() -> impl futures_core::Stream<Item = std::io::Result<bytes::BytesMut>> { async_stream::try_stream! { +//! # use tokio_util::codec::Decoder; +//! # let mut decoder = tokio_util::codec::BytesCodec::new(); +//! # let io_resource = &mut &[0u8, 1, 2, 3][..]; +//! +//! let mut buf = bytes::BytesMut::new(); +//! loop { +//! // The read_buf call will append to buf rather than overwrite existing data. +//! let len = io_resource.read_buf(&mut buf).await?; +//! +//! if len == 0 { +//! while let Some(frame) = decoder.decode_eof(&mut buf)? { +//! yield frame; +//! } +//! break; +//! } +//! +//! while let Some(frame) = decoder.decode(&mut buf)? { +//! yield frame; +//! } +//! } +//! # }} +//! ``` +//! The example above uses `yield` whenever the `Stream` produces an item. +//! +//! ## Example decoder +//! +//! As an example, consider a protocol that can be used to send strings where +//! each frame is a four byte integer that contains the length of the frame, +//! followed by that many bytes of string data. The decoder fails with an error +//! if the string data is not valid utf-8 or too long. +//! +//! 
Such a decoder can be written like this: +//! ``` +//! use tokio_util::codec::Decoder; +//! use bytes::{BytesMut, Buf}; +//! +//! struct MyStringDecoder {} +//! +//! const MAX: usize = 8 * 1024 * 1024; +//! +//! impl Decoder for MyStringDecoder { +//! type Item = String; +//! type Error = std::io::Error; +//! +//! fn decode( +//! &mut self, +//! src: &mut BytesMut +//! ) -> Result<Option<Self::Item>, Self::Error> { +//! if src.len() < 4 { +//! // Not enough data to read length marker. +//! return Ok(None); +//! } +//! +//! // Read length marker. +//! let mut length_bytes = [0u8; 4]; +//! length_bytes.copy_from_slice(&src[..4]); +//! let length = u32::from_le_bytes(length_bytes) as usize; +//! +//! // Check that the length is not too large to avoid a denial of +//! // service attack where the server runs out of memory. +//! if length > MAX { +//! return Err(std::io::Error::new( +//! std::io::ErrorKind::InvalidData, +//! format!("Frame of length {} is too large.", length) +//! )); +//! } +//! +//! if src.len() < 4 + length { +//! // The full string has not yet arrived. +//! // +//! // We reserve more space in the buffer. This is not strictly +//! // necessary, but is a good idea performance-wise. +//! src.reserve(4 + length - src.len()); +//! +//! // We inform the Framed that we need more bytes to form the next +//! // frame. +//! return Ok(None); +//! } +//! +//! // Use advance to modify src such that it no longer contains +//! // this frame. +//! let data = src[4..4 + length].to_vec(); +//! src.advance(4 + length); +//! +//! // Convert the data to a string, or fail if it is not valid utf-8. +//! match String::from_utf8(data) { +//! Ok(string) => Ok(Some(string)), +//! Err(utf8_error) => { +//! Err(std::io::Error::new( +//! std::io::ErrorKind::InvalidData, +//! utf8_error.utf8_error(), +//! )) +//! }, +//! } +//! } +//! } +//! ``` +//! +//! # The Encoder trait +//! +//! An [`Encoder`] is used together with [`FramedWrite`] or [`Framed`] to turn +//! 
an [`AsyncWrite`] into a [`Sink`]. The job of the encoder trait is to +//! specify how frames are turned into a sequence of bytes. The job of the +//! `FramedWrite` is to take the resulting sequence of bytes and write it to the +//! IO resource. +//! +//! The main method on the `Encoder` trait is the [`encode`] method. This method +//! takes an item that is being written, and a buffer to write the item to. The +//! buffer may already contain data, and in this case, the encoder should append +//! the new frame to the buffer rather than overwrite the existing data. +//! +//! It is guaranteed that, from one call to `encode` to another, the provided +//! buffer will contain the exact same data as before, except that some of the +//! data may have been removed from the front of the buffer. Writing to a +//! `FramedWrite` is essentially equivalent to the following loop: +//! +//! ```no_run +//! use tokio::io::AsyncWriteExt; +//! use bytes::Buf; // for advance +//! # use tokio_util::codec::Encoder; +//! # async fn next_frame() -> bytes::Bytes { bytes::Bytes::new() } +//! # async fn no_more_frames() { } +//! # #[tokio::main] async fn main() -> std::io::Result<()> { +//! # let mut io_resource = tokio::io::sink(); +//! # let mut encoder = tokio_util::codec::BytesCodec::new(); +//! +//! const MAX: usize = 8192; +//! +//! let mut buf = bytes::BytesMut::new(); +//! loop { +//! tokio::select! { +//! num_written = io_resource.write(&buf), if !buf.is_empty() => { +//! buf.advance(num_written?); +//! }, +//! frame = next_frame(), if buf.len() < MAX => { +//! encoder.encode(frame, &mut buf)?; +//! }, +//! _ = no_more_frames() => { +//! io_resource.write_all(&buf).await?; +//! io_resource.shutdown().await?; +//! return Ok(()); +//! }, +//! } +//! } +//! # } +//! ``` +//! Here the `next_frame` method corresponds to any frames you write to the +//! `FramedWrite`. The `no_more_frames` method corresponds to closing the +//! `FramedWrite` with [`SinkExt::close`]. +//! +//! 
## Example encoder +//! +//! As an example, consider a protocol that can be used to send strings where +//! each frame is a four byte integer that contains the length of the frame, +//! followed by that many bytes of string data. The encoder will fail if the +//! string is too long. +//! +//! Such an encoder can be written like this: +//! ``` +//! use tokio_util::codec::Encoder; +//! use bytes::BytesMut; +//! +//! struct MyStringEncoder {} +//! +//! const MAX: usize = 8 * 1024 * 1024; +//! +//! impl Encoder<String> for MyStringEncoder { +//! type Error = std::io::Error; +//! +//! fn encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error> { +//! // Don't send a string if it is longer than the other end will +//! // accept. +//! if item.len() > MAX { +//! return Err(std::io::Error::new( +//! std::io::ErrorKind::InvalidData, +//! format!("Frame of length {} is too large.", item.len()) +//! )); +//! } +//! +//! // Convert the length into a byte array. +//! // The cast to u32 cannot overflow due to the length check above. +//! let len_slice = u32::to_le_bytes(item.len() as u32); +//! +//! // Reserve space in the buffer. +//! dst.reserve(4 + item.len()); +//! +//! // Write the length and string to the buffer. +//! dst.extend_from_slice(&len_slice); +//! dst.extend_from_slice(item.as_bytes()); +//! Ok(()) +//! } +//! } +//! ``` +//! +//! [`AsyncRead`]: tokio::io::AsyncRead +//! [`AsyncWrite`]: tokio::io::AsyncWrite +//! [`Stream`]: futures_core::Stream +//! [`Sink`]: futures_sink::Sink +//! [`SinkExt::close`]: https://docs.rs/futures/0.3/futures/sink/trait.SinkExt.html#method.close +//! [`FramedRead`]: struct@crate::codec::FramedRead +//! [`FramedWrite`]: struct@crate::codec::FramedWrite +//! [`Framed`]: struct@crate::codec::Framed +//! [`Decoder`]: trait@crate::codec::Decoder +//! [`decode`]: fn@crate::codec::Decoder::decode +//! [`encode`]: fn@crate::codec::Encoder::encode +//! [`split_to`]: fn@bytes::BytesMut::split_to +//! 
[`advance`]: fn@bytes::Buf::advance + +mod bytes_codec; +pub use self::bytes_codec::BytesCodec; + +mod decoder; +pub use self::decoder::Decoder; + +mod encoder; +pub use self::encoder::Encoder; + +mod framed_impl; +#[allow(unused_imports)] +pub(crate) use self::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame}; + +mod framed; +pub use self::framed::{Framed, FramedParts}; + +mod framed_read; +pub use self::framed_read::FramedRead; + +mod framed_write; +pub use self::framed_write::FramedWrite; + +pub mod length_delimited; +pub use self::length_delimited::{LengthDelimitedCodec, LengthDelimitedCodecError}; + +mod lines_codec; +pub use self::lines_codec::{LinesCodec, LinesCodecError}; + +mod any_delimiter_codec; +pub use self::any_delimiter_codec::{AnyDelimiterCodec, AnyDelimiterCodecError}; |