//! Adaptors from AsyncRead/AsyncWrite to Stream/Sink //! //! Raw I/O objects work with byte sequences, but higher-level code usually //! wants to batch these into meaningful chunks, called "frames". //! //! This module contains adapters to go from streams of bytes, [`AsyncRead`] and //! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`]. //! Framed streams are also known as transports. //! //! # The Decoder trait //! //! A [`Decoder`] is used together with [`FramedRead`] or [`Framed`] to turn an //! [`AsyncRead`] into a [`Stream`]. The job of the decoder trait is to specify //! how sequences of bytes are turned into a sequence of frames, and to //! determine where the boundaries between frames are. The job of the //! `FramedRead` is to repeatedly switch between reading more data from the IO //! resource, and asking the decoder whether we have received enough data to //! decode another frame of data. //! //! The main method on the `Decoder` trait is the [`decode`] method. This method //! takes as argument the data that has been read so far, and when it is called, //! it will be in one of the following situations: //! //! 1. The buffer contains less than a full frame. //! 2. The buffer contains exactly a full frame. //! 3. The buffer contains more than a full frame. //! //! In the first situation, the decoder should return `Ok(None)`. //! //! In the second situation, the decoder should clear the provided buffer and //! return `Ok(Some(the_decoded_frame))`. //! //! In the third situation, the decoder should use a method such as [`split_to`] //! or [`advance`] to modify the buffer such that the frame is removed from the //! buffer, but any data in the buffer after that frame should still remain in //! the buffer. The decoder should also return `Ok(Some(the_decoded_frame))` in //! this case. //! //! Finally the decoder may return an error if the data is invalid in some way. //! The decoder should _not_ return an error just because it has yet to receive //! a full frame. //! //! It is guaranteed that, from one call to `decode` to another, the provided //! buffer will contain the exact same data as before, except that if more data //! has arrived through the IO resource, that data will have been appended to //! the buffer. This means that reading frames from a `FramedRead` is //! essentially equivalent to the following loop: //! //! ```no_run //! use tokio::io::AsyncReadExt; //! # // This uses async_stream to create an example that compiles. //! # fn foo() -> impl futures_core::Stream> { async_stream::try_stream! { //! # use tokio_util::codec::Decoder; //! # let mut decoder = tokio_util::codec::BytesCodec::new(); //! # let io_resource = &mut &[0u8, 1, 2, 3][..]; //! //! let mut buf = bytes::BytesMut::new(); //! loop { //! // The read_buf call will append to buf rather than overwrite existing data. //! let len = io_resource.read_buf(&mut buf).await?; //! //! if len == 0 { //! while let Some(frame) = decoder.decode_eof(&mut buf)? { //! yield frame; //! } //! break; //! } //! //! while let Some(frame) = decoder.decode(&mut buf)? { //! yield frame; //! } //! } //! # }} //! ``` //! The example above uses `yield` whenever the `Stream` produces an item. //! //! ## Example decoder //! //! As an example, consider a protocol that can be used to send strings where //! each frame is a four byte integer that contains the length of the frame, //! followed by that many bytes of string data. The decoder fails with an error //! if the string data is not valid utf-8 or too long. //! //! Such a decoder can be written like this: //! ``` //! use tokio_util::codec::Decoder; //! use bytes::{BytesMut, Buf}; //! //! struct MyStringDecoder {} //! //! const MAX: usize = 8 * 1024 * 1024; //! //! impl Decoder for MyStringDecoder { //! type Item = String; //! type Error = std::io::Error; //! //! fn decode( //! &mut self, //! src: &mut BytesMut //! ) -> Result, Self::Error> { //! if src.len() < 4 { //! // Not enough data to read length marker. //! return Ok(None); //! } //! //! // Read length marker. //! let mut length_bytes = [0u8; 4]; //! length_bytes.copy_from_slice(&src[..4]); //! let length = u32::from_le_bytes(length_bytes) as usize; //! //! // Check that the length is not too large to avoid a denial of //! // service attack where the server runs out of memory. //! if length > MAX { //! return Err(std::io::Error::new( //! std::io::ErrorKind::InvalidData, //! format!("Frame of length {} is too large.", length) //! )); //! } //! //! if src.len() < 4 + length { //! // The full string has not yet arrived. //! // //! // We reserve more space in the buffer. This is not strictly //! // necessary, but is a good idea performance-wise. //! src.reserve(4 + length - src.len()); //! //! // We inform the Framed that we need more bytes to form the next //! // frame. //! return Ok(None); //! } //! //! // Use advance to modify src such that it no longer contains //! // this frame. //! let data = src[4..4 + length].to_vec(); //! src.advance(4 + length); //! //! // Convert the data to a string, or fail if it is not valid utf-8. //! match String::from_utf8(data) { //! Ok(string) => Ok(Some(string)), //! Err(utf8_error) => { //! Err(std::io::Error::new( //! std::io::ErrorKind::InvalidData, //! utf8_error.utf8_error(), //! )) //! }, //! } //! } //! } //! ``` //! //! # The Encoder trait //! //! An [`Encoder`] is used together with [`FramedWrite`] or [`Framed`] to turn //! an [`AsyncWrite`] into a [`Sink`]. The job of the encoder trait is to //! specify how frames are turned into a sequences of bytes. The job of the //! `FramedWrite` is to take the resulting sequence of bytes and write it to the //! IO resource. //! //! The main method on the `Encoder` trait is the [`encode`] method. This method //! takes an item that is being written, and a buffer to write the item to. The //! buffer may already contain data, and in this case, the encoder should append //! the new frame the to buffer rather than overwrite the existing data. //! //! It is guaranteed that, from one call to `encode` to another, the provided //! buffer will contain the exact same data as before, except that some of the //! data may have been removed from the front of the buffer. Writing to a //! `FramedWrite` is essentially equivalent to the following loop: //! //! ```no_run //! use tokio::io::AsyncWriteExt; //! use bytes::Buf; // for advance //! # use tokio_util::codec::Encoder; //! # async fn next_frame() -> bytes::Bytes { bytes::Bytes::new() } //! # async fn no_more_frames() { } //! # #[tokio::main] async fn main() -> std::io::Result<()> { //! # let mut io_resource = tokio::io::sink(); //! # let mut encoder = tokio_util::codec::BytesCodec::new(); //! //! const MAX: usize = 8192; //! //! let mut buf = bytes::BytesMut::new(); //! loop { //! tokio::select! { //! num_written = io_resource.write(&buf), if !buf.is_empty() => { //! buf.advance(num_written?); //! }, //! frame = next_frame(), if buf.len() < MAX => { //! encoder.encode(frame, &mut buf)?; //! }, //! _ = no_more_frames() => { //! io_resource.write_all(&buf).await?; //! io_resource.shutdown().await?; //! return Ok(()); //! }, //! } //! } //! # } //! ``` //! Here the `next_frame` method corresponds to any frames you write to the //! `FramedWrite`. The `no_more_frames` method corresponds to closing the //! `FramedWrite` with [`SinkExt::close`]. //! //! ## Example encoder //! //! As an example, consider a protocol that can be used to send strings where //! each frame is a four byte integer that contains the length of the frame, //! followed by that many bytes of string data. The encoder will fail if the //! string is too long. //! //! Such an encoder can be written like this: //! ``` //! use tokio_util::codec::Encoder; //! use bytes::BytesMut; //! //! struct MyStringEncoder {} //! //! const MAX: usize = 8 * 1024 * 1024; //! //! impl Encoder for MyStringEncoder { //! type Error = std::io::Error; //! //! fn encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error> { //! // Don't send a string if it is longer than the other end will //! // accept. //! if item.len() > MAX { //! return Err(std::io::Error::new( //! std::io::ErrorKind::InvalidData, //! format!("Frame of length {} is too large.", item.len()) //! )); //! } //! //! // Convert the length into a byte array. //! // The cast to u32 cannot overflow due to the length check above. //! let len_slice = u32::to_le_bytes(item.len() as u32); //! //! // Reserve space in the buffer. //! dst.reserve(4 + item.len()); //! //! // Write the length and string to the buffer. //! dst.extend_from_slice(&len_slice); //! dst.extend_from_slice(item.as_bytes()); //! Ok(()) //! } //! } //! ``` //! //! [`AsyncRead`]: tokio::io::AsyncRead //! [`AsyncWrite`]: tokio::io::AsyncWrite //! [`Stream`]: futures_core::Stream //! [`Sink`]: futures_sink::Sink //! [`SinkExt::close`]: https://docs.rs/futures/0.3/futures/sink/trait.SinkExt.html#method.close //! [`FramedRead`]: struct@crate::codec::FramedRead //! [`FramedWrite`]: struct@crate::codec::FramedWrite //! [`Framed`]: struct@crate::codec::Framed //! [`Decoder`]: trait@crate::codec::Decoder //! [`decode`]: fn@crate::codec::Decoder::decode //! [`encode`]: fn@crate::codec::Encoder::encode //! [`split_to`]: fn@bytes::BytesMut::split_to //! [`advance`]: fn@bytes::Buf::advance mod bytes_codec; pub use self::bytes_codec::BytesCodec; mod decoder; pub use self::decoder::Decoder; mod encoder; pub use self::encoder::Encoder; mod framed_impl; #[allow(unused_imports)] pub(crate) use self::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame}; mod framed; pub use self::framed::{Framed, FramedParts}; mod framed_read; pub use self::framed_read::FramedRead; mod framed_write; pub use self::framed_write::FramedWrite; pub mod length_delimited; pub use self::length_delimited::{LengthDelimitedCodec, LengthDelimitedCodecError}; mod lines_codec; pub use self::lines_codec::{LinesCodec, LinesCodecError}; mod any_delimiter_codec; pub use self::any_delimiter_codec::{AnyDelimiterCodec, AnyDelimiterCodecError};