summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-codec/tests
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-codec/tests')
-rw-r--r--third_party/rust/tokio-codec/tests/codecs.rs76
-rw-r--r--third_party/rust/tokio-codec/tests/framed.rs94
-rw-r--r--third_party/rust/tokio-codec/tests/framed_read.rs216
-rw-r--r--third_party/rust/tokio-codec/tests/framed_write.rs134
4 files changed, 520 insertions, 0 deletions
diff --git a/third_party/rust/tokio-codec/tests/codecs.rs b/third_party/rust/tokio-codec/tests/codecs.rs
new file mode 100644
index 0000000000..6359e7c72f
--- /dev/null
+++ b/third_party/rust/tokio-codec/tests/codecs.rs
@@ -0,0 +1,76 @@
+extern crate tokio_codec;
+extern crate bytes;
+
+use bytes::{BytesMut, Bytes, BufMut};
+use tokio_codec::{BytesCodec, LinesCodec, Decoder, Encoder};
+
+#[test]
+fn bytes_decoder() {
+ let mut codec = BytesCodec::new();
+ let buf = &mut BytesMut::new();
+ buf.put_slice(b"abc");
+ assert_eq!("abc", codec.decode(buf).unwrap().unwrap());
+ assert_eq!(None, codec.decode(buf).unwrap());
+ assert_eq!(None, codec.decode(buf).unwrap());
+ buf.put_slice(b"a");
+ assert_eq!("a", codec.decode(buf).unwrap().unwrap());
+}
+
+#[test]
+fn bytes_encoder() {
+ let mut codec = BytesCodec::new();
+
+ // Default capacity of BytesMut
+ #[cfg(target_pointer_width = "64")]
+ const INLINE_CAP: usize = 4 * 8 - 1;
+ #[cfg(target_pointer_width = "32")]
+ const INLINE_CAP: usize = 4 * 4 - 1;
+
+ let mut buf = BytesMut::new();
+ codec.encode(Bytes::from_static(&[0; INLINE_CAP + 1]), &mut buf).unwrap();
+
+ // Default capacity of Framed Read
+ const INITIAL_CAPACITY: usize = 8 * 1024;
+
+ let mut buf = BytesMut::with_capacity(INITIAL_CAPACITY);
+ codec.encode(Bytes::from_static(&[0; INITIAL_CAPACITY + 1]), &mut buf).unwrap();
+}
+
+#[test]
+fn lines_decoder() {
+ let mut codec = LinesCodec::new();
+ let buf = &mut BytesMut::new();
+ buf.reserve(200);
+ buf.put("line 1\nline 2\r\nline 3\n\r\n\r");
+ assert_eq!("line 1", codec.decode(buf).unwrap().unwrap());
+ assert_eq!("line 2", codec.decode(buf).unwrap().unwrap());
+ assert_eq!("line 3", codec.decode(buf).unwrap().unwrap());
+ assert_eq!("", codec.decode(buf).unwrap().unwrap());
+ assert_eq!(None, codec.decode(buf).unwrap());
+ assert_eq!(None, codec.decode_eof(buf).unwrap());
+ buf.put("k");
+ assert_eq!(None, codec.decode(buf).unwrap());
+ assert_eq!("\rk", codec.decode_eof(buf).unwrap().unwrap());
+ assert_eq!(None, codec.decode(buf).unwrap());
+ assert_eq!(None, codec.decode_eof(buf).unwrap());
+}
+
+#[test]
+fn lines_encoder() {
+ let mut codec = BytesCodec::new();
+
+ // Default capacity of BytesMut
+ #[cfg(target_pointer_width = "64")]
+ const INLINE_CAP: usize = 4 * 8 - 1;
+ #[cfg(target_pointer_width = "32")]
+ const INLINE_CAP: usize = 4 * 4 - 1;
+
+ let mut buf = BytesMut::new();
+ codec.encode(Bytes::from_static(&[b'a'; INLINE_CAP + 1]), &mut buf).unwrap();
+
+ // Default capacity of Framed Read
+ const INITIAL_CAPACITY: usize = 8 * 1024;
+
+ let mut buf = BytesMut::with_capacity(INITIAL_CAPACITY);
+ codec.encode(Bytes::from_static(&[b'a'; INITIAL_CAPACITY + 1]), &mut buf).unwrap();
+}
diff --git a/third_party/rust/tokio-codec/tests/framed.rs b/third_party/rust/tokio-codec/tests/framed.rs
new file mode 100644
index 0000000000..f7dd9cdf70
--- /dev/null
+++ b/third_party/rust/tokio-codec/tests/framed.rs
@@ -0,0 +1,94 @@
+extern crate tokio_codec;
+extern crate tokio_io;
+extern crate bytes;
+extern crate futures;
+
+use futures::{Stream, Future};
+use std::io::{self, Read};
+use tokio_codec::{Framed, FramedParts, Decoder, Encoder};
+use tokio_io::AsyncRead;
+use bytes::{BytesMut, Buf, BufMut, IntoBuf};
+
+const INITIAL_CAPACITY: usize = 8 * 1024;
+
+/// Encode and decode u32 values.
+struct U32Codec;
+
+impl Decoder for U32Codec {
+ type Item = u32;
+ type Error = io::Error;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
+ if buf.len() < 4 {
+ return Ok(None);
+ }
+
+ let n = buf.split_to(4).into_buf().get_u32_be();
+ Ok(Some(n))
+ }
+}
+
+impl Encoder for U32Codec {
+ type Item = u32;
+ type Error = io::Error;
+
+ fn encode(&mut self, item: u32, dst: &mut BytesMut) -> io::Result<()> {
+ // Reserve space
+ dst.reserve(4);
+ dst.put_u32_be(item);
+ Ok(())
+ }
+}
+
+/// This value should never be used
+struct DontReadIntoThis;
+
+impl Read for DontReadIntoThis {
+ fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
+ Err(io::Error::new(io::ErrorKind::Other,
+ "Read into something you weren't supposed to."))
+ }
+}
+
+impl AsyncRead for DontReadIntoThis {}
+
+#[test]
+fn can_read_from_existing_buf() {
+ let mut parts = FramedParts::new(DontReadIntoThis, U32Codec);
+ parts.read_buf = vec![0, 0, 0, 42].into();
+
+ let framed = Framed::from_parts(parts);
+
+ let num = framed
+ .into_future()
+ .map(|(first_num, _)| {
+ first_num.unwrap()
+ })
+ .wait()
+ .map_err(|e| e.0)
+ .unwrap();
+
+ assert_eq!(num, 42);
+}
+
+#[test]
+fn external_buf_grows_to_init() {
+ let mut parts = FramedParts::new(DontReadIntoThis, U32Codec);
+ parts.read_buf = vec![0, 0, 0, 42].into();
+
+ let framed = Framed::from_parts(parts);
+ let FramedParts { read_buf, .. } = framed.into_parts();
+
+ assert_eq!(read_buf.capacity(), INITIAL_CAPACITY);
+}
+
+#[test]
+fn external_buf_does_not_shrink() {
+ let mut parts = FramedParts::new(DontReadIntoThis, U32Codec);
+ parts.read_buf = vec![0; INITIAL_CAPACITY * 2].into();
+
+ let framed = Framed::from_parts(parts);
+ let FramedParts { read_buf, .. } = framed.into_parts();
+
+ assert_eq!(read_buf.capacity(), INITIAL_CAPACITY * 2);
+}
diff --git a/third_party/rust/tokio-codec/tests/framed_read.rs b/third_party/rust/tokio-codec/tests/framed_read.rs
new file mode 100644
index 0000000000..80dfa5e505
--- /dev/null
+++ b/third_party/rust/tokio-codec/tests/framed_read.rs
@@ -0,0 +1,216 @@
+extern crate tokio_codec;
+extern crate tokio_io;
+extern crate bytes;
+extern crate futures;
+
+use tokio_io::AsyncRead;
+use tokio_codec::{FramedRead, Decoder};
+
+use bytes::{BytesMut, Buf, IntoBuf, BigEndian};
+use futures::Stream;
+use futures::Async::{Ready, NotReady};
+
+use std::io::{self, Read};
+use std::collections::VecDeque;
+
+macro_rules! mock {
+ ($($x:expr,)*) => {{
+ let mut v = VecDeque::new();
+ v.extend(vec![$($x),*]);
+ Mock { calls: v }
+ }};
+}
+
+struct U32Decoder;
+
+impl Decoder for U32Decoder {
+ type Item = u32;
+ type Error = io::Error;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
+ if buf.len() < 4 {
+ return Ok(None);
+ }
+
+ let n = buf.split_to(4).into_buf().get_u32::<BigEndian>();
+ Ok(Some(n))
+ }
+}
+
+#[test]
+fn read_multi_frame_in_packet() {
+ let mock = mock! {
+ Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
+ };
+
+ let mut framed = FramedRead::new(mock, U32Decoder);
+ assert_eq!(Ready(Some(0)), framed.poll().unwrap());
+ assert_eq!(Ready(Some(1)), framed.poll().unwrap());
+ assert_eq!(Ready(Some(2)), framed.poll().unwrap());
+ assert_eq!(Ready(None), framed.poll().unwrap());
+}
+
+#[test]
+fn read_multi_frame_across_packets() {
+ let mock = mock! {
+ Ok(b"\x00\x00\x00\x00".to_vec()),
+ Ok(b"\x00\x00\x00\x01".to_vec()),
+ Ok(b"\x00\x00\x00\x02".to_vec()),
+ };
+
+ let mut framed = FramedRead::new(mock, U32Decoder);
+ assert_eq!(Ready(Some(0)), framed.poll().unwrap());
+ assert_eq!(Ready(Some(1)), framed.poll().unwrap());
+ assert_eq!(Ready(Some(2)), framed.poll().unwrap());
+ assert_eq!(Ready(None), framed.poll().unwrap());
+}
+
+#[test]
+fn read_not_ready() {
+ let mock = mock! {
+ Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
+ Ok(b"\x00\x00\x00\x00".to_vec()),
+ Ok(b"\x00\x00\x00\x01".to_vec()),
+ };
+
+ let mut framed = FramedRead::new(mock, U32Decoder);
+ assert_eq!(NotReady, framed.poll().unwrap());
+ assert_eq!(Ready(Some(0)), framed.poll().unwrap());
+ assert_eq!(Ready(Some(1)), framed.poll().unwrap());
+ assert_eq!(Ready(None), framed.poll().unwrap());
+}
+
+#[test]
+fn read_partial_then_not_ready() {
+ let mock = mock! {
+ Ok(b"\x00\x00".to_vec()),
+ Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
+ Ok(b"\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
+ };
+
+ let mut framed = FramedRead::new(mock, U32Decoder);
+ assert_eq!(NotReady, framed.poll().unwrap());
+ assert_eq!(Ready(Some(0)), framed.poll().unwrap());
+ assert_eq!(Ready(Some(1)), framed.poll().unwrap());
+ assert_eq!(Ready(Some(2)), framed.poll().unwrap());
+ assert_eq!(Ready(None), framed.poll().unwrap());
+}
+
+#[test]
+fn read_err() {
+ let mock = mock! {
+ Err(io::Error::new(io::ErrorKind::Other, "")),
+ };
+
+ let mut framed = FramedRead::new(mock, U32Decoder);
+ assert_eq!(io::ErrorKind::Other, framed.poll().unwrap_err().kind());
+}
+
+#[test]
+fn read_partial_then_err() {
+ let mock = mock! {
+ Ok(b"\x00\x00".to_vec()),
+ Err(io::Error::new(io::ErrorKind::Other, "")),
+ };
+
+ let mut framed = FramedRead::new(mock, U32Decoder);
+ assert_eq!(io::ErrorKind::Other, framed.poll().unwrap_err().kind());
+}
+
+#[test]
+fn read_partial_would_block_then_err() {
+ let mock = mock! {
+ Ok(b"\x00\x00".to_vec()),
+ Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
+ Err(io::Error::new(io::ErrorKind::Other, "")),
+ };
+
+ let mut framed = FramedRead::new(mock, U32Decoder);
+ assert_eq!(NotReady, framed.poll().unwrap());
+ assert_eq!(io::ErrorKind::Other, framed.poll().unwrap_err().kind());
+}
+
+#[test]
+fn huge_size() {
+ let data = [0; 32 * 1024];
+
+ let mut framed = FramedRead::new(&data[..], BigDecoder);
+ assert_eq!(Ready(Some(0)), framed.poll().unwrap());
+ assert_eq!(Ready(None), framed.poll().unwrap());
+
+ struct BigDecoder;
+
+ impl Decoder for BigDecoder {
+ type Item = u32;
+ type Error = io::Error;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
+ if buf.len() < 32 * 1024 {
+ return Ok(None);
+ }
+ buf.split_to(32 * 1024);
+ Ok(Some(0))
+ }
+ }
+}
+
+#[test]
+fn data_remaining_is_error() {
+ let data = [0; 5];
+
+ let mut framed = FramedRead::new(&data[..], U32Decoder);
+ assert_eq!(Ready(Some(0)), framed.poll().unwrap());
+ assert!(framed.poll().is_err());
+}
+
+#[test]
+fn multi_frames_on_eof() {
+ struct MyDecoder(Vec<u32>);
+
+ impl Decoder for MyDecoder {
+ type Item = u32;
+ type Error = io::Error;
+
+ fn decode(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> {
+ unreachable!();
+ }
+
+ fn decode_eof(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> {
+ if self.0.is_empty() {
+ return Ok(None);
+ }
+
+ Ok(Some(self.0.remove(0)))
+ }
+ }
+
+ let mut framed = FramedRead::new(mock!(), MyDecoder(vec![0, 1, 2, 3]));
+ assert_eq!(Ready(Some(0)), framed.poll().unwrap());
+ assert_eq!(Ready(Some(1)), framed.poll().unwrap());
+ assert_eq!(Ready(Some(2)), framed.poll().unwrap());
+ assert_eq!(Ready(Some(3)), framed.poll().unwrap());
+ assert_eq!(Ready(None), framed.poll().unwrap());
+}
+
+// ===== Mock ======
+
+struct Mock {
+ calls: VecDeque<io::Result<Vec<u8>>>,
+}
+
+impl Read for Mock {
+ fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
+ match self.calls.pop_front() {
+ Some(Ok(data)) => {
+ debug_assert!(dst.len() >= data.len());
+ dst[..data.len()].copy_from_slice(&data[..]);
+ Ok(data.len())
+ }
+ Some(Err(e)) => Err(e),
+ None => Ok(0),
+ }
+ }
+}
+
+impl AsyncRead for Mock {
+}
diff --git a/third_party/rust/tokio-codec/tests/framed_write.rs b/third_party/rust/tokio-codec/tests/framed_write.rs
new file mode 100644
index 0000000000..137fb5be13
--- /dev/null
+++ b/third_party/rust/tokio-codec/tests/framed_write.rs
@@ -0,0 +1,134 @@
+extern crate tokio_codec;
+extern crate tokio_io;
+extern crate bytes;
+extern crate futures;
+
+use tokio_io::AsyncWrite;
+use tokio_codec::{Encoder, FramedWrite};
+
+use futures::{Sink, Poll};
+use bytes::{BytesMut, BufMut, BigEndian};
+
+use std::io::{self, Write};
+use std::collections::VecDeque;
+
+macro_rules! mock {
+ ($($x:expr,)*) => {{
+ let mut v = VecDeque::new();
+ v.extend(vec![$($x),*]);
+ Mock { calls: v }
+ }};
+}
+
+struct U32Encoder;
+
+impl Encoder for U32Encoder {
+ type Item = u32;
+ type Error = io::Error;
+
+ fn encode(&mut self, item: u32, dst: &mut BytesMut) -> io::Result<()> {
+ // Reserve space
+ dst.reserve(4);
+ dst.put_u32_be(item);
+ Ok(())
+ }
+}
+
+#[test]
+fn write_multi_frame_in_packet() {
+ let mock = mock! {
+ Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
+ };
+
+ let mut framed = FramedWrite::new(mock, U32Encoder);
+ assert!(framed.start_send(0).unwrap().is_ready());
+ assert!(framed.start_send(1).unwrap().is_ready());
+ assert!(framed.start_send(2).unwrap().is_ready());
+
+ // Nothing written yet
+ assert_eq!(1, framed.get_ref().calls.len());
+
+ // Flush the writes
+ assert!(framed.poll_complete().unwrap().is_ready());
+
+ assert_eq!(0, framed.get_ref().calls.len());
+}
+
+#[test]
+fn write_hits_backpressure() {
+ const ITER: usize = 2 * 1024;
+
+ let mut mock = mock! {
+ // Block the `ITER`th write
+ Err(io::Error::new(io::ErrorKind::WouldBlock, "not ready")),
+ Ok(b"".to_vec()),
+ };
+
+ for i in 0..(ITER + 1) {
+ let mut b = BytesMut::with_capacity(4);
+ b.put_u32_be(i as u32);
+
+ // Append to the end
+ match mock.calls.back_mut().unwrap() {
+ &mut Ok(ref mut data) => {
+ // Write in 2kb chunks
+ if data.len() < ITER {
+ data.extend_from_slice(&b[..]);
+ continue;
+ }
+ }
+ _ => unreachable!(),
+ }
+
+ // Push a new new chunk
+ mock.calls.push_back(Ok(b[..].to_vec()));
+ }
+
+ let mut framed = FramedWrite::new(mock, U32Encoder);
+
+ for i in 0..ITER {
+ assert!(framed.start_send(i as u32).unwrap().is_ready());
+ }
+
+ // This should reject
+ assert!(!framed.start_send(ITER as u32).unwrap().is_ready());
+
+ // This should succeed and start flushing the buffer.
+ assert!(framed.start_send(ITER as u32).unwrap().is_ready());
+
+ // Flush the rest of the buffer
+ assert!(framed.poll_complete().unwrap().is_ready());
+
+ // Ensure the mock is empty
+ assert_eq!(0, framed.get_ref().calls.len());
+}
+
+// ===== Mock ======
+
+struct Mock {
+ calls: VecDeque<io::Result<Vec<u8>>>,
+}
+
+impl Write for Mock {
+ fn write(&mut self, src: &[u8]) -> io::Result<usize> {
+ match self.calls.pop_front() {
+ Some(Ok(data)) => {
+ assert!(src.len() >= data.len());
+ assert_eq!(&data[..], &src[..data.len()]);
+ Ok(data.len())
+ }
+ Some(Err(e)) => Err(e),
+ None => panic!("unexpected write; {:?}", src),
+ }
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ Ok(())
+ }
+}
+
+impl AsyncWrite for Mock {
+ fn shutdown(&mut self) -> Poll<(), io::Error> {
+ Ok(().into())
+ }
+}