summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-util/tests
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-util/tests')
-rw-r--r--third_party/rust/tokio-util/tests/codecs.rs217
-rw-r--r--third_party/rust/tokio-util/tests/framed.rs97
-rw-r--r--third_party/rust/tokio-util/tests/framed_read.rs295
-rw-r--r--third_party/rust/tokio-util/tests/framed_write.rs173
-rw-r--r--third_party/rust/tokio-util/tests/length_delimited.rs760
-rw-r--r--third_party/rust/tokio-util/tests/udp.rs79
6 files changed, 1621 insertions, 0 deletions
diff --git a/third_party/rust/tokio-util/tests/codecs.rs b/third_party/rust/tokio-util/tests/codecs.rs
new file mode 100644
index 0000000000..d121286657
--- /dev/null
+++ b/third_party/rust/tokio-util/tests/codecs.rs
@@ -0,0 +1,217 @@
+#![warn(rust_2018_idioms)]
+
+use tokio_util::codec::{BytesCodec, Decoder, Encoder, LinesCodec};
+
+use bytes::{BufMut, Bytes, BytesMut};
+
+#[test]
+fn bytes_decoder() {
+ let mut codec = BytesCodec::new();
+ let buf = &mut BytesMut::new();
+ buf.put_slice(b"abc");
+ assert_eq!("abc", codec.decode(buf).unwrap().unwrap());
+ assert_eq!(None, codec.decode(buf).unwrap());
+ assert_eq!(None, codec.decode(buf).unwrap());
+ buf.put_slice(b"a");
+ assert_eq!("a", codec.decode(buf).unwrap().unwrap());
+}
+
+#[test]
+fn bytes_encoder() {
+ let mut codec = BytesCodec::new();
+
+ // Default capacity of BytesMut
+ #[cfg(target_pointer_width = "64")]
+ const INLINE_CAP: usize = 4 * 8 - 1;
+ #[cfg(target_pointer_width = "32")]
+ const INLINE_CAP: usize = 4 * 4 - 1;
+
+ let mut buf = BytesMut::new();
+ codec
+ .encode(Bytes::from_static(&[0; INLINE_CAP + 1]), &mut buf)
+ .unwrap();
+
+ // Default capacity of Framed Read
+ const INITIAL_CAPACITY: usize = 8 * 1024;
+
+ let mut buf = BytesMut::with_capacity(INITIAL_CAPACITY);
+ codec
+ .encode(Bytes::from_static(&[0; INITIAL_CAPACITY + 1]), &mut buf)
+ .unwrap();
+}
+
+#[test]
+fn lines_decoder() {
+ let mut codec = LinesCodec::new();
+ let buf = &mut BytesMut::new();
+ buf.reserve(200);
+ buf.put_slice(b"line 1\nline 2\r\nline 3\n\r\n\r");
+ assert_eq!("line 1", codec.decode(buf).unwrap().unwrap());
+ assert_eq!("line 2", codec.decode(buf).unwrap().unwrap());
+ assert_eq!("line 3", codec.decode(buf).unwrap().unwrap());
+ assert_eq!("", codec.decode(buf).unwrap().unwrap());
+ assert_eq!(None, codec.decode(buf).unwrap());
+ assert_eq!(None, codec.decode_eof(buf).unwrap());
+ buf.put_slice(b"k");
+ assert_eq!(None, codec.decode(buf).unwrap());
+ assert_eq!("\rk", codec.decode_eof(buf).unwrap().unwrap());
+ assert_eq!(None, codec.decode(buf).unwrap());
+ assert_eq!(None, codec.decode_eof(buf).unwrap());
+}
+
+#[test]
+fn lines_decoder_max_length() {
+ const MAX_LENGTH: usize = 6;
+
+ let mut codec = LinesCodec::new_with_max_length(MAX_LENGTH);
+ let buf = &mut BytesMut::new();
+
+ buf.reserve(200);
+ buf.put_slice(b"line 1 is too long\nline 2\nline 3\r\nline 4\n\r\n\r");
+
+ assert!(codec.decode(buf).is_err());
+
+ let line = codec.decode(buf).unwrap().unwrap();
+ assert!(
+ line.len() <= MAX_LENGTH,
+ "{:?}.len() <= {:?}",
+ line,
+ MAX_LENGTH
+ );
+ assert_eq!("line 2", line);
+
+ assert!(codec.decode(buf).is_err());
+
+ let line = codec.decode(buf).unwrap().unwrap();
+ assert!(
+ line.len() <= MAX_LENGTH,
+ "{:?}.len() <= {:?}",
+ line,
+ MAX_LENGTH
+ );
+ assert_eq!("line 4", line);
+
+ let line = codec.decode(buf).unwrap().unwrap();
+ assert!(
+ line.len() <= MAX_LENGTH,
+ "{:?}.len() <= {:?}",
+ line,
+ MAX_LENGTH
+ );
+ assert_eq!("", line);
+
+ assert_eq!(None, codec.decode(buf).unwrap());
+ assert_eq!(None, codec.decode_eof(buf).unwrap());
+ buf.put_slice(b"k");
+ assert_eq!(None, codec.decode(buf).unwrap());
+
+ let line = codec.decode_eof(buf).unwrap().unwrap();
+ assert!(
+ line.len() <= MAX_LENGTH,
+ "{:?}.len() <= {:?}",
+ line,
+ MAX_LENGTH
+ );
+ assert_eq!("\rk", line);
+
+ assert_eq!(None, codec.decode(buf).unwrap());
+ assert_eq!(None, codec.decode_eof(buf).unwrap());
+
+ // Line that's one character too long. This could cause an out of bounds
+ // error if we peek at the next characters using slice indexing.
+ buf.put_slice(b"aaabbbc");
+ assert!(codec.decode(buf).is_err());
+}
+
+#[test]
+fn lines_decoder_max_length_underrun() {
+ const MAX_LENGTH: usize = 6;
+
+ let mut codec = LinesCodec::new_with_max_length(MAX_LENGTH);
+ let buf = &mut BytesMut::new();
+
+ buf.reserve(200);
+ buf.put_slice(b"line ");
+ assert_eq!(None, codec.decode(buf).unwrap());
+ buf.put_slice(b"too l");
+ assert!(codec.decode(buf).is_err());
+ buf.put_slice(b"ong\n");
+ assert_eq!(None, codec.decode(buf).unwrap());
+
+ buf.put_slice(b"line 2");
+ assert_eq!(None, codec.decode(buf).unwrap());
+ buf.put_slice(b"\n");
+ assert_eq!("line 2", codec.decode(buf).unwrap().unwrap());
+}
+
+#[test]
+fn lines_decoder_max_length_bursts() {
+ const MAX_LENGTH: usize = 10;
+
+ let mut codec = LinesCodec::new_with_max_length(MAX_LENGTH);
+ let buf = &mut BytesMut::new();
+
+ buf.reserve(200);
+ buf.put_slice(b"line ");
+ assert_eq!(None, codec.decode(buf).unwrap());
+ buf.put_slice(b"too l");
+ assert_eq!(None, codec.decode(buf).unwrap());
+ buf.put_slice(b"ong\n");
+ assert!(codec.decode(buf).is_err());
+}
+
+#[test]
+fn lines_decoder_max_length_big_burst() {
+ const MAX_LENGTH: usize = 10;
+
+ let mut codec = LinesCodec::new_with_max_length(MAX_LENGTH);
+ let buf = &mut BytesMut::new();
+
+ buf.reserve(200);
+ buf.put_slice(b"line ");
+ assert_eq!(None, codec.decode(buf).unwrap());
+ buf.put_slice(b"too long!\n");
+ assert!(codec.decode(buf).is_err());
+}
+
+#[test]
+fn lines_decoder_max_length_newline_between_decodes() {
+ const MAX_LENGTH: usize = 5;
+
+ let mut codec = LinesCodec::new_with_max_length(MAX_LENGTH);
+ let buf = &mut BytesMut::new();
+
+ buf.reserve(200);
+ buf.put_slice(b"hello");
+ assert_eq!(None, codec.decode(buf).unwrap());
+
+ buf.put_slice(b"\nworld");
+ assert_eq!("hello", codec.decode(buf).unwrap().unwrap());
+}
+
+// Regression test for [infinite loop bug](https://github.com/tokio-rs/tokio/issues/1483)
+#[test]
+fn lines_decoder_discard_repeat() {
+ const MAX_LENGTH: usize = 1;
+
+ let mut codec = LinesCodec::new_with_max_length(MAX_LENGTH);
+ let buf = &mut BytesMut::new();
+
+ buf.reserve(200);
+ buf.put_slice(b"aa");
+ assert!(codec.decode(buf).is_err());
+ buf.put_slice(b"a");
+ assert!(codec.decode(buf).is_err());
+}
+
+#[test]
+fn lines_encoder() {
+ let mut codec = LinesCodec::new();
+ let mut buf = BytesMut::new();
+
+ codec.encode(String::from("line 1"), &mut buf).unwrap();
+ assert_eq!("line 1\n", buf);
+
+ codec.encode(String::from("line 2"), &mut buf).unwrap();
+ assert_eq!("line 1\nline 2\n", buf);
+}
diff --git a/third_party/rust/tokio-util/tests/framed.rs b/third_party/rust/tokio-util/tests/framed.rs
new file mode 100644
index 0000000000..b98df7368d
--- /dev/null
+++ b/third_party/rust/tokio-util/tests/framed.rs
@@ -0,0 +1,97 @@
+#![warn(rust_2018_idioms)]
+
+use tokio::prelude::*;
+use tokio_test::assert_ok;
+use tokio_util::codec::{Decoder, Encoder, Framed, FramedParts};
+
+use bytes::{Buf, BufMut, BytesMut};
+use futures::StreamExt;
+use std::io::{self, Read};
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+const INITIAL_CAPACITY: usize = 8 * 1024;
+
+/// Encode and decode u32 values.
+struct U32Codec;
+
+impl Decoder for U32Codec {
+ type Item = u32;
+ type Error = io::Error;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
+ if buf.len() < 4 {
+ return Ok(None);
+ }
+
+ let n = buf.split_to(4).get_u32();
+ Ok(Some(n))
+ }
+}
+
+impl Encoder for U32Codec {
+ type Item = u32;
+ type Error = io::Error;
+
+ fn encode(&mut self, item: u32, dst: &mut BytesMut) -> io::Result<()> {
+ // Reserve space
+ dst.reserve(4);
+ dst.put_u32(item);
+ Ok(())
+ }
+}
+
+/// This value should never be used
+struct DontReadIntoThis;
+
+impl Read for DontReadIntoThis {
+ fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
+ Err(io::Error::new(
+ io::ErrorKind::Other,
+ "Read into something you weren't supposed to.",
+ ))
+ }
+}
+
+impl AsyncRead for DontReadIntoThis {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ _buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ unreachable!()
+ }
+}
+
+#[tokio::test]
+async fn can_read_from_existing_buf() {
+ let mut parts = FramedParts::new(DontReadIntoThis, U32Codec);
+ parts.read_buf = BytesMut::from(&[0, 0, 0, 42][..]);
+
+ let mut framed = Framed::from_parts(parts);
+ let num = assert_ok!(framed.next().await.unwrap());
+
+ assert_eq!(num, 42);
+}
+
+#[test]
+fn external_buf_grows_to_init() {
+ let mut parts = FramedParts::new(DontReadIntoThis, U32Codec);
+ parts.read_buf = BytesMut::from(&[0, 0, 0, 42][..]);
+
+ let framed = Framed::from_parts(parts);
+ let FramedParts { read_buf, .. } = framed.into_parts();
+
+ assert_eq!(read_buf.capacity(), INITIAL_CAPACITY);
+}
+
+#[test]
+fn external_buf_does_not_shrink() {
+ let mut parts = FramedParts::new(DontReadIntoThis, U32Codec);
+ parts.read_buf = BytesMut::from(&vec![0; INITIAL_CAPACITY * 2][..]);
+
+ let framed = Framed::from_parts(parts);
+ let FramedParts { read_buf, .. } = framed.into_parts();
+
+ assert_eq!(read_buf.capacity(), INITIAL_CAPACITY * 2);
+}
diff --git a/third_party/rust/tokio-util/tests/framed_read.rs b/third_party/rust/tokio-util/tests/framed_read.rs
new file mode 100644
index 0000000000..06caa0a4d0
--- /dev/null
+++ b/third_party/rust/tokio-util/tests/framed_read.rs
@@ -0,0 +1,295 @@
+#![warn(rust_2018_idioms)]
+
+use tokio::io::AsyncRead;
+use tokio_test::assert_ready;
+use tokio_test::task;
+use tokio_util::codec::{Decoder, FramedRead};
+
+use bytes::{Buf, BytesMut};
+use futures::Stream;
+use std::collections::VecDeque;
+use std::io;
+use std::pin::Pin;
+use std::task::Poll::{Pending, Ready};
+use std::task::{Context, Poll};
+
+macro_rules! mock {
+ ($($x:expr,)*) => {{
+ let mut v = VecDeque::new();
+ v.extend(vec![$($x),*]);
+ Mock { calls: v }
+ }};
+}
+
+macro_rules! assert_read {
+ ($e:expr, $n:expr) => {{
+ let val = assert_ready!($e);
+ assert_eq!(val.unwrap().unwrap(), $n);
+ }};
+}
+
+macro_rules! pin {
+ ($id:ident) => {
+ Pin::new(&mut $id)
+ };
+}
+
+struct U32Decoder;
+
+impl Decoder for U32Decoder {
+ type Item = u32;
+ type Error = io::Error;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
+ if buf.len() < 4 {
+ return Ok(None);
+ }
+
+ let n = buf.split_to(4).get_u32();
+ Ok(Some(n))
+ }
+}
+
+#[test]
+fn read_multi_frame_in_packet() {
+ let mut task = task::spawn(());
+ let mock = mock! {
+ Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
+ };
+ let mut framed = FramedRead::new(mock, U32Decoder);
+
+ task.enter(|cx, _| {
+ assert_read!(pin!(framed).poll_next(cx), 0);
+ assert_read!(pin!(framed).poll_next(cx), 1);
+ assert_read!(pin!(framed).poll_next(cx), 2);
+ assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
+ });
+}
+
+#[test]
+fn read_multi_frame_across_packets() {
+ let mut task = task::spawn(());
+ let mock = mock! {
+ Ok(b"\x00\x00\x00\x00".to_vec()),
+ Ok(b"\x00\x00\x00\x01".to_vec()),
+ Ok(b"\x00\x00\x00\x02".to_vec()),
+ };
+ let mut framed = FramedRead::new(mock, U32Decoder);
+
+ task.enter(|cx, _| {
+ assert_read!(pin!(framed).poll_next(cx), 0);
+ assert_read!(pin!(framed).poll_next(cx), 1);
+ assert_read!(pin!(framed).poll_next(cx), 2);
+ assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
+ });
+}
+
+#[test]
+fn read_not_ready() {
+ let mut task = task::spawn(());
+ let mock = mock! {
+ Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
+ Ok(b"\x00\x00\x00\x00".to_vec()),
+ Ok(b"\x00\x00\x00\x01".to_vec()),
+ };
+ let mut framed = FramedRead::new(mock, U32Decoder);
+
+ task.enter(|cx, _| {
+ assert!(pin!(framed).poll_next(cx).is_pending());
+ assert_read!(pin!(framed).poll_next(cx), 0);
+ assert_read!(pin!(framed).poll_next(cx), 1);
+ assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
+ });
+}
+
+#[test]
+fn read_partial_then_not_ready() {
+ let mut task = task::spawn(());
+ let mock = mock! {
+ Ok(b"\x00\x00".to_vec()),
+ Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
+ Ok(b"\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
+ };
+ let mut framed = FramedRead::new(mock, U32Decoder);
+
+ task.enter(|cx, _| {
+ assert!(pin!(framed).poll_next(cx).is_pending());
+ assert_read!(pin!(framed).poll_next(cx), 0);
+ assert_read!(pin!(framed).poll_next(cx), 1);
+ assert_read!(pin!(framed).poll_next(cx), 2);
+ assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
+ });
+}
+
+#[test]
+fn read_err() {
+ let mut task = task::spawn(());
+ let mock = mock! {
+ Err(io::Error::new(io::ErrorKind::Other, "")),
+ };
+ let mut framed = FramedRead::new(mock, U32Decoder);
+
+ task.enter(|cx, _| {
+ assert_eq!(
+ io::ErrorKind::Other,
+ assert_ready!(pin!(framed).poll_next(cx))
+ .unwrap()
+ .unwrap_err()
+ .kind()
+ )
+ });
+}
+
+#[test]
+fn read_partial_then_err() {
+ let mut task = task::spawn(());
+ let mock = mock! {
+ Ok(b"\x00\x00".to_vec()),
+ Err(io::Error::new(io::ErrorKind::Other, "")),
+ };
+ let mut framed = FramedRead::new(mock, U32Decoder);
+
+ task.enter(|cx, _| {
+ assert_eq!(
+ io::ErrorKind::Other,
+ assert_ready!(pin!(framed).poll_next(cx))
+ .unwrap()
+ .unwrap_err()
+ .kind()
+ )
+ });
+}
+
+#[test]
+fn read_partial_would_block_then_err() {
+ let mut task = task::spawn(());
+ let mock = mock! {
+ Ok(b"\x00\x00".to_vec()),
+ Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
+ Err(io::Error::new(io::ErrorKind::Other, "")),
+ };
+ let mut framed = FramedRead::new(mock, U32Decoder);
+
+ task.enter(|cx, _| {
+ assert!(pin!(framed).poll_next(cx).is_pending());
+ assert_eq!(
+ io::ErrorKind::Other,
+ assert_ready!(pin!(framed).poll_next(cx))
+ .unwrap()
+ .unwrap_err()
+ .kind()
+ )
+ });
+}
+
+#[test]
+fn huge_size() {
+ let mut task = task::spawn(());
+ let data = [0; 32 * 1024];
+ let mut framed = FramedRead::new(Slice(&data[..]), BigDecoder);
+
+ task.enter(|cx, _| {
+ assert_read!(pin!(framed).poll_next(cx), 0);
+ assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
+ });
+
+ struct BigDecoder;
+
+ impl Decoder for BigDecoder {
+ type Item = u32;
+ type Error = io::Error;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
+ if buf.len() < 32 * 1024 {
+ return Ok(None);
+ }
+ buf.split_to(32 * 1024);
+ Ok(Some(0))
+ }
+ }
+}
+
+#[test]
+fn data_remaining_is_error() {
+ let mut task = task::spawn(());
+ let slice = Slice(&[0; 5]);
+ let mut framed = FramedRead::new(slice, U32Decoder);
+
+ task.enter(|cx, _| {
+ assert_read!(pin!(framed).poll_next(cx), 0);
+ assert!(assert_ready!(pin!(framed).poll_next(cx)).unwrap().is_err());
+ });
+}
+
+#[test]
+fn multi_frames_on_eof() {
+ let mut task = task::spawn(());
+ struct MyDecoder(Vec<u32>);
+
+ impl Decoder for MyDecoder {
+ type Item = u32;
+ type Error = io::Error;
+
+ fn decode(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> {
+ unreachable!();
+ }
+
+ fn decode_eof(&mut self, _buf: &mut BytesMut) -> io::Result<Option<u32>> {
+ if self.0.is_empty() {
+ return Ok(None);
+ }
+
+ Ok(Some(self.0.remove(0)))
+ }
+ }
+
+ let mut framed = FramedRead::new(mock!(), MyDecoder(vec![0, 1, 2, 3]));
+
+ task.enter(|cx, _| {
+ assert_read!(pin!(framed).poll_next(cx), 0);
+ assert_read!(pin!(framed).poll_next(cx), 1);
+ assert_read!(pin!(framed).poll_next(cx), 2);
+ assert_read!(pin!(framed).poll_next(cx), 3);
+ assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
+ });
+}
+
+// ===== Mock ======
+
+struct Mock {
+ calls: VecDeque<io::Result<Vec<u8>>>,
+}
+
+impl AsyncRead for Mock {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ use io::ErrorKind::WouldBlock;
+
+ match self.calls.pop_front() {
+ Some(Ok(data)) => {
+ debug_assert!(buf.len() >= data.len());
+ buf[..data.len()].copy_from_slice(&data[..]);
+ Ready(Ok(data.len()))
+ }
+ Some(Err(ref e)) if e.kind() == WouldBlock => Pending,
+ Some(Err(e)) => Ready(Err(e)),
+ None => Ready(Ok(0)),
+ }
+ }
+}
+
+// TODO this newtype is necessary because `&[u8]` does not currently implement `AsyncRead`
+struct Slice<'a>(&'a [u8]);
+
+impl AsyncRead for Slice<'_> {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ Pin::new(&mut self.0).poll_read(cx, buf)
+ }
+}
diff --git a/third_party/rust/tokio-util/tests/framed_write.rs b/third_party/rust/tokio-util/tests/framed_write.rs
new file mode 100644
index 0000000000..706e6792fe
--- /dev/null
+++ b/third_party/rust/tokio-util/tests/framed_write.rs
@@ -0,0 +1,173 @@
+#![warn(rust_2018_idioms)]
+
+use tokio::io::AsyncWrite;
+use tokio_test::{assert_ready, task};
+use tokio_util::codec::{Encoder, FramedWrite};
+
+use bytes::{BufMut, BytesMut};
+use futures_sink::Sink;
+use std::collections::VecDeque;
+use std::io::{self, Write};
+use std::pin::Pin;
+use std::task::Poll::{Pending, Ready};
+use std::task::{Context, Poll};
+
+macro_rules! mock {
+ ($($x:expr,)*) => {{
+ let mut v = VecDeque::new();
+ v.extend(vec![$($x),*]);
+ Mock { calls: v }
+ }};
+}
+
+macro_rules! pin {
+ ($id:ident) => {
+ Pin::new(&mut $id)
+ };
+}
+
+struct U32Encoder;
+
+impl Encoder for U32Encoder {
+ type Item = u32;
+ type Error = io::Error;
+
+ fn encode(&mut self, item: u32, dst: &mut BytesMut) -> io::Result<()> {
+ // Reserve space
+ dst.reserve(4);
+ dst.put_u32(item);
+ Ok(())
+ }
+}
+
+#[test]
+fn write_multi_frame_in_packet() {
+ let mut task = task::spawn(());
+ let mock = mock! {
+ Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
+ };
+ let mut framed = FramedWrite::new(mock, U32Encoder);
+
+ task.enter(|cx, _| {
+ assert!(assert_ready!(pin!(framed).poll_ready(cx)).is_ok());
+ assert!(pin!(framed).start_send(0).is_ok());
+ assert!(assert_ready!(pin!(framed).poll_ready(cx)).is_ok());
+ assert!(pin!(framed).start_send(1).is_ok());
+ assert!(assert_ready!(pin!(framed).poll_ready(cx)).is_ok());
+ assert!(pin!(framed).start_send(2).is_ok());
+
+ // Nothing written yet
+ assert_eq!(1, framed.get_ref().calls.len());
+
+ // Flush the writes
+ assert!(assert_ready!(pin!(framed).poll_flush(cx)).is_ok());
+
+ assert_eq!(0, framed.get_ref().calls.len());
+ });
+}
+
+#[test]
+fn write_hits_backpressure() {
+ const ITER: usize = 2 * 1024;
+
+ let mut mock = mock! {
+ // Block the `ITER`th write
+ Err(io::Error::new(io::ErrorKind::WouldBlock, "not ready")),
+ Ok(b"".to_vec()),
+ };
+
+ for i in 0..=ITER {
+ let mut b = BytesMut::with_capacity(4);
+ b.put_u32(i as u32);
+
+ // Append to the end
+ match mock.calls.back_mut().unwrap() {
+ &mut Ok(ref mut data) => {
+ // Write in 2kb chunks
+ if data.len() < ITER {
+ data.extend_from_slice(&b[..]);
+ continue;
+ } // else fall through and create a new buffer
+ }
+ _ => unreachable!(),
+ }
+
+ // Push a new new chunk
+ mock.calls.push_back(Ok(b[..].to_vec()));
+ }
+ // 1 'wouldblock', 4 * 2KB buffers, 1 b-byte buffer
+ assert_eq!(mock.calls.len(), 6);
+
+ let mut task = task::spawn(());
+ let mut framed = FramedWrite::new(mock, U32Encoder);
+ task.enter(|cx, _| {
+ // Send 8KB. This fills up FramedWrite2 buffer
+ for i in 0..ITER {
+ assert!(assert_ready!(pin!(framed).poll_ready(cx)).is_ok());
+ assert!(pin!(framed).start_send(i as u32).is_ok());
+ }
+
+ // Now we poll_ready which forces a flush. The mock pops the front message
+ // and decides to block.
+ assert!(pin!(framed).poll_ready(cx).is_pending());
+
+ // We poll again, forcing another flush, which this time succeeds
+ // The whole 8KB buffer is flushed
+ assert!(assert_ready!(pin!(framed).poll_ready(cx)).is_ok());
+
+ // Send more data. This matches the final message expected by the mock
+ assert!(pin!(framed).start_send(ITER as u32).is_ok());
+
+ // Flush the rest of the buffer
+ assert!(assert_ready!(pin!(framed).poll_flush(cx)).is_ok());
+
+ // Ensure the mock is empty
+ assert_eq!(0, framed.get_ref().calls.len());
+ })
+}
+
+// // ===== Mock ======
+
+struct Mock {
+ calls: VecDeque<io::Result<Vec<u8>>>,
+}
+
+impl Write for Mock {
+ fn write(&mut self, src: &[u8]) -> io::Result<usize> {
+ match self.calls.pop_front() {
+ Some(Ok(data)) => {
+ assert!(src.len() >= data.len());
+ assert_eq!(&data[..], &src[..data.len()]);
+ Ok(data.len())
+ }
+ Some(Err(e)) => Err(e),
+ None => panic!("unexpected write; {:?}", src),
+ }
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ Ok(())
+ }
+}
+
+impl AsyncWrite for Mock {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<Result<usize, io::Error>> {
+ match Pin::get_mut(self).write(buf) {
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Pending,
+ other => Ready(other),
+ }
+ }
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
+ match Pin::get_mut(self).flush() {
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Pending,
+ other => Ready(other),
+ }
+ }
+ fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
+ unimplemented!()
+ }
+}
diff --git a/third_party/rust/tokio-util/tests/length_delimited.rs b/third_party/rust/tokio-util/tests/length_delimited.rs
new file mode 100644
index 0000000000..6c5199167b
--- /dev/null
+++ b/third_party/rust/tokio-util/tests/length_delimited.rs
@@ -0,0 +1,760 @@
+#![warn(rust_2018_idioms)]
+
+use tokio::io::{AsyncRead, AsyncWrite};
+use tokio_test::task;
+use tokio_test::{
+ assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
+};
+use tokio_util::codec::*;
+
+use bytes::{BufMut, Bytes, BytesMut};
+use futures::{pin_mut, Sink, Stream};
+use std::collections::VecDeque;
+use std::io;
+use std::pin::Pin;
+use std::task::Poll::*;
+use std::task::{Context, Poll};
+
+macro_rules! mock {
+ ($($x:expr,)*) => {{
+ let mut v = VecDeque::new();
+ v.extend(vec![$($x),*]);
+ Mock { calls: v }
+ }};
+}
+
+macro_rules! assert_next_eq {
+ ($io:ident, $expect:expr) => {{
+ task::spawn(()).enter(|cx, _| {
+ let res = assert_ready!($io.as_mut().poll_next(cx));
+ match res {
+ Some(Ok(v)) => assert_eq!(v, $expect.as_ref()),
+ Some(Err(e)) => panic!("error = {:?}", e),
+ None => panic!("none"),
+ }
+ });
+ }};
+}
+
+macro_rules! assert_next_pending {
+ ($io:ident) => {{
+ task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) {
+ Ready(Some(Ok(v))) => panic!("value = {:?}", v),
+ Ready(Some(Err(e))) => panic!("error = {:?}", e),
+ Ready(None) => panic!("done"),
+ Pending => {}
+ });
+ }};
+}
+
+macro_rules! assert_next_err {
+ ($io:ident) => {{
+ task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) {
+ Ready(Some(Ok(v))) => panic!("value = {:?}", v),
+ Ready(Some(Err(_))) => {}
+ Ready(None) => panic!("done"),
+ Pending => panic!("pending"),
+ });
+ }};
+}
+
+macro_rules! assert_done {
+ ($io:ident) => {{
+ task::spawn(()).enter(|cx, _| {
+ let res = assert_ready!($io.as_mut().poll_next(cx));
+ match res {
+ Some(Ok(v)) => panic!("value = {:?}", v),
+ Some(Err(e)) => panic!("error = {:?}", e),
+ None => {}
+ }
+ });
+ }};
+}
+
+#[test]
+fn read_empty_io_yields_nothing() {
+ let io = Box::pin(FramedRead::new(mock!(), LengthDelimitedCodec::new()));
+ pin_mut!(io);
+
+ assert_done!(io);
+}
+
+#[test]
+fn read_single_frame_one_packet() {
+ let io = FramedRead::new(
+ mock! {
+ data(b"\x00\x00\x00\x09abcdefghi"),
+ },
+ LengthDelimitedCodec::new(),
+ );
+ pin_mut!(io);
+
+ assert_next_eq!(io, b"abcdefghi");
+ assert_done!(io);
+}
+
+#[test]
+fn read_single_frame_one_packet_little_endian() {
+ let io = length_delimited::Builder::new()
+ .little_endian()
+ .new_read(mock! {
+ data(b"\x09\x00\x00\x00abcdefghi"),
+ });
+ pin_mut!(io);
+
+ assert_next_eq!(io, b"abcdefghi");
+ assert_done!(io);
+}
+
+#[test]
+fn read_single_frame_one_packet_native_endian() {
+ let d = if cfg!(target_endian = "big") {
+ b"\x00\x00\x00\x09abcdefghi"
+ } else {
+ b"\x09\x00\x00\x00abcdefghi"
+ };
+ let io = length_delimited::Builder::new()
+ .native_endian()
+ .new_read(mock! {
+ data(d),
+ });
+ pin_mut!(io);
+
+ assert_next_eq!(io, b"abcdefghi");
+ assert_done!(io);
+}
+
+#[test]
+fn read_single_multi_frame_one_packet() {
+ let mut d: Vec<u8> = vec![];
+ d.extend_from_slice(b"\x00\x00\x00\x09abcdefghi");
+ d.extend_from_slice(b"\x00\x00\x00\x03123");
+ d.extend_from_slice(b"\x00\x00\x00\x0bhello world");
+
+ let io = FramedRead::new(
+ mock! {
+ data(&d),
+ },
+ LengthDelimitedCodec::new(),
+ );
+ pin_mut!(io);
+
+ assert_next_eq!(io, b"abcdefghi");
+ assert_next_eq!(io, b"123");
+ assert_next_eq!(io, b"hello world");
+ assert_done!(io);
+}
+
+#[test]
+fn read_single_frame_multi_packet() {
+ let io = FramedRead::new(
+ mock! {
+ data(b"\x00\x00"),
+ data(b"\x00\x09abc"),
+ data(b"defghi"),
+ },
+ LengthDelimitedCodec::new(),
+ );
+ pin_mut!(io);
+
+ assert_next_eq!(io, b"abcdefghi");
+ assert_done!(io);
+}
+
+#[test]
+fn read_multi_frame_multi_packet() {
+ let io = FramedRead::new(
+ mock! {
+ data(b"\x00\x00"),
+ data(b"\x00\x09abc"),
+ data(b"defghi"),
+ data(b"\x00\x00\x00\x0312"),
+ data(b"3\x00\x00\x00\x0bhello world"),
+ },
+ LengthDelimitedCodec::new(),
+ );
+ pin_mut!(io);
+
+ assert_next_eq!(io, b"abcdefghi");
+ assert_next_eq!(io, b"123");
+ assert_next_eq!(io, b"hello world");
+ assert_done!(io);
+}
+
+#[test]
+fn read_single_frame_multi_packet_wait() {
+ let io = FramedRead::new(
+ mock! {
+ data(b"\x00\x00"),
+ Pending,
+ data(b"\x00\x09abc"),
+ Pending,
+ data(b"defghi"),
+ Pending,
+ },
+ LengthDelimitedCodec::new(),
+ );
+ pin_mut!(io);
+
+ assert_next_pending!(io);
+ assert_next_pending!(io);
+ assert_next_eq!(io, b"abcdefghi");
+ assert_next_pending!(io);
+ assert_done!(io);
+}
+
+#[test]
+fn read_multi_frame_multi_packet_wait() {
+ let io = FramedRead::new(
+ mock! {
+ data(b"\x00\x00"),
+ Pending,
+ data(b"\x00\x09abc"),
+ Pending,
+ data(b"defghi"),
+ Pending,
+ data(b"\x00\x00\x00\x0312"),
+ Pending,
+ data(b"3\x00\x00\x00\x0bhello world"),
+ Pending,
+ },
+ LengthDelimitedCodec::new(),
+ );
+ pin_mut!(io);
+
+ assert_next_pending!(io);
+ assert_next_pending!(io);
+ assert_next_eq!(io, b"abcdefghi");
+ assert_next_pending!(io);
+ assert_next_pending!(io);
+ assert_next_eq!(io, b"123");
+ assert_next_eq!(io, b"hello world");
+ assert_next_pending!(io);
+ assert_done!(io);
+}
+
+#[test]
+fn read_incomplete_head() {
+ let io = FramedRead::new(
+ mock! {
+ data(b"\x00\x00"),
+ },
+ LengthDelimitedCodec::new(),
+ );
+ pin_mut!(io);
+
+ assert_next_err!(io);
+}
+
+#[test]
+fn read_incomplete_head_multi() {
+ let io = FramedRead::new(
+ mock! {
+ Pending,
+ data(b"\x00"),
+ Pending,
+ },
+ LengthDelimitedCodec::new(),
+ );
+ pin_mut!(io);
+
+ assert_next_pending!(io);
+ assert_next_pending!(io);
+ assert_next_err!(io);
+}
+
+#[test]
+fn read_incomplete_payload() {
+ let io = FramedRead::new(
+ mock! {
+ data(b"\x00\x00\x00\x09ab"),
+ Pending,
+ data(b"cd"),
+ Pending,
+ },
+ LengthDelimitedCodec::new(),
+ );
+ pin_mut!(io);
+
+ assert_next_pending!(io);
+ assert_next_pending!(io);
+ assert_next_err!(io);
+}
+
+#[test]
+fn read_max_frame_len() {
+ let io = length_delimited::Builder::new()
+ .max_frame_length(5)
+ .new_read(mock! {
+ data(b"\x00\x00\x00\x09abcdefghi"),
+ });
+ pin_mut!(io);
+
+ assert_next_err!(io);
+}
+
+#[test]
+fn read_update_max_frame_len_at_rest() {
+ let io = length_delimited::Builder::new().new_read(mock! {
+ data(b"\x00\x00\x00\x09abcdefghi"),
+ data(b"\x00\x00\x00\x09abcdefghi"),
+ });
+ pin_mut!(io);
+
+ assert_next_eq!(io, b"abcdefghi");
+ io.decoder_mut().set_max_frame_length(5);
+ assert_next_err!(io);
+}
+
+#[test]
+fn read_update_max_frame_len_in_flight() {
+ let io = length_delimited::Builder::new().new_read(mock! {
+ data(b"\x00\x00\x00\x09abcd"),
+ Pending,
+ data(b"efghi"),
+ data(b"\x00\x00\x00\x09abcdefghi"),
+ });
+ pin_mut!(io);
+
+ assert_next_pending!(io);
+ io.decoder_mut().set_max_frame_length(5);
+ assert_next_eq!(io, b"abcdefghi");
+ assert_next_err!(io);
+}
+
+#[test]
+fn read_one_byte_length_field() {
+ let io = length_delimited::Builder::new()
+ .length_field_length(1)
+ .new_read(mock! {
+ data(b"\x09abcdefghi"),
+ });
+ pin_mut!(io);
+
+ assert_next_eq!(io, b"abcdefghi");
+ assert_done!(io);
+}
+
+#[test]
+fn read_header_offset() {
+ let io = length_delimited::Builder::new()
+ .length_field_length(2)
+ .length_field_offset(4)
+ .new_read(mock! {
+ data(b"zzzz\x00\x09abcdefghi"),
+ });
+ pin_mut!(io);
+
+ assert_next_eq!(io, b"abcdefghi");
+ assert_done!(io);
+}
+
+#[test]
+fn read_single_multi_frame_one_packet_skip_none_adjusted() {
+ let mut d: Vec<u8> = vec![];
+ d.extend_from_slice(b"xx\x00\x09abcdefghi");
+ d.extend_from_slice(b"yy\x00\x03123");
+ d.extend_from_slice(b"zz\x00\x0bhello world");
+
+ let io = length_delimited::Builder::new()
+ .length_field_length(2)
+ .length_field_offset(2)
+ .num_skip(0)
+ .length_adjustment(4)
+ .new_read(mock! {
+ data(&d),
+ });
+ pin_mut!(io);
+
+ assert_next_eq!(io, b"xx\x00\x09abcdefghi");
+ assert_next_eq!(io, b"yy\x00\x03123");
+ assert_next_eq!(io, b"zz\x00\x0bhello world");
+ assert_done!(io);
+}
+
+#[test]
+fn read_single_multi_frame_one_packet_length_includes_head() {
+ let mut d: Vec<u8> = vec![];
+ d.extend_from_slice(b"\x00\x0babcdefghi");
+ d.extend_from_slice(b"\x00\x05123");
+ d.extend_from_slice(b"\x00\x0dhello world");
+
+ let io = length_delimited::Builder::new()
+ .length_field_length(2)
+ .length_adjustment(-2)
+ .new_read(mock! {
+ data(&d),
+ });
+ pin_mut!(io);
+
+ assert_next_eq!(io, b"abcdefghi");
+ assert_next_eq!(io, b"123");
+ assert_next_eq!(io, b"hello world");
+ assert_done!(io);
+}
+
+#[test]
+fn write_single_frame_length_adjusted() {
+ let io = length_delimited::Builder::new()
+ .length_adjustment(-2)
+ .new_write(mock! {
+ data(b"\x00\x00\x00\x0b"),
+ data(b"abcdefghi"),
+ flush(),
+ });
+ pin_mut!(io);
+
+ task::spawn(()).enter(|cx, _| {
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
+ assert_ready_ok!(io.as_mut().poll_flush(cx));
+ assert!(io.get_ref().calls.is_empty());
+ });
+}
+
+#[test]
+fn write_nothing_yields_nothing() {
+ let io = FramedWrite::new(mock!(), LengthDelimitedCodec::new());
+ pin_mut!(io);
+
+ task::spawn(()).enter(|cx, _| {
+ assert_ready_ok!(io.poll_flush(cx));
+ });
+}
+
+#[test]
+fn write_single_frame_one_packet() {
+ let io = FramedWrite::new(
+ mock! {
+ data(b"\x00\x00\x00\x09"),
+ data(b"abcdefghi"),
+ flush(),
+ },
+ LengthDelimitedCodec::new(),
+ );
+ pin_mut!(io);
+
+ task::spawn(()).enter(|cx, _| {
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
+ assert_ready_ok!(io.as_mut().poll_flush(cx));
+ assert!(io.get_ref().calls.is_empty());
+ });
+}
+
+#[test]
+fn write_single_multi_frame_one_packet() {
+ let io = FramedWrite::new(
+ mock! {
+ data(b"\x00\x00\x00\x09"),
+ data(b"abcdefghi"),
+ data(b"\x00\x00\x00\x03"),
+ data(b"123"),
+ data(b"\x00\x00\x00\x0b"),
+ data(b"hello world"),
+ flush(),
+ },
+ LengthDelimitedCodec::new(),
+ );
+ pin_mut!(io);
+
+ task::spawn(()).enter(|cx, _| {
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
+
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_ok!(io.as_mut().start_send(Bytes::from("123")));
+
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_ok!(io.as_mut().start_send(Bytes::from("hello world")));
+
+ assert_ready_ok!(io.as_mut().poll_flush(cx));
+ assert!(io.get_ref().calls.is_empty());
+ });
+}
+
+#[test]
+fn write_single_multi_frame_multi_packet() {
+ let io = FramedWrite::new(
+ mock! {
+ data(b"\x00\x00\x00\x09"),
+ data(b"abcdefghi"),
+ flush(),
+ data(b"\x00\x00\x00\x03"),
+ data(b"123"),
+ flush(),
+ data(b"\x00\x00\x00\x0b"),
+ data(b"hello world"),
+ flush(),
+ },
+ LengthDelimitedCodec::new(),
+ );
+ pin_mut!(io);
+
+ task::spawn(()).enter(|cx, _| {
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
+
+ assert_ready_ok!(io.as_mut().poll_flush(cx));
+
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_ok!(io.as_mut().start_send(Bytes::from("123")));
+
+ assert_ready_ok!(io.as_mut().poll_flush(cx));
+
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_ok!(io.as_mut().start_send(Bytes::from("hello world")));
+
+ assert_ready_ok!(io.as_mut().poll_flush(cx));
+ assert!(io.get_ref().calls.is_empty());
+ });
+}
+
+#[test]
+fn write_single_frame_would_block() {
+ let io = FramedWrite::new(
+ mock! {
+ Pending,
+ data(b"\x00\x00"),
+ Pending,
+ data(b"\x00\x09"),
+ data(b"abcdefghi"),
+ flush(),
+ },
+ LengthDelimitedCodec::new(),
+ );
+ pin_mut!(io);
+
+ task::spawn(()).enter(|cx, _| {
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
+
+ assert_pending!(io.as_mut().poll_flush(cx));
+ assert_pending!(io.as_mut().poll_flush(cx));
+ assert_ready_ok!(io.as_mut().poll_flush(cx));
+
+ assert!(io.get_ref().calls.is_empty());
+ });
+}
+
+#[test]
+fn write_single_frame_little_endian() {
+ let io = length_delimited::Builder::new()
+ .little_endian()
+ .new_write(mock! {
+ data(b"\x09\x00\x00\x00"),
+ data(b"abcdefghi"),
+ flush(),
+ });
+ pin_mut!(io);
+
+ task::spawn(()).enter(|cx, _| {
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
+
+ assert_ready_ok!(io.as_mut().poll_flush(cx));
+ assert!(io.get_ref().calls.is_empty());
+ });
+}
+
+#[test]
+fn write_single_frame_with_short_length_field() {
+ let io = length_delimited::Builder::new()
+ .length_field_length(1)
+ .new_write(mock! {
+ data(b"\x09"),
+ data(b"abcdefghi"),
+ flush(),
+ });
+ pin_mut!(io);
+
+ task::spawn(()).enter(|cx, _| {
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
+
+ assert_ready_ok!(io.as_mut().poll_flush(cx));
+
+ assert!(io.get_ref().calls.is_empty());
+ });
+}
+
+#[test]
+fn write_max_frame_len() {
+ let io = length_delimited::Builder::new()
+ .max_frame_length(5)
+ .new_write(mock! {});
+ pin_mut!(io);
+
+ task::spawn(()).enter(|cx, _| {
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_err!(io.as_mut().start_send(Bytes::from("abcdef")));
+
+ assert!(io.get_ref().calls.is_empty());
+ });
+}
+
+#[test]
+fn write_update_max_frame_len_at_rest() {
+ let io = length_delimited::Builder::new().new_write(mock! {
+ data(b"\x00\x00\x00\x06"),
+ data(b"abcdef"),
+ flush(),
+ });
+ pin_mut!(io);
+
+ task::spawn(()).enter(|cx, _| {
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_ok!(io.as_mut().start_send(Bytes::from("abcdef")));
+
+ assert_ready_ok!(io.as_mut().poll_flush(cx));
+
+ io.encoder_mut().set_max_frame_length(5);
+
+ assert_err!(io.as_mut().start_send(Bytes::from("abcdef")));
+
+ assert!(io.get_ref().calls.is_empty());
+ });
+}
+
+#[test]
+fn write_update_max_frame_len_in_flight() {
+ let io = length_delimited::Builder::new().new_write(mock! {
+ data(b"\x00\x00\x00\x06"),
+ data(b"ab"),
+ Pending,
+ data(b"cdef"),
+ flush(),
+ });
+ pin_mut!(io);
+
+ task::spawn(()).enter(|cx, _| {
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_ok!(io.as_mut().start_send(Bytes::from("abcdef")));
+
+ assert_pending!(io.as_mut().poll_flush(cx));
+
+ io.encoder_mut().set_max_frame_length(5);
+
+ assert_ready_ok!(io.as_mut().poll_flush(cx));
+
+ assert_err!(io.as_mut().start_send(Bytes::from("abcdef")));
+ assert!(io.get_ref().calls.is_empty());
+ });
+}
+
+#[test]
+fn write_zero() {
+ let io = length_delimited::Builder::new().new_write(mock! {});
+ pin_mut!(io);
+
+ task::spawn(()).enter(|cx, _| {
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_ok!(io.as_mut().start_send(Bytes::from("abcdef")));
+
+ assert_ready_err!(io.as_mut().poll_flush(cx));
+
+ assert!(io.get_ref().calls.is_empty());
+ });
+}
+
+#[test]
+fn encode_overflow() {
+ // Test reproducing tokio-rs/tokio#681.
+ let mut codec = length_delimited::Builder::new().new_codec();
+ let mut buf = BytesMut::with_capacity(1024);
+
+ // Put some data into the buffer without resizing it to hold more.
+ let some_as = std::iter::repeat(b'a').take(1024).collect::<Vec<_>>();
+ buf.put_slice(&some_as[..]);
+
+ // Trying to encode the length header should resize the buffer if it won't fit.
+ codec.encode(Bytes::from("hello"), &mut buf).unwrap();
+}
+
+// ===== Test utils =====
+
+struct Mock {
+ calls: VecDeque<Poll<io::Result<Op>>>,
+}
+
+enum Op {
+ Data(Vec<u8>),
+ Flush,
+}
+
+use self::Op::*;
+
+impl AsyncRead for Mock {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ dst: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ match self.calls.pop_front() {
+ Some(Ready(Ok(Op::Data(data)))) => {
+ debug_assert!(dst.len() >= data.len());
+ dst[..data.len()].copy_from_slice(&data[..]);
+ Ready(Ok(data.len()))
+ }
+ Some(Ready(Ok(_))) => panic!(),
+ Some(Ready(Err(e))) => Ready(Err(e)),
+ Some(Pending) => Pending,
+ None => Ready(Ok(0)),
+ }
+ }
+}
+
+impl AsyncWrite for Mock {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ src: &[u8],
+ ) -> Poll<Result<usize, io::Error>> {
+ match self.calls.pop_front() {
+ Some(Ready(Ok(Op::Data(data)))) => {
+ let len = data.len();
+ assert!(src.len() >= len, "expect={:?}; actual={:?}", data, src);
+ assert_eq!(&data[..], &src[..len]);
+ Ready(Ok(len))
+ }
+ Some(Ready(Ok(_))) => panic!(),
+ Some(Ready(Err(e))) => Ready(Err(e)),
+ Some(Pending) => Pending,
+ None => Ready(Ok(0)),
+ }
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
+ match self.calls.pop_front() {
+ Some(Ready(Ok(Op::Flush))) => Ready(Ok(())),
+ Some(Ready(Ok(_))) => panic!(),
+ Some(Ready(Err(e))) => Ready(Err(e)),
+ Some(Pending) => Pending,
+ None => Ready(Ok(())),
+ }
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
+ Ready(Ok(()))
+ }
+}
+
+impl<'a> From<&'a [u8]> for Op {
+ fn from(src: &'a [u8]) -> Op {
+ Op::Data(src.into())
+ }
+}
+
+impl From<Vec<u8>> for Op {
+ fn from(src: Vec<u8>) -> Op {
+ Op::Data(src)
+ }
+}
+
+fn data(bytes: &[u8]) -> Poll<io::Result<Op>> {
+ Ready(Ok(bytes.into()))
+}
+
+fn flush() -> Poll<io::Result<Op>> {
+ Ready(Ok(Flush))
+}
diff --git a/third_party/rust/tokio-util/tests/udp.rs b/third_party/rust/tokio-util/tests/udp.rs
new file mode 100644
index 0000000000..af8002bd80
--- /dev/null
+++ b/third_party/rust/tokio-util/tests/udp.rs
@@ -0,0 +1,79 @@
+use tokio::net::UdpSocket;
+use tokio_util::codec::{Decoder, Encoder};
+use tokio_util::udp::UdpFramed;
+
+use bytes::{BufMut, BytesMut};
+use futures::future::try_join;
+use futures::future::FutureExt;
+use futures::sink::SinkExt;
+use futures::stream::StreamExt;
+use std::io;
+
+#[tokio::test]
+async fn send_framed() -> std::io::Result<()> {
+ let mut a_soc = UdpSocket::bind("127.0.0.1:0").await?;
+ let mut b_soc = UdpSocket::bind("127.0.0.1:0").await?;
+
+ let a_addr = a_soc.local_addr()?;
+ let b_addr = b_soc.local_addr()?;
+
+ // test sending & receiving bytes
+ {
+ let mut a = UdpFramed::new(a_soc, ByteCodec);
+ let mut b = UdpFramed::new(b_soc, ByteCodec);
+
+ let msg = b"4567".to_vec();
+
+ let send = a.send((msg.clone(), b_addr));
+ let recv = b.next().map(|e| e.unwrap());
+ let (_, received) = try_join(send, recv).await.unwrap();
+
+ let (data, addr) = received;
+ assert_eq!(msg, data);
+ assert_eq!(a_addr, addr);
+
+ a_soc = a.into_inner();
+ b_soc = b.into_inner();
+ }
+
+ // test sending & receiving an empty message
+ {
+ let mut a = UdpFramed::new(a_soc, ByteCodec);
+ let mut b = UdpFramed::new(b_soc, ByteCodec);
+
+ let msg = b"".to_vec();
+
+ let send = a.send((msg.clone(), b_addr));
+ let recv = b.next().map(|e| e.unwrap());
+ let (_, received) = try_join(send, recv).await.unwrap();
+
+ let (data, addr) = received;
+ assert_eq!(msg, data);
+ assert_eq!(a_addr, addr);
+ }
+
+ Ok(())
+}
+
+pub struct ByteCodec;
+
+impl Decoder for ByteCodec {
+ type Item = Vec<u8>;
+ type Error = io::Error;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Vec<u8>>, io::Error> {
+ let len = buf.len();
+ Ok(Some(buf.split_to(len).to_vec()))
+ }
+}
+
+impl Encoder for ByteCodec {
+ type Item = Vec<u8>;
+ type Error = io::Error;
+
+ fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> Result<(), io::Error> {
+ buf.reserve(data.len());
+ buf.put_slice(&data);
+ Ok(())
+ }
+}