diff options
Diffstat (limited to 'third_party/rust/tokio-codec')
-rw-r--r-- | third_party/rust/tokio-codec/.cargo-checksum.json | 1 | ||||
-rw-r--r-- | third_party/rust/tokio-codec/CHANGELOG.md | 3 | ||||
-rw-r--r-- | third_party/rust/tokio-codec/Cargo.toml | 30 | ||||
-rw-r--r-- | third_party/rust/tokio-codec/LICENSE | 25 | ||||
-rw-r--r-- | third_party/rust/tokio-codec/README.md | 35 | ||||
-rw-r--r-- | third_party/rust/tokio-codec/src/bytes_codec.rs | 37 | ||||
-rw-r--r-- | third_party/rust/tokio-codec/src/lib.rs | 32 | ||||
-rw-r--r-- | third_party/rust/tokio-codec/src/lines_codec.rs | 89 | ||||
-rw-r--r-- | third_party/rust/tokio-codec/tests/codecs.rs | 76 | ||||
-rw-r--r-- | third_party/rust/tokio-codec/tests/framed.rs | 94 | ||||
-rw-r--r-- | third_party/rust/tokio-codec/tests/framed_read.rs | 216 | ||||
-rw-r--r-- | third_party/rust/tokio-codec/tests/framed_write.rs | 134 |
12 files changed, 772 insertions, 0 deletions
diff --git a/third_party/rust/tokio-codec/.cargo-checksum.json b/third_party/rust/tokio-codec/.cargo-checksum.json new file mode 100644 index 0000000000..ccd6df08fb --- /dev/null +++ b/third_party/rust/tokio-codec/.cargo-checksum.json @@ -0,0 +1 @@ +{"files":{"CHANGELOG.md":"c669cb16ddb7527ab1e52cb8d1d41b2b5fe5f212bb804710d6d3697bab380ac4","Cargo.toml":"f54f1b39a7327b1c5479278d0c81f04a1fc5e336f4681efaec867d453ded4b47","LICENSE":"4899c290472c872cf8a1904a60e73ec58a1bc1db2e20bc143aa3d1498be49c96","README.md":"a130c56b3b4c625e1284dcfe17235de0e214635d310a09141ade604f1e15956f","src/bytes_codec.rs":"ad7a52ae6501b98bd6332af537a8fa8c4940f3e4495a8d2fed5cf3585afb0e7b","src/lib.rs":"cfca50711173ef5c0ebed4a281c3a8b77792868bd15b3a9ba0a1fec47638e863","src/lines_codec.rs":"cec96bee040e70a039d6598e6afcc50383922c9e949de2573805e3028cbd5781","tests/codecs.rs":"eef71df1db09a8128d017cef44ed0eb9b82ed232d2fcee61a1b4dfb419728327","tests/framed.rs":"b4b3ba571f3a8c1727aef5773e2f4a68f1cf162955c3984da145e512d1047ad1","tests/framed_read.rs":"4e3558a66acd2e1cbd2d82721d48f10d16104979196d616ac5a1e7c120f0ede1","tests/framed_write.rs":"b7aae09c670678d0d7cd24017b5ffe2ba634cc3371222487381aaf8499bf819d"},"package":"881e9645b81c2ce95fcb799ded2c29ffb9f25ef5bef909089a420e5961dd8ccb"}
\ No newline at end of file diff --git a/third_party/rust/tokio-codec/CHANGELOG.md b/third_party/rust/tokio-codec/CHANGELOG.md new file mode 100644 index 0000000000..5c3ac55860 --- /dev/null +++ b/third_party/rust/tokio-codec/CHANGELOG.md @@ -0,0 +1,3 @@ +# # 0.1.0 (June 13, 2018) + +* Initial release (#353) diff --git a/third_party/rust/tokio-codec/Cargo.toml b/third_party/rust/tokio-codec/Cargo.toml new file mode 100644 index 0000000000..e204ed4e62 --- /dev/null +++ b/third_party/rust/tokio-codec/Cargo.toml @@ -0,0 +1,30 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g. crates.io) dependencies +# +# If you believe there's an error in this file please file an +# issue against the rust-lang/cargo repository. If you're +# editing this file be aware that the upstream Cargo.toml +# will likely look very different (and much more reasonable) + +[package] +name = "tokio-codec" +version = "0.1.0" +authors = ["Carl Lerche <me@carllerche.com>", "Bryan Burgers <bryan@burgers.io>"] +description = "Utilities for encoding and decoding frames.\n" +homepage = "https://tokio.rs" +documentation = "https://docs.rs/tokio-codec/0.1" +categories = ["asynchronous"] +license = "MIT" +repository = "https://github.com/tokio-rs/tokio" +[dependencies.bytes] +version = "0.4.7" + +[dependencies.futures] +version = "0.1.18" + +[dependencies.tokio-io] +version = "0.1.7" diff --git a/third_party/rust/tokio-codec/LICENSE b/third_party/rust/tokio-codec/LICENSE new file mode 100644 index 0000000000..38c1e27b8e --- /dev/null +++ b/third_party/rust/tokio-codec/LICENSE @@ -0,0 +1,25 @@ +Copyright (c) 2018 Tokio Contributors + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/third_party/rust/tokio-codec/README.md b/third_party/rust/tokio-codec/README.md new file mode 100644 index 0000000000..e0c1a3858c --- /dev/null +++ b/third_party/rust/tokio-codec/README.md @@ -0,0 +1,35 @@ +# tokio-codec + +Utilities for encoding and decoding frames. + +[Documentation](https://docs.rs/tokio-codec) + +## Usage + +First, add this to your `Cargo.toml`: + +```toml +[dependencies] +tokio-codec = "0.1" +``` + +Next, add this to your crate: + +```rust +extern crate tokio_codec; +``` + +You can find extensive documentation and examples about how to use this crate +online at [https://tokio.rs](https://tokio.rs). The [API +documentation](https://docs.rs/tokio-codec) is also a great place to get started +for the nitty-gritty. + +## License + +This project is licensed under the [MIT license](LICENSE). + +### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in Tokio by you, shall be licensed as MIT, without any additional +terms or conditions. diff --git a/third_party/rust/tokio-codec/src/bytes_codec.rs b/third_party/rust/tokio-codec/src/bytes_codec.rs new file mode 100644 index 0000000000..d535aef689 --- /dev/null +++ b/third_party/rust/tokio-codec/src/bytes_codec.rs @@ -0,0 +1,37 @@ +use bytes::{Bytes, BufMut, BytesMut}; +use tokio_io::_tokio_codec::{Encoder, Decoder}; +use std::io; + +/// A simple `Codec` implementation that just ships bytes around. +#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub struct BytesCodec(()); + +impl BytesCodec { + /// Creates a new `BytesCodec` for shipping around raw bytes. + pub fn new() -> BytesCodec { BytesCodec(()) } +} + +impl Decoder for BytesCodec { + type Item = BytesMut; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> { + if buf.len() > 0 { + let len = buf.len(); + Ok(Some(buf.split_to(len))) + } else { + Ok(None) + } + } +} + +impl Encoder for BytesCodec { + type Item = Bytes; + type Error = io::Error; + + fn encode(&mut self, data: Bytes, buf: &mut BytesMut) -> Result<(), io::Error> { + buf.reserve(data.len()); + buf.put(data); + Ok(()) + } +} diff --git a/third_party/rust/tokio-codec/src/lib.rs b/third_party/rust/tokio-codec/src/lib.rs new file mode 100644 index 0000000000..2b26b542bb --- /dev/null +++ b/third_party/rust/tokio-codec/src/lib.rs @@ -0,0 +1,32 @@ +//! Utilities for encoding and decoding frames. +//! +//! Contains adapters to go from streams of bytes, [`AsyncRead`] and +//! [`AsyncWrite`], to framed streams implementing [`Sink`] and [`Stream`]. +//! Framed streams are also known as [transports]. +//! +//! [`AsyncRead`]: # +//! [`AsyncWrite`]: # +//! [`Sink`]: # +//! [`Stream`]: # +//! [transports]: # + +#![deny(missing_docs, missing_debug_implementations, warnings)] +#![doc(html_root_url = "https://docs.rs/tokio-codec/0.1.0")] + +extern crate bytes; +extern crate tokio_io; + +mod bytes_codec; +mod lines_codec; + +pub use tokio_io::_tokio_codec::{ + Decoder, + Encoder, + Framed, + FramedParts, + FramedRead, + FramedWrite, +}; + +pub use bytes_codec::BytesCodec; +pub use lines_codec::LinesCodec; diff --git a/third_party/rust/tokio-codec/src/lines_codec.rs b/third_party/rust/tokio-codec/src/lines_codec.rs new file mode 100644 index 0000000000..bf4135b8e3 --- /dev/null +++ b/third_party/rust/tokio-codec/src/lines_codec.rs @@ -0,0 +1,89 @@ +use bytes::{BufMut, BytesMut}; +use tokio_io::_tokio_codec::{Encoder, Decoder}; +use std::{io, str}; + +/// A simple `Codec` implementation that splits up data into lines. +#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub struct LinesCodec { + // Stored index of the next index to examine for a `\n` character. + // This is used to optimize searching. + // For example, if `decode` was called with `abc`, it would hold `3`, + // because that is the next index to examine. + // The next time `decode` is called with `abcde\n`, the method will + // only look at `de\n` before returning. + next_index: usize, +} + +impl LinesCodec { + /// Returns a `LinesCodec` for splitting up data into lines. + pub fn new() -> LinesCodec { + LinesCodec { next_index: 0 } + } +} + +fn utf8(buf: &[u8]) -> Result<&str, io::Error> { + str::from_utf8(buf).map_err(|_| + io::Error::new( + io::ErrorKind::InvalidData, + "Unable to decode input as UTF8")) +} + +fn without_carriage_return(s: &[u8]) -> &[u8] { + if let Some(&b'\r') = s.last() { + &s[..s.len() - 1] + } else { + s + } +} + +impl Decoder for LinesCodec { + type Item = String; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> { + if let Some(newline_offset) = + buf[self.next_index..].iter().position(|b| *b == b'\n') + { + let newline_index = newline_offset + self.next_index; + let line = buf.split_to(newline_index + 1); + let line = &line[..line.len()-1]; + let line = without_carriage_return(line); + let line = utf8(line)?; + self.next_index = 0; + Ok(Some(line.to_string())) + } else { + self.next_index = buf.len(); + Ok(None) + } + } + + fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> { + Ok(match self.decode(buf)? { + Some(frame) => Some(frame), + None => { + // No terminating newline - return remaining data, if any + if buf.is_empty() || buf == &b"\r"[..] { + None + } else { + let line = buf.take(); + let line = without_carriage_return(&line); + let line = utf8(line)?; + self.next_index = 0; + Some(line.to_string()) + } + } + }) + } +} + +impl Encoder for LinesCodec { + type Item = String; + type Error = io::Error; + + fn encode(&mut self, line: String, buf: &mut BytesMut) -> Result<(), io::Error> { + buf.reserve(line.len() + 1); + buf.put(line); + buf.put_u8(b'\n'); + Ok(()) + } +} diff --git a/third_party/rust/tokio-codec/tests/codecs.rs b/third_party/rust/tokio-codec/tests/codecs.rs new file mode 100644 index 0000000000..6359e7c72f --- /dev/null +++ b/third_party/rust/tokio-codec/tests/codecs.rs @@ -0,0 +1,76 @@ +extern crate tokio_codec; +extern crate bytes; + +use bytes::{BytesMut, Bytes, BufMut}; +use tokio_codec::{BytesCodec, LinesCodec, Decoder, Encoder}; + +#[test] +fn bytes_decoder() { + let mut codec = BytesCodec::new(); + let buf = &mut BytesMut::new(); + buf.put_slice(b"abc"); + assert_eq!("abc", codec.decode(buf).unwrap().unwrap()); + assert_eq!(None, codec.decode(buf).unwrap()); + assert_eq!(None, codec.decode(buf).unwrap()); + buf.put_slice(b"a"); + assert_eq!("a", codec.decode(buf).unwrap().unwrap()); +} + +#[test] +fn bytes_encoder() { + let mut codec = BytesCodec::new(); + + // Default capacity of BytesMut + #[cfg(target_pointer_width = "64")] + const INLINE_CAP: usize = 4 * 8 - 1; + #[cfg(target_pointer_width = "32")] + const INLINE_CAP: usize = 4 * 4 - 1; + + let mut buf = BytesMut::new(); + codec.encode(Bytes::from_static(&[0; INLINE_CAP + 1]), &mut buf).unwrap(); + + // Default capacity of Framed Read + const INITIAL_CAPACITY: usize = 8 * 1024; + + let mut buf = BytesMut::with_capacity(INITIAL_CAPACITY); + codec.encode(Bytes::from_static(&[0; INITIAL_CAPACITY + 1]), &mut buf).unwrap(); +} + +#[test] +fn lines_decoder() { + let mut codec = LinesCodec::new(); + let buf = &mut BytesMut::new(); + buf.reserve(200); + buf.put("line 1\nline 2\r\nline 3\n\r\n\r"); + assert_eq!("line 1", codec.decode(buf).unwrap().unwrap()); + assert_eq!("line 2", codec.decode(buf).unwrap().unwrap()); + assert_eq!("line 3", codec.decode(buf).unwrap().unwrap()); + assert_eq!("", codec.decode(buf).unwrap().unwrap()); + assert_eq!(None, codec.decode(buf).unwrap()); + assert_eq!(None, codec.decode_eof(buf).unwrap()); + buf.put("k"); + assert_eq!(None, codec.decode(buf).unwrap()); + assert_eq!("\rk", codec.decode_eof(buf).unwrap().unwrap()); + assert_eq!(None, codec.decode(buf).unwrap()); + assert_eq!(None, codec.decode_eof(buf).unwrap()); +} + +#[test] +fn lines_encoder() { + let mut codec = BytesCodec::new(); + + // Default capacity of BytesMut + #[cfg(target_pointer_width = "64")] + const INLINE_CAP: usize = 4 * 8 - 1; + #[cfg(target_pointer_width = "32")] + const INLINE_CAP: usize = 4 * 4 - 1; + + let mut buf = BytesMut::new(); + codec.encode(Bytes::from_static(&[b'a'; INLINE_CAP + 1]), &mut buf).unwrap(); + + // Default capacity of Framed Read + const INITIAL_CAPACITY: usize = 8 * 1024; + + let mut buf = BytesMut::with_capacity(INITIAL_CAPACITY); + codec.encode(Bytes::from_static(&[b'a'; INITIAL_CAPACITY + 1]), &mut buf).unwrap(); +} diff --git a/third_party/rust/tokio-codec/tests/framed.rs b/third_party/rust/tokio-codec/tests/framed.rs new file mode 100644 index 0000000000..f7dd9cdf70 --- /dev/null +++ b/third_party/rust/tokio-codec/tests/framed.rs @@ -0,0 +1,94 @@ +extern crate tokio_codec; +extern crate tokio_io; +extern crate bytes; +extern crate futures; + +use futures::{Stream, Future}; +use std::io::{self, Read}; +use tokio_codec::{Framed, FramedParts, Decoder, Encoder}; +use tokio_io::AsyncRead; +use bytes::{BytesMut, Buf, BufMut, IntoBuf}; + +const INITIAL_CAPACITY: usize = 8 * 1024; + +/// Encode and decode u32 values. +struct U32Codec; + +impl Decoder for U32Codec { + type Item = u32; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> { + if buf.len() < 4 { + return Ok(None); + } + + let n = buf.split_to(4).into_buf().get_u32_be(); + Ok(Some(n)) + } +} + +impl Encoder for U32Codec { + type Item = u32; + type Error = io::Error; + + fn encode(&mut self, item: u32, dst: &mut BytesMut) -> io::Result<()> { + // Reserve space + dst.reserve(4); + dst.put_u32_be(item); + Ok(()) + } +} + +/// This value should never be used +struct DontReadIntoThis; + +impl Read for DontReadIntoThis { + fn read(&mut self, _: &mut [u8]) -> io::Result<usize> { + Err(io::Error::new(io::ErrorKind::Other, + "Read into something you weren't supposed to.")) + } +} + +impl AsyncRead for DontReadIntoThis {} + +#[test] +fn can_read_from_existing_buf() { + let mut parts = FramedParts::new(DontReadIntoThis, U32Codec); + parts.read_buf = vec![0, 0, 0, 42].into(); + + let framed = Framed::from_parts(parts); + + let num = framed + .into_future() + .map(|(first_num, _)| { + first_num.unwrap() + }) + .wait() + .map_err(|e| e.0) + .unwrap(); + + assert_eq!(num, 42); +} + +#[test] +fn external_buf_grows_to_init() { + let mut parts = FramedParts::new(DontReadIntoThis, U32Codec); + parts.read_buf = vec![0, 0, 0, 42].into(); + + let framed = Framed::from_parts(parts); + let FramedParts { read_buf, .. } = framed.into_parts(); + + assert_eq!(read_buf.capacity(), INITIAL_CAPACITY); +} + +#[test] +fn external_buf_does_not_shrink() { + let mut parts = FramedParts::new(DontReadIntoThis, U32Codec); + parts.read_buf = vec![0; INITIAL_CAPACITY * 2].into(); + + let framed = Framed::from_parts(parts); + let FramedParts { read_buf, .. } = framed.into_parts(); + + assert_eq!(read_buf.capacity(), INITIAL_CAPACITY * 2); +} diff --git a/third_party/rust/tokio-codec/tests/framed_read.rs b/third_party/rust/tokio-codec/tests/framed_read.rs new file mode 100644 index 0000000000..80dfa5e505 --- /dev/null +++ b/third_party/rust/tokio-codec/tests/framed_read.rs @@ -0,0 +1,216 @@ +extern crate tokio_codec; +extern crate tokio_io; +extern crate bytes; +extern crate futures; + +use tokio_io::AsyncRead; +use tokio_codec::{FramedRead, Decoder}; + +use bytes::{BytesMut, Buf, IntoBuf, BigEndian}; +use futures::Stream; +use futures::Async::{Ready, NotReady}; + +use std::io::{self, Read}; +use std::collections::VecDeque; + +macro_rules! mock { + ($($x:expr,)*) => {{ + let mut v = VecDeque::new(); + v.extend(vec![$($x),*]); + Mock { calls: v } + }}; +} + +struct U32Decoder; + +impl Decoder for U32Decoder { + type Item = u32; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> { + if buf.len() < 4 { + return Ok(None); + } + + let n = buf.split_to(4).into_buf().get_u32::<BigEndian>(); + Ok(Some(n)) + } +} + +#[test] +fn read_multi_frame_in_packet() { + let mock = mock! { + Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()), + }; + + let mut framed = FramedRead::new(mock, U32Decoder); + assert_eq!(Ready(Some(0)), framed.poll().unwrap()); + assert_eq!(Ready(Some(1)), framed.poll().unwrap()); + assert_eq!(Ready(Some(2)), framed.poll().unwrap()); + assert_eq!(Ready(None), framed.poll().unwrap()); +} + +#[test] +fn read_multi_frame_across_packets() { + let mock = mock! { + Ok(b"\x00\x00\x00\x00".to_vec()), + Ok(b"\x00\x00\x00\x01".to_vec()), + Ok(b"\x00\x00\x00\x02".to_vec()), + }; + + let mut framed = FramedRead::new(mock, U32Decoder); + assert_eq!(Ready(Some(0)), framed.poll().unwrap()); + assert_eq!(Ready(Some(1)), framed.poll().unwrap()); + assert_eq!(Ready(Some(2)), framed.poll().unwrap()); + assert_eq!(Ready(None), framed.poll().unwrap()); +} + +#[test] +fn read_not_ready() { + let mock = mock! { + Err(io::Error::new(io::ErrorKind::WouldBlock, "")), + Ok(b"\x00\x00\x00\x00".to_vec()), + Ok(b"\x00\x00\x00\x01".to_vec()), + }; + + let mut framed = FramedRead::new(mock, U32Decoder); + assert_eq!(NotReady, framed.poll().unwrap()); + assert_eq!(Ready(Some(0)), framed.poll().unwrap()); + assert_eq!(Ready(Some(1)), framed.poll().unwrap()); + assert_eq!(Ready(None), framed.poll().unwrap()); +} + +#[test] +fn read_partial_then_not_ready() { + let mock = mock! { + Ok(b"\x00\x00".to_vec()), + Err(io::Error::new(io::ErrorKind::WouldBlock, "")), + Ok(b"\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()), + }; + + let mut framed = FramedRead::new(mock, U32Decoder); + assert_eq!(NotReady, framed.poll().unwrap()); + assert_eq!(Ready(Some(0)), framed.poll().unwrap()); + assert_eq!(Ready(Some(1)), framed.poll().unwrap()); + assert_eq!(Ready(Some(2)), framed.poll().unwrap()); + assert_eq!(Ready(None), framed.poll().unwrap()); +} + +#[test] +fn read_err() { + let mock = mock! { + Err(io::Error::new(io::ErrorKind::Other, "")), + }; + + let mut framed = FramedRead::new(mock, U32Decoder); + assert_eq!(io::ErrorKind::Other, framed.poll().unwrap_err().kind()); +} + +#[test] +fn read_partial_then_err() { + let mock = mock! { + Ok(b"\x00\x00".to_vec()), + Err(io::Error::new(io::ErrorKind::Other, "")), + }; + + let mut framed = FramedRead::new(mock, U32Decoder); + assert_eq!(io::ErrorKind::Other, framed.poll().unwrap_err().kind()); +} + +#[test] +fn read_partial_would_block_then_err() { + let mock = mock! { + Ok(b"\x00\x00".to_vec()), + Err(io::Error::new(io::ErrorKind::WouldBlock, "")), + Err(io::Error::new(io::ErrorKind::Other, "")), + }; + + let mut framed = FramedRead::new(mock, U32Decoder); + assert_eq!(NotReady, framed.poll().unwrap()); + assert_eq!(io::ErrorKind::Other, framed.poll().unwrap_err().kind()); +} + +#[test] +fn huge_size() { + let data = [0; 32 * 1024]; + + let mut framed = FramedRead::new(&data[..], BigDecoder); + assert_eq!(Ready(Some(0)), framed.poll().unwrap()); + assert_eq!(Ready(None), framed.poll().unwrap()); + + struct BigDecoder; + + impl Decoder for BigDecoder { + type Item = u32; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> { + if buf.len() < 32 * 1024 { + return Ok(None); + } + buf.split_to(32 * 1024); + Ok(Some(0)) + } + } +} + +#[test] +fn data_remaining_is_error() { + let data = [0; 5]; + + let mut framed = FramedRead::new(&data[..], U32Decoder); + assert_eq!(Ready(Some(0)), framed.poll().unwrap()); + assert!(framed.poll().is_err()); +} + +#[test] +fn multi_frames_on_eof() { + struct MyDecoder(Vec<u32>); + + impl Decoder for MyDecoder { + type Item = u32; + type Error = io::Error; + + fn decode(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> { + unreachable!(); + } + + fn decode_eof(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> { + if self.0.is_empty() { + return Ok(None); + } + + Ok(Some(self.0.remove(0))) + } + } + + let mut framed = FramedRead::new(mock!(), MyDecoder(vec![0, 1, 2, 3])); + assert_eq!(Ready(Some(0)), framed.poll().unwrap()); + assert_eq!(Ready(Some(1)), framed.poll().unwrap()); + assert_eq!(Ready(Some(2)), framed.poll().unwrap()); + assert_eq!(Ready(Some(3)), framed.poll().unwrap()); + assert_eq!(Ready(None), framed.poll().unwrap()); +} + +// ===== Mock ====== + +struct Mock { + calls: VecDeque<io::Result<Vec<u8>>>, +} + +impl Read for Mock { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + match self.calls.pop_front() { + Some(Ok(data)) => { + debug_assert!(dst.len() >= data.len()); + dst[..data.len()].copy_from_slice(&data[..]); + Ok(data.len()) + } + Some(Err(e)) => Err(e), + None => Ok(0), + } + } +} + +impl AsyncRead for Mock { +} diff --git a/third_party/rust/tokio-codec/tests/framed_write.rs b/third_party/rust/tokio-codec/tests/framed_write.rs new file mode 100644 index 0000000000..137fb5be13 --- /dev/null +++ b/third_party/rust/tokio-codec/tests/framed_write.rs @@ -0,0 +1,134 @@ +extern crate tokio_codec; +extern crate tokio_io; +extern crate bytes; +extern crate futures; + +use tokio_io::AsyncWrite; +use tokio_codec::{Encoder, FramedWrite}; + +use futures::{Sink, Poll}; +use bytes::{BytesMut, BufMut, BigEndian}; + +use std::io::{self, Write}; +use std::collections::VecDeque; + +macro_rules! mock { + ($($x:expr,)*) => {{ + let mut v = VecDeque::new(); + v.extend(vec![$($x),*]); + Mock { calls: v } + }}; +} + +struct U32Encoder; + +impl Encoder for U32Encoder { + type Item = u32; + type Error = io::Error; + + fn encode(&mut self, item: u32, dst: &mut BytesMut) -> io::Result<()> { + // Reserve space + dst.reserve(4); + dst.put_u32_be(item); + Ok(()) + } +} + +#[test] +fn write_multi_frame_in_packet() { + let mock = mock! { + Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()), + }; + + let mut framed = FramedWrite::new(mock, U32Encoder); + assert!(framed.start_send(0).unwrap().is_ready()); + assert!(framed.start_send(1).unwrap().is_ready()); + assert!(framed.start_send(2).unwrap().is_ready()); + + // Nothing written yet + assert_eq!(1, framed.get_ref().calls.len()); + + // Flush the writes + assert!(framed.poll_complete().unwrap().is_ready()); + + assert_eq!(0, framed.get_ref().calls.len()); +} + +#[test] +fn write_hits_backpressure() { + const ITER: usize = 2 * 1024; + + let mut mock = mock! { + // Block the `ITER`th write + Err(io::Error::new(io::ErrorKind::WouldBlock, "not ready")), + Ok(b"".to_vec()), + }; + + for i in 0..(ITER + 1) { + let mut b = BytesMut::with_capacity(4); + b.put_u32_be(i as u32); + + // Append to the end + match mock.calls.back_mut().unwrap() { + &mut Ok(ref mut data) => { + // Write in 2kb chunks + if data.len() < ITER { + data.extend_from_slice(&b[..]); + continue; + } + } + _ => unreachable!(), + } + + // Push a new new chunk + mock.calls.push_back(Ok(b[..].to_vec())); + } + + let mut framed = FramedWrite::new(mock, U32Encoder); + + for i in 0..ITER { + assert!(framed.start_send(i as u32).unwrap().is_ready()); + } + + // This should reject + assert!(!framed.start_send(ITER as u32).unwrap().is_ready()); + + // This should succeed and start flushing the buffer. + assert!(framed.start_send(ITER as u32).unwrap().is_ready()); + + // Flush the rest of the buffer + assert!(framed.poll_complete().unwrap().is_ready()); + + // Ensure the mock is empty + assert_eq!(0, framed.get_ref().calls.len()); +} + +// ===== Mock ====== + +struct Mock { + calls: VecDeque<io::Result<Vec<u8>>>, +} + +impl Write for Mock { + fn write(&mut self, src: &[u8]) -> io::Result<usize> { + match self.calls.pop_front() { + Some(Ok(data)) => { + assert!(src.len() >= data.len()); + assert_eq!(&data[..], &src[..data.len()]); + Ok(data.len()) + } + Some(Err(e)) => Err(e), + None => panic!("unexpected write; {:?}", src), + } + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl AsyncWrite for Mock { + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(().into()) + } +} |