diff options
Diffstat (limited to 'third_party/rust/tokio-codec/tests')
-rw-r--r-- | third_party/rust/tokio-codec/tests/codecs.rs | 76 | ||||
-rw-r--r-- | third_party/rust/tokio-codec/tests/framed.rs | 94 | ||||
-rw-r--r-- | third_party/rust/tokio-codec/tests/framed_read.rs | 216 | ||||
-rw-r--r-- | third_party/rust/tokio-codec/tests/framed_write.rs | 134 |
4 files changed, 520 insertions, 0 deletions
diff --git a/third_party/rust/tokio-codec/tests/codecs.rs b/third_party/rust/tokio-codec/tests/codecs.rs new file mode 100644 index 0000000000..6359e7c72f --- /dev/null +++ b/third_party/rust/tokio-codec/tests/codecs.rs @@ -0,0 +1,76 @@ +extern crate tokio_codec; +extern crate bytes; + +use bytes::{BytesMut, Bytes, BufMut}; +use tokio_codec::{BytesCodec, LinesCodec, Decoder, Encoder}; + +#[test] +fn bytes_decoder() { + let mut codec = BytesCodec::new(); + let buf = &mut BytesMut::new(); + buf.put_slice(b"abc"); + assert_eq!("abc", codec.decode(buf).unwrap().unwrap()); + assert_eq!(None, codec.decode(buf).unwrap()); + assert_eq!(None, codec.decode(buf).unwrap()); + buf.put_slice(b"a"); + assert_eq!("a", codec.decode(buf).unwrap().unwrap()); +} + +#[test] +fn bytes_encoder() { + let mut codec = BytesCodec::new(); + + // Default capacity of BytesMut + #[cfg(target_pointer_width = "64")] + const INLINE_CAP: usize = 4 * 8 - 1; + #[cfg(target_pointer_width = "32")] + const INLINE_CAP: usize = 4 * 4 - 1; + + let mut buf = BytesMut::new(); + codec.encode(Bytes::from_static(&[0; INLINE_CAP + 1]), &mut buf).unwrap(); + + // Default capacity of Framed Read + const INITIAL_CAPACITY: usize = 8 * 1024; + + let mut buf = BytesMut::with_capacity(INITIAL_CAPACITY); + codec.encode(Bytes::from_static(&[0; INITIAL_CAPACITY + 1]), &mut buf).unwrap(); +} + +#[test] +fn lines_decoder() { + let mut codec = LinesCodec::new(); + let buf = &mut BytesMut::new(); + buf.reserve(200); + buf.put("line 1\nline 2\r\nline 3\n\r\n\r"); + assert_eq!("line 1", codec.decode(buf).unwrap().unwrap()); + assert_eq!("line 2", codec.decode(buf).unwrap().unwrap()); + assert_eq!("line 3", codec.decode(buf).unwrap().unwrap()); + assert_eq!("", codec.decode(buf).unwrap().unwrap()); + assert_eq!(None, codec.decode(buf).unwrap()); + assert_eq!(None, codec.decode_eof(buf).unwrap()); + buf.put("k"); + assert_eq!(None, codec.decode(buf).unwrap()); + assert_eq!("\rk", codec.decode_eof(buf).unwrap().unwrap()); + assert_eq!(None, codec.decode(buf).unwrap()); + assert_eq!(None, codec.decode_eof(buf).unwrap()); +} + +#[test] +fn lines_encoder() { + let mut codec = BytesCodec::new(); + + // Default capacity of BytesMut + #[cfg(target_pointer_width = "64")] + const INLINE_CAP: usize = 4 * 8 - 1; + #[cfg(target_pointer_width = "32")] + const INLINE_CAP: usize = 4 * 4 - 1; + + let mut buf = BytesMut::new(); + codec.encode(Bytes::from_static(&[b'a'; INLINE_CAP + 1]), &mut buf).unwrap(); + + // Default capacity of Framed Read + const INITIAL_CAPACITY: usize = 8 * 1024; + + let mut buf = BytesMut::with_capacity(INITIAL_CAPACITY); + codec.encode(Bytes::from_static(&[b'a'; INITIAL_CAPACITY + 1]), &mut buf).unwrap(); +} diff --git a/third_party/rust/tokio-codec/tests/framed.rs b/third_party/rust/tokio-codec/tests/framed.rs new file mode 100644 index 0000000000..f7dd9cdf70 --- /dev/null +++ b/third_party/rust/tokio-codec/tests/framed.rs @@ -0,0 +1,94 @@ +extern crate tokio_codec; +extern crate tokio_io; +extern crate bytes; +extern crate futures; + +use futures::{Stream, Future}; +use std::io::{self, Read}; +use tokio_codec::{Framed, FramedParts, Decoder, Encoder}; +use tokio_io::AsyncRead; +use bytes::{BytesMut, Buf, BufMut, IntoBuf}; + +const INITIAL_CAPACITY: usize = 8 * 1024; + +/// Encode and decode u32 values. +struct U32Codec; + +impl Decoder for U32Codec { + type Item = u32; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> { + if buf.len() < 4 { + return Ok(None); + } + + let n = buf.split_to(4).into_buf().get_u32_be(); + Ok(Some(n)) + } +} + +impl Encoder for U32Codec { + type Item = u32; + type Error = io::Error; + + fn encode(&mut self, item: u32, dst: &mut BytesMut) -> io::Result<()> { + // Reserve space + dst.reserve(4); + dst.put_u32_be(item); + Ok(()) + } +} + +/// This value should never be used +struct DontReadIntoThis; + +impl Read for DontReadIntoThis { + fn read(&mut self, _: &mut [u8]) -> io::Result<usize> { + Err(io::Error::new(io::ErrorKind::Other, + "Read into something you weren't supposed to.")) + } +} + +impl AsyncRead for DontReadIntoThis {} + +#[test] +fn can_read_from_existing_buf() { + let mut parts = FramedParts::new(DontReadIntoThis, U32Codec); + parts.read_buf = vec![0, 0, 0, 42].into(); + + let framed = Framed::from_parts(parts); + + let num = framed + .into_future() + .map(|(first_num, _)| { + first_num.unwrap() + }) + .wait() + .map_err(|e| e.0) + .unwrap(); + + assert_eq!(num, 42); +} + +#[test] +fn external_buf_grows_to_init() { + let mut parts = FramedParts::new(DontReadIntoThis, U32Codec); + parts.read_buf = vec![0, 0, 0, 42].into(); + + let framed = Framed::from_parts(parts); + let FramedParts { read_buf, .. } = framed.into_parts(); + + assert_eq!(read_buf.capacity(), INITIAL_CAPACITY); +} + +#[test] +fn external_buf_does_not_shrink() { + let mut parts = FramedParts::new(DontReadIntoThis, U32Codec); + parts.read_buf = vec![0; INITIAL_CAPACITY * 2].into(); + + let framed = Framed::from_parts(parts); + let FramedParts { read_buf, .. } = framed.into_parts(); + + assert_eq!(read_buf.capacity(), INITIAL_CAPACITY * 2); +} diff --git a/third_party/rust/tokio-codec/tests/framed_read.rs b/third_party/rust/tokio-codec/tests/framed_read.rs new file mode 100644 index 0000000000..80dfa5e505 --- /dev/null +++ b/third_party/rust/tokio-codec/tests/framed_read.rs @@ -0,0 +1,216 @@ +extern crate tokio_codec; +extern crate tokio_io; +extern crate bytes; +extern crate futures; + +use tokio_io::AsyncRead; +use tokio_codec::{FramedRead, Decoder}; + +use bytes::{BytesMut, Buf, IntoBuf, BigEndian}; +use futures::Stream; +use futures::Async::{Ready, NotReady}; + +use std::io::{self, Read}; +use std::collections::VecDeque; + +macro_rules! mock { + ($($x:expr,)*) => {{ + let mut v = VecDeque::new(); + v.extend(vec![$($x),*]); + Mock { calls: v } + }}; +} + +struct U32Decoder; + +impl Decoder for U32Decoder { + type Item = u32; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> { + if buf.len() < 4 { + return Ok(None); + } + + let n = buf.split_to(4).into_buf().get_u32::<BigEndian>(); + Ok(Some(n)) + } +} + +#[test] +fn read_multi_frame_in_packet() { + let mock = mock! { + Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()), + }; + + let mut framed = FramedRead::new(mock, U32Decoder); + assert_eq!(Ready(Some(0)), framed.poll().unwrap()); + assert_eq!(Ready(Some(1)), framed.poll().unwrap()); + assert_eq!(Ready(Some(2)), framed.poll().unwrap()); + assert_eq!(Ready(None), framed.poll().unwrap()); +} + +#[test] +fn read_multi_frame_across_packets() { + let mock = mock! { + Ok(b"\x00\x00\x00\x00".to_vec()), + Ok(b"\x00\x00\x00\x01".to_vec()), + Ok(b"\x00\x00\x00\x02".to_vec()), + }; + + let mut framed = FramedRead::new(mock, U32Decoder); + assert_eq!(Ready(Some(0)), framed.poll().unwrap()); + assert_eq!(Ready(Some(1)), framed.poll().unwrap()); + assert_eq!(Ready(Some(2)), framed.poll().unwrap()); + assert_eq!(Ready(None), framed.poll().unwrap()); +} + +#[test] +fn read_not_ready() { + let mock = mock! { + Err(io::Error::new(io::ErrorKind::WouldBlock, "")), + Ok(b"\x00\x00\x00\x00".to_vec()), + Ok(b"\x00\x00\x00\x01".to_vec()), + }; + + let mut framed = FramedRead::new(mock, U32Decoder); + assert_eq!(NotReady, framed.poll().unwrap()); + assert_eq!(Ready(Some(0)), framed.poll().unwrap()); + assert_eq!(Ready(Some(1)), framed.poll().unwrap()); + assert_eq!(Ready(None), framed.poll().unwrap()); +} + +#[test] +fn read_partial_then_not_ready() { + let mock = mock! { + Ok(b"\x00\x00".to_vec()), + Err(io::Error::new(io::ErrorKind::WouldBlock, "")), + Ok(b"\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()), + }; + + let mut framed = FramedRead::new(mock, U32Decoder); + assert_eq!(NotReady, framed.poll().unwrap()); + assert_eq!(Ready(Some(0)), framed.poll().unwrap()); + assert_eq!(Ready(Some(1)), framed.poll().unwrap()); + assert_eq!(Ready(Some(2)), framed.poll().unwrap()); + assert_eq!(Ready(None), framed.poll().unwrap()); +} + +#[test] +fn read_err() { + let mock = mock! { + Err(io::Error::new(io::ErrorKind::Other, "")), + }; + + let mut framed = FramedRead::new(mock, U32Decoder); + assert_eq!(io::ErrorKind::Other, framed.poll().unwrap_err().kind()); +} + +#[test] +fn read_partial_then_err() { + let mock = mock! { + Ok(b"\x00\x00".to_vec()), + Err(io::Error::new(io::ErrorKind::Other, "")), + }; + + let mut framed = FramedRead::new(mock, U32Decoder); + assert_eq!(io::ErrorKind::Other, framed.poll().unwrap_err().kind()); +} + +#[test] +fn read_partial_would_block_then_err() { + let mock = mock! { + Ok(b"\x00\x00".to_vec()), + Err(io::Error::new(io::ErrorKind::WouldBlock, "")), + Err(io::Error::new(io::ErrorKind::Other, "")), + }; + + let mut framed = FramedRead::new(mock, U32Decoder); + assert_eq!(NotReady, framed.poll().unwrap()); + assert_eq!(io::ErrorKind::Other, framed.poll().unwrap_err().kind()); +} + +#[test] +fn huge_size() { + let data = [0; 32 * 1024]; + + let mut framed = FramedRead::new(&data[..], BigDecoder); + assert_eq!(Ready(Some(0)), framed.poll().unwrap()); + assert_eq!(Ready(None), framed.poll().unwrap()); + + struct BigDecoder; + + impl Decoder for BigDecoder { + type Item = u32; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> { + if buf.len() < 32 * 1024 { + return Ok(None); + } + buf.split_to(32 * 1024); + Ok(Some(0)) + } + } +} + +#[test] +fn data_remaining_is_error() { + let data = [0; 5]; + + let mut framed = FramedRead::new(&data[..], U32Decoder); + assert_eq!(Ready(Some(0)), framed.poll().unwrap()); + assert!(framed.poll().is_err()); +} + +#[test] +fn multi_frames_on_eof() { + struct MyDecoder(Vec<u32>); + + impl Decoder for MyDecoder { + type Item = u32; + type Error = io::Error; + + fn decode(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> { + unreachable!(); + } + + fn decode_eof(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> { + if self.0.is_empty() { + return Ok(None); + } + + Ok(Some(self.0.remove(0))) + } + } + + let mut framed = FramedRead::new(mock!(), MyDecoder(vec![0, 1, 2, 3])); + assert_eq!(Ready(Some(0)), framed.poll().unwrap()); + assert_eq!(Ready(Some(1)), framed.poll().unwrap()); + assert_eq!(Ready(Some(2)), framed.poll().unwrap()); + assert_eq!(Ready(Some(3)), framed.poll().unwrap()); + assert_eq!(Ready(None), framed.poll().unwrap()); +} + +// ===== Mock ====== + +struct Mock { + calls: VecDeque<io::Result<Vec<u8>>>, +} + +impl Read for Mock { + fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> { + match self.calls.pop_front() { + Some(Ok(data)) => { + debug_assert!(dst.len() >= data.len()); + dst[..data.len()].copy_from_slice(&data[..]); + Ok(data.len()) + } + Some(Err(e)) => Err(e), + None => Ok(0), + } + } +} + +impl AsyncRead for Mock { +} diff --git a/third_party/rust/tokio-codec/tests/framed_write.rs b/third_party/rust/tokio-codec/tests/framed_write.rs new file mode 100644 index 0000000000..137fb5be13 --- /dev/null +++ b/third_party/rust/tokio-codec/tests/framed_write.rs @@ -0,0 +1,134 @@ +extern crate tokio_codec; +extern crate tokio_io; +extern crate bytes; +extern crate futures; + +use tokio_io::AsyncWrite; +use tokio_codec::{Encoder, FramedWrite}; + +use futures::{Sink, Poll}; +use bytes::{BytesMut, BufMut, BigEndian}; + +use std::io::{self, Write}; +use std::collections::VecDeque; + +macro_rules! mock { + ($($x:expr,)*) => {{ + let mut v = VecDeque::new(); + v.extend(vec![$($x),*]); + Mock { calls: v } + }}; +} + +struct U32Encoder; + +impl Encoder for U32Encoder { + type Item = u32; + type Error = io::Error; + + fn encode(&mut self, item: u32, dst: &mut BytesMut) -> io::Result<()> { + // Reserve space + dst.reserve(4); + dst.put_u32_be(item); + Ok(()) + } +} + +#[test] +fn write_multi_frame_in_packet() { + let mock = mock! { + Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()), + }; + + let mut framed = FramedWrite::new(mock, U32Encoder); + assert!(framed.start_send(0).unwrap().is_ready()); + assert!(framed.start_send(1).unwrap().is_ready()); + assert!(framed.start_send(2).unwrap().is_ready()); + + // Nothing written yet + assert_eq!(1, framed.get_ref().calls.len()); + + // Flush the writes + assert!(framed.poll_complete().unwrap().is_ready()); + + assert_eq!(0, framed.get_ref().calls.len()); +} + +#[test] +fn write_hits_backpressure() { + const ITER: usize = 2 * 1024; + + let mut mock = mock! { + // Block the `ITER`th write + Err(io::Error::new(io::ErrorKind::WouldBlock, "not ready")), + Ok(b"".to_vec()), + }; + + for i in 0..(ITER + 1) { + let mut b = BytesMut::with_capacity(4); + b.put_u32_be(i as u32); + + // Append to the end + match mock.calls.back_mut().unwrap() { + &mut Ok(ref mut data) => { + // Write in 2kb chunks + if data.len() < ITER { + data.extend_from_slice(&b[..]); + continue; + } + } + _ => unreachable!(), + } + + // Push a new new chunk + mock.calls.push_back(Ok(b[..].to_vec())); + } + + let mut framed = FramedWrite::new(mock, U32Encoder); + + for i in 0..ITER { + assert!(framed.start_send(i as u32).unwrap().is_ready()); + } + + // This should reject + assert!(!framed.start_send(ITER as u32).unwrap().is_ready()); + + // This should succeed and start flushing the buffer. + assert!(framed.start_send(ITER as u32).unwrap().is_ready()); + + // Flush the rest of the buffer + assert!(framed.poll_complete().unwrap().is_ready()); + + // Ensure the mock is empty + assert_eq!(0, framed.get_ref().calls.len()); +} + +// ===== Mock ====== + +struct Mock { + calls: VecDeque<io::Result<Vec<u8>>>, +} + +impl Write for Mock { + fn write(&mut self, src: &[u8]) -> io::Result<usize> { + match self.calls.pop_front() { + Some(Ok(data)) => { + assert!(src.len() >= data.len()); + assert_eq!(&data[..], &src[..data.len()]); + Ok(data.len()) + } + Some(Err(e)) => Err(e), + None => panic!("unexpected write; {:?}", src), + } + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl AsyncWrite for Mock { + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(().into()) + } +} |