#![allow(deprecated)] use {codec, AsyncRead, AsyncWrite}; use bytes::{Buf, BufMut, BytesMut, IntoBuf}; use bytes::buf::Chain; use futures::{Async, AsyncSink, Stream, Sink, StartSend, Poll}; use std::{cmp, fmt}; use std::error::Error as StdError; use std::io::{self, Cursor}; /// Configure length delimited `FramedRead`, `FramedWrite`, and `Framed` values. /// /// `Builder` enables constructing configured length delimited framers. Note /// that not all configuration settings apply to both encoding and decoding. See /// the documentation for specific methods for more detail. #[derive(Debug, Clone, Copy)] pub struct Builder { // Maximum frame length max_frame_len: usize, // Number of bytes representing the field length length_field_len: usize, // Number of bytes in the header before the length field length_field_offset: usize, // Adjust the length specified in the header field by this amount length_adjustment: isize, // Total number of bytes to skip before reading the payload, if not set, // `length_field_len + length_field_offset` num_skip: Option, // Length field byte order (little or big endian) length_field_is_big_endian: bool, } /// Adapts a byte stream into a unified `Stream` and `Sink` that works over /// entire frame values. /// /// See [module level] documentation for more detail. /// /// [module level]: index.html pub struct Framed { inner: FramedRead>, } /// Adapts a byte stream to a `Stream` yielding entire frame values. /// /// See [module level] documentation for more detail. /// /// [module level]: index.html #[derive(Debug)] pub struct FramedRead { inner: codec::FramedRead, } /// An error when the number of bytes read is more than max frame length. pub struct FrameTooBig { _priv: (), } #[derive(Debug)] struct Decoder { // Configuration values builder: Builder, // Read state state: DecodeState, } #[derive(Debug, Clone, Copy)] enum DecodeState { Head, Data(usize), } /// Adapts a byte stream to a `Sink` accepting entire frame values. /// /// See [module level] documentation for more detail. /// /// [module level]: index.html pub struct FramedWrite { // I/O type inner: T, // Configuration values builder: Builder, // Current frame being written frame: Option, B::Buf>>, } // ===== impl Framed ===== impl Framed { /// Creates a new `Framed` with default configuration values. pub fn new(inner: T) -> Framed { Builder::new().new_framed(inner) } } impl Framed { /// Returns a reference to the underlying I/O stream wrapped by `Framed`. /// /// Note that care should be taken to not tamper with the underlying stream /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_ref(&self) -> &T { self.inner.get_ref().get_ref() } /// Returns a mutable reference to the underlying I/O stream wrapped by /// `Framed`. /// /// Note that care should be taken to not tamper with the underlying stream /// of data coming in as it may corrupt the stream of frames otherwise being /// worked with. pub fn get_mut(&mut self) -> &mut T { self.inner.get_mut().get_mut() } /// Consumes the `Framed`, returning its underlying I/O stream. /// /// Note that care should be taken to not tamper with the underlying stream /// of data coming in as it may corrupt the stream of frames otherwise being /// worked with. pub fn into_inner(self) -> T { self.inner.into_inner().into_inner() } } impl Stream for Framed { type Item = BytesMut; type Error = io::Error; fn poll(&mut self) -> Poll, io::Error> { self.inner.poll() } } impl Sink for Framed { type SinkItem = B; type SinkError = io::Error; fn start_send(&mut self, item: B) -> StartSend { self.inner.start_send(item) } fn poll_complete(&mut self) -> Poll<(), io::Error> { self.inner.poll_complete() } fn close(&mut self) -> Poll<(), io::Error> { self.inner.close() } } impl fmt::Debug for Framed where T: fmt::Debug, B::Buf: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Framed") .field("inner", &self.inner) .finish() } } // ===== impl FramedRead ===== impl FramedRead { /// Creates a new `FramedRead` with default configuration values. pub fn new(inner: T) -> FramedRead { Builder::new().new_read(inner) } } impl FramedRead { /// Returns the current max frame setting /// /// This is the largest size this codec will accept from the wire. Larger /// frames will be rejected. pub fn max_frame_length(&self) -> usize { self.inner.decoder().builder.max_frame_len } /// Updates the max frame setting. /// /// The change takes effect the next time a frame is decoded. In other /// words, if a frame is currently in process of being decoded with a frame /// size greater than `val` but less than the max frame length in effect /// before calling this function, then the frame will be allowed. pub fn set_max_frame_length(&mut self, val: usize) { self.inner.decoder_mut().builder.max_frame_length(val); } /// Returns a reference to the underlying I/O stream wrapped by `FramedRead`. /// /// Note that care should be taken to not tamper with the underlying stream /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_ref(&self) -> &T { self.inner.get_ref() } /// Returns a mutable reference to the underlying I/O stream wrapped by /// `FramedRead`. /// /// Note that care should be taken to not tamper with the underlying stream /// of data coming in as it may corrupt the stream of frames otherwise being /// worked with. pub fn get_mut(&mut self) -> &mut T { self.inner.get_mut() } /// Consumes the `FramedRead`, returning its underlying I/O stream. /// /// Note that care should be taken to not tamper with the underlying stream /// of data coming in as it may corrupt the stream of frames otherwise being /// worked with. pub fn into_inner(self) -> T { self.inner.into_inner() } } impl Stream for FramedRead { type Item = BytesMut; type Error = io::Error; fn poll(&mut self) -> Poll, io::Error> { self.inner.poll() } } impl Sink for FramedRead { type SinkItem = T::SinkItem; type SinkError = T::SinkError; fn start_send(&mut self, item: T::SinkItem) -> StartSend { self.inner.start_send(item) } fn poll_complete(&mut self) -> Poll<(), T::SinkError> { self.inner.poll_complete() } fn close(&mut self) -> Poll<(), T::SinkError> { self.inner.close() } } impl io::Write for FramedRead { fn write(&mut self, src: &[u8]) -> io::Result { self.inner.get_mut().write(src) } fn flush(&mut self) -> io::Result<()> { self.inner.get_mut().flush() } } impl AsyncWrite for FramedRead { fn shutdown(&mut self) -> Poll<(), io::Error> { self.inner.get_mut().shutdown() } fn write_buf(&mut self, buf: &mut B) -> Poll { self.inner.get_mut().write_buf(buf) } } // ===== impl Decoder ====== impl Decoder { fn decode_head(&mut self, src: &mut BytesMut) -> io::Result> { let head_len = self.builder.num_head_bytes(); let field_len = self.builder.length_field_len; if src.len() < head_len { // Not enough data return Ok(None); } let n = { let mut src = Cursor::new(&mut *src); // Skip the required bytes src.advance(self.builder.length_field_offset); // match endianess let n = if self.builder.length_field_is_big_endian { src.get_uint_be(field_len) } else { src.get_uint_le(field_len) }; if n > self.builder.max_frame_len as u64 { return Err(io::Error::new(io::ErrorKind::InvalidData, FrameTooBig { _priv: (), })); } // The check above ensures there is no overflow let n = n as usize; // Adjust `n` with bounds checking let n = if self.builder.length_adjustment < 0 { n.checked_sub(-self.builder.length_adjustment as usize) } else { n.checked_add(self.builder.length_adjustment as usize) }; // Error handling match n { Some(n) => n, None => return Err(io::Error::new(io::ErrorKind::InvalidInput, "provided length would overflow after adjustment")), } }; let num_skip = self.builder.get_num_skip(); if num_skip > 0 { let _ = src.split_to(num_skip); } // Ensure that the buffer has enough space to read the incoming // payload src.reserve(n); return Ok(Some(n)); } fn decode_data(&self, n: usize, src: &mut BytesMut) -> io::Result> { // At this point, the buffer has already had the required capacity // reserved. All there is to do is read. if src.len() < n { return Ok(None); } Ok(Some(src.split_to(n))) } } impl codec::Decoder for Decoder { type Item = BytesMut; type Error = io::Error; fn decode(&mut self, src: &mut BytesMut) -> io::Result> { let n = match self.state { DecodeState::Head => { match try!(self.decode_head(src)) { Some(n) => { self.state = DecodeState::Data(n); n } None => return Ok(None), } } DecodeState::Data(n) => n, }; match try!(self.decode_data(n, src)) { Some(data) => { // Update the decode state self.state = DecodeState::Head; // Make sure the buffer has enough space to read the next head src.reserve(self.builder.num_head_bytes()); Ok(Some(data)) } None => Ok(None), } } } // ===== impl FramedWrite ===== impl FramedWrite { /// Creates a new `FramedWrite` with default configuration values. pub fn new(inner: T) -> FramedWrite { Builder::new().new_write(inner) } } impl FramedWrite { /// Returns the current max frame setting /// /// This is the largest size this codec will write to the wire. Larger /// frames will be rejected. pub fn max_frame_length(&self) -> usize { self.builder.max_frame_len } /// Updates the max frame setting. /// /// The change takes effect the next time a frame is encoded. In other /// words, if a frame is currently in process of being encoded with a frame /// size greater than `val` but less than the max frame length in effect /// before calling this function, then the frame will be allowed. pub fn set_max_frame_length(&mut self, val: usize) { self.builder.max_frame_length(val); } /// Returns a reference to the underlying I/O stream wrapped by /// `FramedWrite`. /// /// Note that care should be taken to not tamper with the underlying stream /// of data coming in as it may corrupt the stream of frames otherwise /// being worked with. pub fn get_ref(&self) -> &T { &self.inner } /// Returns a mutable reference to the underlying I/O stream wrapped by /// `FramedWrite`. /// /// Note that care should be taken to not tamper with the underlying stream /// of data coming in as it may corrupt the stream of frames otherwise being /// worked with. pub fn get_mut(&mut self) -> &mut T { &mut self.inner } /// Consumes the `FramedWrite`, returning its underlying I/O stream. /// /// Note that care should be taken to not tamper with the underlying stream /// of data coming in as it may corrupt the stream of frames otherwise being /// worked with. pub fn into_inner(self) -> T { self.inner } } impl FramedWrite { // If there is a buffered frame, try to write it to `T` fn do_write(&mut self) -> Poll<(), io::Error> { if self.frame.is_none() { return Ok(Async::Ready(())); } loop { let frame = self.frame.as_mut().unwrap(); try_ready!(self.inner.write_buf(frame)); if !frame.has_remaining() { break; } } self.frame = None; Ok(Async::Ready(())) } fn set_frame(&mut self, buf: B::Buf) -> io::Result<()> { let mut head = BytesMut::with_capacity(8); let n = buf.remaining(); if n > self.builder.max_frame_len { return Err(io::Error::new(io::ErrorKind::InvalidInput, FrameTooBig { _priv: (), })); } // Adjust `n` with bounds checking let n = if self.builder.length_adjustment < 0 { n.checked_add(-self.builder.length_adjustment as usize) } else { n.checked_sub(self.builder.length_adjustment as usize) }; // Error handling let n = match n { Some(n) => n, None => return Err(io::Error::new(io::ErrorKind::InvalidInput, "provided length would overflow after adjustment")), }; if self.builder.length_field_is_big_endian { head.put_uint_be(n as u64, self.builder.length_field_len); } else { head.put_uint_le(n as u64, self.builder.length_field_len); } debug_assert!(self.frame.is_none()); self.frame = Some(head.into_buf().chain(buf)); Ok(()) } } impl Sink for FramedWrite { type SinkItem = B; type SinkError = io::Error; fn start_send(&mut self, item: B) -> StartSend { if !try!(self.do_write()).is_ready() { return Ok(AsyncSink::NotReady(item)); } try!(self.set_frame(item.into_buf())); Ok(AsyncSink::Ready) } fn poll_complete(&mut self) -> Poll<(), io::Error> { // Write any buffered frame to T try_ready!(self.do_write()); // Try flushing the underlying IO try_ready!(self.inner.poll_flush()); return Ok(Async::Ready(())); } fn close(&mut self) -> Poll<(), io::Error> { try_ready!(self.poll_complete()); self.inner.shutdown() } } impl Stream for FramedWrite { type Item = T::Item; type Error = T::Error; fn poll(&mut self) -> Poll, T::Error> { self.inner.poll() } } impl io::Read for FramedWrite { fn read(&mut self, dst: &mut [u8]) -> io::Result { self.get_mut().read(dst) } } impl AsyncRead for FramedWrite { fn read_buf(&mut self, buf: &mut B) -> Poll { self.get_mut().read_buf(buf) } unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { self.get_ref().prepare_uninitialized_buffer(buf) } } impl fmt::Debug for FramedWrite where T: fmt::Debug, B::Buf: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("FramedWrite") .field("inner", &self.inner) .field("builder", &self.builder) .field("frame", &self.frame) .finish() } } // ===== impl Builder ===== impl Builder { /// Creates a new length delimited framer builder with default configuration /// values. /// /// # Examples /// /// ``` /// # use tokio_io::AsyncRead; /// use tokio_io::codec::length_delimited::Builder; /// /// # fn bind_read(io: T) { /// Builder::new() /// .length_field_offset(0) /// .length_field_length(2) /// .length_adjustment(0) /// .num_skip(0) /// .new_read(io); /// # } /// ``` pub fn new() -> Builder { Builder { // Default max frame length of 8MB max_frame_len: 8 * 1_024 * 1_024, // Default byte length of 4 length_field_len: 4, // Default to the header field being at the start of the header. length_field_offset: 0, length_adjustment: 0, // Total number of bytes to skip before reading the payload, if not set, // `length_field_len + length_field_offset` num_skip: None, // Default to reading the length field in network (big) endian. length_field_is_big_endian: true, } } /// Read the length field as a big endian integer /// /// This is the default setting. /// /// This configuration option applies to both encoding and decoding. /// /// # Examples /// /// ``` /// # use tokio_io::AsyncRead; /// use tokio_io::codec::length_delimited::Builder; /// /// # fn bind_read(io: T) { /// Builder::new() /// .big_endian() /// .new_read(io); /// # } /// ``` pub fn big_endian(&mut self) -> &mut Self { self.length_field_is_big_endian = true; self } /// Read the length field as a little endian integer /// /// The default setting is big endian. /// /// This configuration option applies to both encoding and decoding. /// /// # Examples /// /// ``` /// # use tokio_io::AsyncRead; /// use tokio_io::codec::length_delimited::Builder; /// /// # fn bind_read(io: T) { /// Builder::new() /// .little_endian() /// .new_read(io); /// # } /// ``` pub fn little_endian(&mut self) -> &mut Self { self.length_field_is_big_endian = false; self } /// Read the length field as a native endian integer /// /// The default setting is big endian. /// /// This configuration option applies to both encoding and decoding. /// /// # Examples /// /// ``` /// # use tokio_io::AsyncRead; /// use tokio_io::codec::length_delimited::Builder; /// /// # fn bind_read(io: T) { /// Builder::new() /// .native_endian() /// .new_read(io); /// # } /// ``` pub fn native_endian(&mut self) -> &mut Self { if cfg!(target_endian = "big") { self.big_endian() } else { self.little_endian() } } /// Sets the max frame length /// /// This configuration option applies to both encoding and decoding. The /// default value is 8MB. /// /// When decoding, the length field read from the byte stream is checked /// against this setting **before** any adjustments are applied. When /// encoding, the length of the submitted payload is checked against this /// setting. /// /// When frames exceed the max length, an `io::Error` with the custom value /// of the `FrameTooBig` type will be returned. /// /// # Examples /// /// ``` /// # use tokio_io::AsyncRead; /// use tokio_io::codec::length_delimited::Builder; /// /// # fn bind_read(io: T) { /// Builder::new() /// .max_frame_length(8 * 1024) /// .new_read(io); /// # } /// ``` pub fn max_frame_length(&mut self, val: usize) -> &mut Self { self.max_frame_len = val; self } /// Sets the number of bytes used to represent the length field /// /// The default value is `4`. The max value is `8`. /// /// This configuration option applies to both encoding and decoding. /// /// # Examples /// /// ``` /// # use tokio_io::AsyncRead; /// use tokio_io::codec::length_delimited::Builder; /// /// # fn bind_read(io: T) { /// Builder::new() /// .length_field_length(4) /// .new_read(io); /// # } /// ``` pub fn length_field_length(&mut self, val: usize) -> &mut Self { assert!(val > 0 && val <= 8, "invalid length field length"); self.length_field_len = val; self } /// Sets the number of bytes in the header before the length field /// /// This configuration option only applies to decoding. /// /// # Examples /// /// ``` /// # use tokio_io::AsyncRead; /// use tokio_io::codec::length_delimited::Builder; /// /// # fn bind_read(io: T) { /// Builder::new() /// .length_field_offset(1) /// .new_read(io); /// # } /// ``` pub fn length_field_offset(&mut self, val: usize) -> &mut Self { self.length_field_offset = val; self } /// Delta between the payload length specified in the header and the real /// payload length /// /// # Examples /// /// ``` /// # use tokio_io::AsyncRead; /// use tokio_io::codec::length_delimited::Builder; /// /// # fn bind_read(io: T) { /// Builder::new() /// .length_adjustment(-2) /// .new_read(io); /// # } /// ``` pub fn length_adjustment(&mut self, val: isize) -> &mut Self { self.length_adjustment = val; self } /// Sets the number of bytes to skip before reading the payload /// /// Default value is `length_field_len + length_field_offset` /// /// This configuration option only applies to decoding /// /// # Examples /// /// ``` /// # use tokio_io::AsyncRead; /// use tokio_io::codec::length_delimited::Builder; /// /// # fn bind_read(io: T) { /// Builder::new() /// .num_skip(4) /// .new_read(io); /// # } /// ``` pub fn num_skip(&mut self, val: usize) -> &mut Self { self.num_skip = Some(val); self } /// Create a configured length delimited `FramedRead` /// /// # Examples /// /// ``` /// # use tokio_io::AsyncRead; /// use tokio_io::codec::length_delimited::Builder; /// /// # fn bind_read(io: T) { /// Builder::new() /// .length_field_offset(0) /// .length_field_length(2) /// .length_adjustment(0) /// .num_skip(0) /// .new_read(io); /// # } /// ``` pub fn new_read(&self, upstream: T) -> FramedRead where T: AsyncRead, { FramedRead { inner: codec::FramedRead::new(upstream, Decoder { builder: *self, state: DecodeState::Head, }), } } /// Create a configured length delimited `FramedWrite` /// /// # Examples /// /// ``` /// # extern crate tokio_io; /// # extern crate bytes; /// # use tokio_io::AsyncWrite; /// # use tokio_io::codec::length_delimited; /// # use bytes::BytesMut; /// # fn write_frame(io: T) { /// # let _: length_delimited::FramedWrite = /// length_delimited::Builder::new() /// .length_field_length(2) /// .new_write(io); /// # } /// # pub fn main() {} /// ``` pub fn new_write(&self, inner: T) -> FramedWrite where T: AsyncWrite, B: IntoBuf, { FramedWrite { inner: inner, builder: *self, frame: None, } } /// Create a configured length delimited `Framed` /// /// # Examples /// /// ``` /// # extern crate tokio_io; /// # extern crate bytes; /// # use tokio_io::{AsyncRead, AsyncWrite}; /// # use tokio_io::codec::length_delimited; /// # use bytes::BytesMut; /// # fn write_frame(io: T) { /// # let _: length_delimited::Framed = /// length_delimited::Builder::new() /// .length_field_length(2) /// .new_framed(io); /// # } /// # pub fn main() {} /// ``` pub fn new_framed(&self, inner: T) -> Framed where T: AsyncRead + AsyncWrite, B: IntoBuf { let inner = self.new_read(self.new_write(inner)); Framed { inner: inner } } fn num_head_bytes(&self) -> usize { let num = self.length_field_offset + self.length_field_len; cmp::max(num, self.num_skip.unwrap_or(0)) } fn get_num_skip(&self) -> usize { self.num_skip.unwrap_or(self.length_field_offset + self.length_field_len) } } // ===== impl FrameTooBig ===== impl fmt::Debug for FrameTooBig { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("FrameTooBig") .finish() } } impl fmt::Display for FrameTooBig { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.write_str(self.description()) } } impl StdError for FrameTooBig { fn description(&self) -> &str { "frame size too big" } }