diff options
Diffstat (limited to 'third_party/rust/tokio-codec/src')
-rw-r--r-- | third_party/rust/tokio-codec/src/bytes_codec.rs | 37 | ||||
-rw-r--r-- | third_party/rust/tokio-codec/src/lib.rs | 32 | ||||
-rw-r--r-- | third_party/rust/tokio-codec/src/lines_codec.rs | 89 |
3 files changed, 158 insertions, 0 deletions
diff --git a/third_party/rust/tokio-codec/src/bytes_codec.rs b/third_party/rust/tokio-codec/src/bytes_codec.rs new file mode 100644 index 0000000000..d535aef689 --- /dev/null +++ b/third_party/rust/tokio-codec/src/bytes_codec.rs @@ -0,0 +1,37 @@ +use bytes::{Bytes, BufMut, BytesMut}; +use tokio_io::_tokio_codec::{Encoder, Decoder}; +use std::io; + +/// A simple `Codec` implementation that just ships bytes around. +#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub struct BytesCodec(()); + +impl BytesCodec { + /// Creates a new `BytesCodec` for shipping around raw bytes. + pub fn new() -> BytesCodec { BytesCodec(()) } +} + +impl Decoder for BytesCodec { + type Item = BytesMut; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> { + if buf.len() > 0 { + let len = buf.len(); + Ok(Some(buf.split_to(len))) + } else { + Ok(None) + } + } +} + +impl Encoder for BytesCodec { + type Item = Bytes; + type Error = io::Error; + + fn encode(&mut self, data: Bytes, buf: &mut BytesMut) -> Result<(), io::Error> { + buf.reserve(data.len()); + buf.put(data); + Ok(()) + } +} diff --git a/third_party/rust/tokio-codec/src/lib.rs b/third_party/rust/tokio-codec/src/lib.rs new file mode 100644 index 0000000000..2b26b542bb --- /dev/null +++ b/third_party/rust/tokio-codec/src/lib.rs @@ -0,0 +1,32 @@ +//! Utilities for encoding and decoding frames. +//! +//! Contains adapters to go from streams of bytes, [`AsyncRead`] and +//! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`]. +//! Framed streams are also known as [transports]. +//! +//! [`AsyncRead`]: # +//! [`AsyncWrite`]: # +//! [`Sink`]: # +//! [`Stream`]: # +//! [transports]: # + +#![deny(missing_docs, missing_debug_implementations, warnings)] +#![doc(html_root_url = "https://docs.rs/tokio-codec/0.1.0")] + +extern crate bytes; +extern crate tokio_io; + +mod bytes_codec; +mod lines_codec; + +pub use tokio_io::_tokio_codec::{ + Decoder, + Encoder, + Framed, + FramedParts, + FramedRead, + FramedWrite, +}; + +pub use bytes_codec::BytesCodec; +pub use lines_codec::LinesCodec; diff --git a/third_party/rust/tokio-codec/src/lines_codec.rs b/third_party/rust/tokio-codec/src/lines_codec.rs new file mode 100644 index 0000000000..bf4135b8e3 --- /dev/null +++ b/third_party/rust/tokio-codec/src/lines_codec.rs @@ -0,0 +1,89 @@ +use bytes::{BufMut, BytesMut}; +use tokio_io::_tokio_codec::{Encoder, Decoder}; +use std::{io, str}; + +/// A simple `Codec` implementation that splits up data into lines. +#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub struct LinesCodec { + // Stored index of the next index to examine for a `\n` character. + // This is used to optimize searching. + // For example, if `decode` was called with `abc`, it would hold `3`, + // because that is the next index to examine. + // The next time `decode` is called with `abcde\n`, the method will + // only look at `de\n` before returning. + next_index: usize, +} + +impl LinesCodec { + /// Returns a `LinesCodec` for splitting up data into lines. + pub fn new() -> LinesCodec { + LinesCodec { next_index: 0 } + } +} + +fn utf8(buf: &[u8]) -> Result<&str, io::Error> { + str::from_utf8(buf).map_err(|_| + io::Error::new( + io::ErrorKind::InvalidData, + "Unable to decode input as UTF8")) +} + +fn without_carriage_return(s: &[u8]) -> &[u8] { + if let Some(&b'\r') = s.last() { + &s[..s.len() - 1] + } else { + s + } +} + +impl Decoder for LinesCodec { + type Item = String; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> { + if let Some(newline_offset) = + buf[self.next_index..].iter().position(|b| *b == b'\n') + { + let newline_index = newline_offset + self.next_index; + let line = buf.split_to(newline_index + 1); + let line = &line[..line.len()-1]; + let line = without_carriage_return(line); + let line = utf8(line)?; + self.next_index = 0; + Ok(Some(line.to_string())) + } else { + self.next_index = buf.len(); + Ok(None) + } + } + + fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> { + Ok(match self.decode(buf)? { + Some(frame) => Some(frame), + None => { + // No terminating newline - return remaining data, if any + if buf.is_empty() || buf == &b"\r"[..] { + None + } else { + let line = buf.take(); + let line = without_carriage_return(&line); + let line = utf8(line)?; + self.next_index = 0; + Some(line.to_string()) + } + } + }) + } +} + +impl Encoder for LinesCodec { + type Item = String; + type Error = io::Error; + + fn encode(&mut self, line: String, buf: &mut BytesMut) -> Result<(), io::Error> { + buf.reserve(line.len() + 1); + buf.put(line); + buf.put_u8(b'\n'); + Ok(()) + } +} |