extern crate tokio_codec; extern crate tokio_io; extern crate bytes; extern crate futures; use tokio_io::AsyncRead; use tokio_codec::{FramedRead, Decoder}; use bytes::{BytesMut, Buf, IntoBuf, BigEndian}; use futures::Stream; use futures::Async::{Ready, NotReady}; use std::io::{self, Read}; use std::collections::VecDeque; macro_rules! mock { ($($x:expr,)*) => {{ let mut v = VecDeque::new(); v.extend(vec![$($x),*]); Mock { calls: v } }}; } struct U32Decoder; impl Decoder for U32Decoder { type Item = u32; type Error = io::Error; fn decode(&mut self, buf: &mut BytesMut) -> io::Result> { if buf.len() < 4 { return Ok(None); } let n = buf.split_to(4).into_buf().get_u32::(); Ok(Some(n)) } } #[test] fn read_multi_frame_in_packet() { let mock = mock! { Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()), }; let mut framed = FramedRead::new(mock, U32Decoder); assert_eq!(Ready(Some(0)), framed.poll().unwrap()); assert_eq!(Ready(Some(1)), framed.poll().unwrap()); assert_eq!(Ready(Some(2)), framed.poll().unwrap()); assert_eq!(Ready(None), framed.poll().unwrap()); } #[test] fn read_multi_frame_across_packets() { let mock = mock! { Ok(b"\x00\x00\x00\x00".to_vec()), Ok(b"\x00\x00\x00\x01".to_vec()), Ok(b"\x00\x00\x00\x02".to_vec()), }; let mut framed = FramedRead::new(mock, U32Decoder); assert_eq!(Ready(Some(0)), framed.poll().unwrap()); assert_eq!(Ready(Some(1)), framed.poll().unwrap()); assert_eq!(Ready(Some(2)), framed.poll().unwrap()); assert_eq!(Ready(None), framed.poll().unwrap()); } #[test] fn read_not_ready() { let mock = mock! { Err(io::Error::new(io::ErrorKind::WouldBlock, "")), Ok(b"\x00\x00\x00\x00".to_vec()), Ok(b"\x00\x00\x00\x01".to_vec()), }; let mut framed = FramedRead::new(mock, U32Decoder); assert_eq!(NotReady, framed.poll().unwrap()); assert_eq!(Ready(Some(0)), framed.poll().unwrap()); assert_eq!(Ready(Some(1)), framed.poll().unwrap()); assert_eq!(Ready(None), framed.poll().unwrap()); } #[test] fn read_partial_then_not_ready() { let mock = mock! { Ok(b"\x00\x00".to_vec()), Err(io::Error::new(io::ErrorKind::WouldBlock, "")), Ok(b"\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()), }; let mut framed = FramedRead::new(mock, U32Decoder); assert_eq!(NotReady, framed.poll().unwrap()); assert_eq!(Ready(Some(0)), framed.poll().unwrap()); assert_eq!(Ready(Some(1)), framed.poll().unwrap()); assert_eq!(Ready(Some(2)), framed.poll().unwrap()); assert_eq!(Ready(None), framed.poll().unwrap()); } #[test] fn read_err() { let mock = mock! { Err(io::Error::new(io::ErrorKind::Other, "")), }; let mut framed = FramedRead::new(mock, U32Decoder); assert_eq!(io::ErrorKind::Other, framed.poll().unwrap_err().kind()); } #[test] fn read_partial_then_err() { let mock = mock! { Ok(b"\x00\x00".to_vec()), Err(io::Error::new(io::ErrorKind::Other, "")), }; let mut framed = FramedRead::new(mock, U32Decoder); assert_eq!(io::ErrorKind::Other, framed.poll().unwrap_err().kind()); } #[test] fn read_partial_would_block_then_err() { let mock = mock! { Ok(b"\x00\x00".to_vec()), Err(io::Error::new(io::ErrorKind::WouldBlock, "")), Err(io::Error::new(io::ErrorKind::Other, "")), }; let mut framed = FramedRead::new(mock, U32Decoder); assert_eq!(NotReady, framed.poll().unwrap()); assert_eq!(io::ErrorKind::Other, framed.poll().unwrap_err().kind()); } #[test] fn huge_size() { let data = [0; 32 * 1024]; let mut framed = FramedRead::new(&data[..], BigDecoder); assert_eq!(Ready(Some(0)), framed.poll().unwrap()); assert_eq!(Ready(None), framed.poll().unwrap()); struct BigDecoder; impl Decoder for BigDecoder { type Item = u32; type Error = io::Error; fn decode(&mut self, buf: &mut BytesMut) -> io::Result> { if buf.len() < 32 * 1024 { return Ok(None); } buf.split_to(32 * 1024); Ok(Some(0)) } } } #[test] fn data_remaining_is_error() { let data = [0; 5]; let mut framed = FramedRead::new(&data[..], U32Decoder); assert_eq!(Ready(Some(0)), framed.poll().unwrap()); assert!(framed.poll().is_err()); } #[test] fn multi_frames_on_eof() { struct MyDecoder(Vec); impl Decoder for MyDecoder { type Item = u32; type Error = io::Error; fn decode(&mut self, _buf: &mut BytesMut) -> io::Result> { unreachable!(); } fn decode_eof(&mut self, _buf: &mut BytesMut) -> io::Result> { if self.0.is_empty() { return Ok(None); } Ok(Some(self.0.remove(0))) } } let mut framed = FramedRead::new(mock!(), MyDecoder(vec![0, 1, 2, 3])); assert_eq!(Ready(Some(0)), framed.poll().unwrap()); assert_eq!(Ready(Some(1)), framed.poll().unwrap()); assert_eq!(Ready(Some(2)), framed.poll().unwrap()); assert_eq!(Ready(Some(3)), framed.poll().unwrap()); assert_eq!(Ready(None), framed.poll().unwrap()); } // ===== Mock ====== struct Mock { calls: VecDeque>>, } impl Read for Mock { fn read(&mut self, dst: &mut [u8]) -> io::Result { match self.calls.pop_front() { Some(Ok(data)) => { debug_assert!(dst.len() >= data.len()); dst[..data.len()].copy_from_slice(&data[..]); Ok(data.len()) } Some(Err(e)) => Err(e), None => Ok(0), } } } impl AsyncRead for Mock { }