summaryrefslogtreecommitdiffstats
path: root/vendor/tokio-util/tests/length_delimited.rs
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 03:59:35 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 03:59:35 +0000
commitd1b2d29528b7794b41e66fc2136e395a02f8529b (patch)
treea4a17504b260206dec3cf55b2dca82929a348ac2 /vendor/tokio-util/tests/length_delimited.rs
parentReleasing progress-linux version 1.72.1+dfsg1-1~progress7.99u1. (diff)
downloadrustc-d1b2d29528b7794b41e66fc2136e395a02f8529b.tar.xz
rustc-d1b2d29528b7794b41e66fc2136e395a02f8529b.zip
Merging upstream version 1.73.0+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/tokio-util/tests/length_delimited.rs')
-rw-r--r--vendor/tokio-util/tests/length_delimited.rs779
1 files changed, 779 insertions, 0 deletions
diff --git a/vendor/tokio-util/tests/length_delimited.rs b/vendor/tokio-util/tests/length_delimited.rs
new file mode 100644
index 000000000..126e41b5c
--- /dev/null
+++ b/vendor/tokio-util/tests/length_delimited.rs
@@ -0,0 +1,779 @@
+#![warn(rust_2018_idioms)]
+
+use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
+use tokio_test::task;
+use tokio_test::{
+ assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
+};
+use tokio_util::codec::*;
+
+use bytes::{BufMut, Bytes, BytesMut};
+use futures::{pin_mut, Sink, Stream};
+use std::collections::VecDeque;
+use std::io;
+use std::pin::Pin;
+use std::task::Poll::*;
+use std::task::{Context, Poll};
+
+macro_rules! mock {
+ ($($x:expr,)*) => {{
+ let mut v = VecDeque::new();
+ v.extend(vec![$($x),*]);
+ Mock { calls: v }
+ }};
+}
+
+macro_rules! assert_next_eq {
+ ($io:ident, $expect:expr) => {{
+ task::spawn(()).enter(|cx, _| {
+ let res = assert_ready!($io.as_mut().poll_next(cx));
+ match res {
+ Some(Ok(v)) => assert_eq!(v, $expect.as_ref()),
+ Some(Err(e)) => panic!("error = {:?}", e),
+ None => panic!("none"),
+ }
+ });
+ }};
+}
+
+macro_rules! assert_next_pending {
+ ($io:ident) => {{
+ task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) {
+ Ready(Some(Ok(v))) => panic!("value = {:?}", v),
+ Ready(Some(Err(e))) => panic!("error = {:?}", e),
+ Ready(None) => panic!("done"),
+ Pending => {}
+ });
+ }};
+}
+
+macro_rules! assert_next_err {
+ ($io:ident) => {{
+ task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) {
+ Ready(Some(Ok(v))) => panic!("value = {:?}", v),
+ Ready(Some(Err(_))) => {}
+ Ready(None) => panic!("done"),
+ Pending => panic!("pending"),
+ });
+ }};
+}
+
+macro_rules! assert_done {
+ ($io:ident) => {{
+ task::spawn(()).enter(|cx, _| {
+ let res = assert_ready!($io.as_mut().poll_next(cx));
+ match res {
+ Some(Ok(v)) => panic!("value = {:?}", v),
+ Some(Err(e)) => panic!("error = {:?}", e),
+ None => {}
+ }
+ });
+ }};
+}
+
+#[test]
+fn read_empty_io_yields_nothing() {
+ let io = Box::pin(FramedRead::new(mock!(), LengthDelimitedCodec::new()));
+ pin_mut!(io);
+
+ assert_done!(io);
+}
+
+#[test]
+fn read_single_frame_one_packet() {
+ let io = FramedRead::new(
+ mock! {
+ data(b"\x00\x00\x00\x09abcdefghi"),
+ },
+ LengthDelimitedCodec::new(),
+ );
+ pin_mut!(io);
+
+ assert_next_eq!(io, b"abcdefghi");
+ assert_done!(io);
+}
+
+#[test]
+fn read_single_frame_one_packet_little_endian() {
+ let io = length_delimited::Builder::new()
+ .little_endian()
+ .new_read(mock! {
+ data(b"\x09\x00\x00\x00abcdefghi"),
+ });
+ pin_mut!(io);
+
+ assert_next_eq!(io, b"abcdefghi");
+ assert_done!(io);
+}
+
+#[test]
+fn read_single_frame_one_packet_native_endian() {
+ let d = if cfg!(target_endian = "big") {
+ b"\x00\x00\x00\x09abcdefghi"
+ } else {
+ b"\x09\x00\x00\x00abcdefghi"
+ };
+ let io = length_delimited::Builder::new()
+ .native_endian()
+ .new_read(mock! {
+ data(d),
+ });
+ pin_mut!(io);
+
+ assert_next_eq!(io, b"abcdefghi");
+ assert_done!(io);
+}
+
+#[test]
+fn read_single_multi_frame_one_packet() {
+ let mut d: Vec<u8> = vec![];
+ d.extend_from_slice(b"\x00\x00\x00\x09abcdefghi");
+ d.extend_from_slice(b"\x00\x00\x00\x03123");
+ d.extend_from_slice(b"\x00\x00\x00\x0bhello world");
+
+ let io = FramedRead::new(
+ mock! {
+ data(&d),
+ },
+ LengthDelimitedCodec::new(),
+ );
+ pin_mut!(io);
+
+ assert_next_eq!(io, b"abcdefghi");
+ assert_next_eq!(io, b"123");
+ assert_next_eq!(io, b"hello world");
+ assert_done!(io);
+}
+
+#[test]
+fn read_single_frame_multi_packet() {
+ let io = FramedRead::new(
+ mock! {
+ data(b"\x00\x00"),
+ data(b"\x00\x09abc"),
+ data(b"defghi"),
+ },
+ LengthDelimitedCodec::new(),
+ );
+ pin_mut!(io);
+
+ assert_next_eq!(io, b"abcdefghi");
+ assert_done!(io);
+}
+
+#[test]
+fn read_multi_frame_multi_packet() {
+ let io = FramedRead::new(
+ mock! {
+ data(b"\x00\x00"),
+ data(b"\x00\x09abc"),
+ data(b"defghi"),
+ data(b"\x00\x00\x00\x0312"),
+ data(b"3\x00\x00\x00\x0bhello world"),
+ },
+ LengthDelimitedCodec::new(),
+ );
+ pin_mut!(io);
+
+ assert_next_eq!(io, b"abcdefghi");
+ assert_next_eq!(io, b"123");
+ assert_next_eq!(io, b"hello world");
+ assert_done!(io);
+}
+
+#[test]
+fn read_single_frame_multi_packet_wait() {
+ let io = FramedRead::new(
+ mock! {
+ data(b"\x00\x00"),
+ Pending,
+ data(b"\x00\x09abc"),
+ Pending,
+ data(b"defghi"),
+ Pending,
+ },
+ LengthDelimitedCodec::new(),
+ );
+ pin_mut!(io);
+
+ assert_next_pending!(io);
+ assert_next_pending!(io);
+ assert_next_eq!(io, b"abcdefghi");
+ assert_next_pending!(io);
+ assert_done!(io);
+}
+
+#[test]
+fn read_multi_frame_multi_packet_wait() {
+ let io = FramedRead::new(
+ mock! {
+ data(b"\x00\x00"),
+ Pending,
+ data(b"\x00\x09abc"),
+ Pending,
+ data(b"defghi"),
+ Pending,
+ data(b"\x00\x00\x00\x0312"),
+ Pending,
+ data(b"3\x00\x00\x00\x0bhello world"),
+ Pending,
+ },
+ LengthDelimitedCodec::new(),
+ );
+ pin_mut!(io);
+
+ assert_next_pending!(io);
+ assert_next_pending!(io);
+ assert_next_eq!(io, b"abcdefghi");
+ assert_next_pending!(io);
+ assert_next_pending!(io);
+ assert_next_eq!(io, b"123");
+ assert_next_eq!(io, b"hello world");
+ assert_next_pending!(io);
+ assert_done!(io);
+}
+
+#[test]
+fn read_incomplete_head() {
+ let io = FramedRead::new(
+ mock! {
+ data(b"\x00\x00"),
+ },
+ LengthDelimitedCodec::new(),
+ );
+ pin_mut!(io);
+
+ assert_next_err!(io);
+}
+
+#[test]
+fn read_incomplete_head_multi() {
+ let io = FramedRead::new(
+ mock! {
+ Pending,
+ data(b"\x00"),
+ Pending,
+ },
+ LengthDelimitedCodec::new(),
+ );
+ pin_mut!(io);
+
+ assert_next_pending!(io);
+ assert_next_pending!(io);
+ assert_next_err!(io);
+}
+
+#[test]
+fn read_incomplete_payload() {
+ let io = FramedRead::new(
+ mock! {
+ data(b"\x00\x00\x00\x09ab"),
+ Pending,
+ data(b"cd"),
+ Pending,
+ },
+ LengthDelimitedCodec::new(),
+ );
+ pin_mut!(io);
+
+ assert_next_pending!(io);
+ assert_next_pending!(io);
+ assert_next_err!(io);
+}
+
+#[test]
+fn read_max_frame_len() {
+ let io = length_delimited::Builder::new()
+ .max_frame_length(5)
+ .new_read(mock! {
+ data(b"\x00\x00\x00\x09abcdefghi"),
+ });
+ pin_mut!(io);
+
+ assert_next_err!(io);
+}
+
+#[test]
+fn read_update_max_frame_len_at_rest() {
+ let io = length_delimited::Builder::new().new_read(mock! {
+ data(b"\x00\x00\x00\x09abcdefghi"),
+ data(b"\x00\x00\x00\x09abcdefghi"),
+ });
+ pin_mut!(io);
+
+ assert_next_eq!(io, b"abcdefghi");
+ io.decoder_mut().set_max_frame_length(5);
+ assert_next_err!(io);
+}
+
+#[test]
+fn read_update_max_frame_len_in_flight() {
+ let io = length_delimited::Builder::new().new_read(mock! {
+ data(b"\x00\x00\x00\x09abcd"),
+ Pending,
+ data(b"efghi"),
+ data(b"\x00\x00\x00\x09abcdefghi"),
+ });
+ pin_mut!(io);
+
+ assert_next_pending!(io);
+ io.decoder_mut().set_max_frame_length(5);
+ assert_next_eq!(io, b"abcdefghi");
+ assert_next_err!(io);
+}
+
+#[test]
+fn read_one_byte_length_field() {
+ let io = length_delimited::Builder::new()
+ .length_field_length(1)
+ .new_read(mock! {
+ data(b"\x09abcdefghi"),
+ });
+ pin_mut!(io);
+
+ assert_next_eq!(io, b"abcdefghi");
+ assert_done!(io);
+}
+
+#[test]
+fn read_header_offset() {
+ let io = length_delimited::Builder::new()
+ .length_field_length(2)
+ .length_field_offset(4)
+ .new_read(mock! {
+ data(b"zzzz\x00\x09abcdefghi"),
+ });
+ pin_mut!(io);
+
+ assert_next_eq!(io, b"abcdefghi");
+ assert_done!(io);
+}
+
+#[test]
+fn read_single_multi_frame_one_packet_skip_none_adjusted() {
+ let mut d: Vec<u8> = vec![];
+ d.extend_from_slice(b"xx\x00\x09abcdefghi");
+ d.extend_from_slice(b"yy\x00\x03123");
+ d.extend_from_slice(b"zz\x00\x0bhello world");
+
+ let io = length_delimited::Builder::new()
+ .length_field_length(2)
+ .length_field_offset(2)
+ .num_skip(0)
+ .length_adjustment(4)
+ .new_read(mock! {
+ data(&d),
+ });
+ pin_mut!(io);
+
+ assert_next_eq!(io, b"xx\x00\x09abcdefghi");
+ assert_next_eq!(io, b"yy\x00\x03123");
+ assert_next_eq!(io, b"zz\x00\x0bhello world");
+ assert_done!(io);
+}
+
+#[test]
+fn read_single_frame_length_adjusted() {
+ let mut d: Vec<u8> = vec![];
+ d.extend_from_slice(b"\x00\x00\x0b\x0cHello world");
+
+ let io = length_delimited::Builder::new()
+ .length_field_offset(0)
+ .length_field_length(3)
+ .length_adjustment(0)
+ .num_skip(4)
+ .new_read(mock! {
+ data(&d),
+ });
+ pin_mut!(io);
+
+ assert_next_eq!(io, b"Hello world");
+ assert_done!(io);
+}
+
+#[test]
+fn read_single_multi_frame_one_packet_length_includes_head() {
+ let mut d: Vec<u8> = vec![];
+ d.extend_from_slice(b"\x00\x0babcdefghi");
+ d.extend_from_slice(b"\x00\x05123");
+ d.extend_from_slice(b"\x00\x0dhello world");
+
+ let io = length_delimited::Builder::new()
+ .length_field_length(2)
+ .length_adjustment(-2)
+ .new_read(mock! {
+ data(&d),
+ });
+ pin_mut!(io);
+
+ assert_next_eq!(io, b"abcdefghi");
+ assert_next_eq!(io, b"123");
+ assert_next_eq!(io, b"hello world");
+ assert_done!(io);
+}
+
+#[test]
+fn write_single_frame_length_adjusted() {
+ let io = length_delimited::Builder::new()
+ .length_adjustment(-2)
+ .new_write(mock! {
+ data(b"\x00\x00\x00\x0b"),
+ data(b"abcdefghi"),
+ flush(),
+ });
+ pin_mut!(io);
+
+ task::spawn(()).enter(|cx, _| {
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
+ assert_ready_ok!(io.as_mut().poll_flush(cx));
+ assert!(io.get_ref().calls.is_empty());
+ });
+}
+
+#[test]
+fn write_nothing_yields_nothing() {
+ let io = FramedWrite::new(mock!(), LengthDelimitedCodec::new());
+ pin_mut!(io);
+
+ task::spawn(()).enter(|cx, _| {
+ assert_ready_ok!(io.poll_flush(cx));
+ });
+}
+
+#[test]
+fn write_single_frame_one_packet() {
+ let io = FramedWrite::new(
+ mock! {
+ data(b"\x00\x00\x00\x09"),
+ data(b"abcdefghi"),
+ flush(),
+ },
+ LengthDelimitedCodec::new(),
+ );
+ pin_mut!(io);
+
+ task::spawn(()).enter(|cx, _| {
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
+ assert_ready_ok!(io.as_mut().poll_flush(cx));
+ assert!(io.get_ref().calls.is_empty());
+ });
+}
+
+#[test]
+fn write_single_multi_frame_one_packet() {
+ let io = FramedWrite::new(
+ mock! {
+ data(b"\x00\x00\x00\x09"),
+ data(b"abcdefghi"),
+ data(b"\x00\x00\x00\x03"),
+ data(b"123"),
+ data(b"\x00\x00\x00\x0b"),
+ data(b"hello world"),
+ flush(),
+ },
+ LengthDelimitedCodec::new(),
+ );
+ pin_mut!(io);
+
+ task::spawn(()).enter(|cx, _| {
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
+
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_ok!(io.as_mut().start_send(Bytes::from("123")));
+
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_ok!(io.as_mut().start_send(Bytes::from("hello world")));
+
+ assert_ready_ok!(io.as_mut().poll_flush(cx));
+ assert!(io.get_ref().calls.is_empty());
+ });
+}
+
+#[test]
+fn write_single_multi_frame_multi_packet() {
+ let io = FramedWrite::new(
+ mock! {
+ data(b"\x00\x00\x00\x09"),
+ data(b"abcdefghi"),
+ flush(),
+ data(b"\x00\x00\x00\x03"),
+ data(b"123"),
+ flush(),
+ data(b"\x00\x00\x00\x0b"),
+ data(b"hello world"),
+ flush(),
+ },
+ LengthDelimitedCodec::new(),
+ );
+ pin_mut!(io);
+
+ task::spawn(()).enter(|cx, _| {
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
+
+ assert_ready_ok!(io.as_mut().poll_flush(cx));
+
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_ok!(io.as_mut().start_send(Bytes::from("123")));
+
+ assert_ready_ok!(io.as_mut().poll_flush(cx));
+
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_ok!(io.as_mut().start_send(Bytes::from("hello world")));
+
+ assert_ready_ok!(io.as_mut().poll_flush(cx));
+ assert!(io.get_ref().calls.is_empty());
+ });
+}
+
+#[test]
+fn write_single_frame_would_block() {
+ let io = FramedWrite::new(
+ mock! {
+ Pending,
+ data(b"\x00\x00"),
+ Pending,
+ data(b"\x00\x09"),
+ data(b"abcdefghi"),
+ flush(),
+ },
+ LengthDelimitedCodec::new(),
+ );
+ pin_mut!(io);
+
+ task::spawn(()).enter(|cx, _| {
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
+
+ assert_pending!(io.as_mut().poll_flush(cx));
+ assert_pending!(io.as_mut().poll_flush(cx));
+ assert_ready_ok!(io.as_mut().poll_flush(cx));
+
+ assert!(io.get_ref().calls.is_empty());
+ });
+}
+
+#[test]
+fn write_single_frame_little_endian() {
+ let io = length_delimited::Builder::new()
+ .little_endian()
+ .new_write(mock! {
+ data(b"\x09\x00\x00\x00"),
+ data(b"abcdefghi"),
+ flush(),
+ });
+ pin_mut!(io);
+
+ task::spawn(()).enter(|cx, _| {
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
+
+ assert_ready_ok!(io.as_mut().poll_flush(cx));
+ assert!(io.get_ref().calls.is_empty());
+ });
+}
+
+#[test]
+fn write_single_frame_with_short_length_field() {
+ let io = length_delimited::Builder::new()
+ .length_field_length(1)
+ .new_write(mock! {
+ data(b"\x09"),
+ data(b"abcdefghi"),
+ flush(),
+ });
+ pin_mut!(io);
+
+ task::spawn(()).enter(|cx, _| {
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
+
+ assert_ready_ok!(io.as_mut().poll_flush(cx));
+
+ assert!(io.get_ref().calls.is_empty());
+ });
+}
+
+#[test]
+fn write_max_frame_len() {
+ let io = length_delimited::Builder::new()
+ .max_frame_length(5)
+ .new_write(mock! {});
+ pin_mut!(io);
+
+ task::spawn(()).enter(|cx, _| {
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_err!(io.as_mut().start_send(Bytes::from("abcdef")));
+
+ assert!(io.get_ref().calls.is_empty());
+ });
+}
+
+#[test]
+fn write_update_max_frame_len_at_rest() {
+ let io = length_delimited::Builder::new().new_write(mock! {
+ data(b"\x00\x00\x00\x06"),
+ data(b"abcdef"),
+ flush(),
+ });
+ pin_mut!(io);
+
+ task::spawn(()).enter(|cx, _| {
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_ok!(io.as_mut().start_send(Bytes::from("abcdef")));
+
+ assert_ready_ok!(io.as_mut().poll_flush(cx));
+
+ io.encoder_mut().set_max_frame_length(5);
+
+ assert_err!(io.as_mut().start_send(Bytes::from("abcdef")));
+
+ assert!(io.get_ref().calls.is_empty());
+ });
+}
+
+#[test]
+fn write_update_max_frame_len_in_flight() {
+ let io = length_delimited::Builder::new().new_write(mock! {
+ data(b"\x00\x00\x00\x06"),
+ data(b"ab"),
+ Pending,
+ data(b"cdef"),
+ flush(),
+ });
+ pin_mut!(io);
+
+ task::spawn(()).enter(|cx, _| {
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_ok!(io.as_mut().start_send(Bytes::from("abcdef")));
+
+ assert_pending!(io.as_mut().poll_flush(cx));
+
+ io.encoder_mut().set_max_frame_length(5);
+
+ assert_ready_ok!(io.as_mut().poll_flush(cx));
+
+ assert_err!(io.as_mut().start_send(Bytes::from("abcdef")));
+ assert!(io.get_ref().calls.is_empty());
+ });
+}
+
+#[test]
+fn write_zero() {
+ let io = length_delimited::Builder::new().new_write(mock! {});
+ pin_mut!(io);
+
+ task::spawn(()).enter(|cx, _| {
+ assert_ready_ok!(io.as_mut().poll_ready(cx));
+ assert_ok!(io.as_mut().start_send(Bytes::from("abcdef")));
+
+ assert_ready_err!(io.as_mut().poll_flush(cx));
+
+ assert!(io.get_ref().calls.is_empty());
+ });
+}
+
+#[test]
+fn encode_overflow() {
+ // Test reproducing tokio-rs/tokio#681.
+ let mut codec = length_delimited::Builder::new().new_codec();
+ let mut buf = BytesMut::with_capacity(1024);
+
+ // Put some data into the buffer without resizing it to hold more.
+ let some_as = std::iter::repeat(b'a').take(1024).collect::<Vec<_>>();
+ buf.put_slice(&some_as[..]);
+
+ // Trying to encode the length header should resize the buffer if it won't fit.
+ codec.encode(Bytes::from("hello"), &mut buf).unwrap();
+}
+
+// ===== Test utils =====
+
+struct Mock {
+ calls: VecDeque<Poll<io::Result<Op>>>,
+}
+
+enum Op {
+ Data(Vec<u8>),
+ Flush,
+}
+
+use self::Op::*;
+
+impl AsyncRead for Mock {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ dst: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ match self.calls.pop_front() {
+ Some(Ready(Ok(Op::Data(data)))) => {
+ debug_assert!(dst.remaining() >= data.len());
+ dst.put_slice(&data);
+ Ready(Ok(()))
+ }
+ Some(Ready(Ok(_))) => panic!(),
+ Some(Ready(Err(e))) => Ready(Err(e)),
+ Some(Pending) => Pending,
+ None => Ready(Ok(())),
+ }
+ }
+}
+
+impl AsyncWrite for Mock {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ src: &[u8],
+ ) -> Poll<Result<usize, io::Error>> {
+ match self.calls.pop_front() {
+ Some(Ready(Ok(Op::Data(data)))) => {
+ let len = data.len();
+ assert!(src.len() >= len, "expect={:?}; actual={:?}", data, src);
+ assert_eq!(&data[..], &src[..len]);
+ Ready(Ok(len))
+ }
+ Some(Ready(Ok(_))) => panic!(),
+ Some(Ready(Err(e))) => Ready(Err(e)),
+ Some(Pending) => Pending,
+ None => Ready(Ok(0)),
+ }
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
+ match self.calls.pop_front() {
+ Some(Ready(Ok(Op::Flush))) => Ready(Ok(())),
+ Some(Ready(Ok(_))) => panic!(),
+ Some(Ready(Err(e))) => Ready(Err(e)),
+ Some(Pending) => Pending,
+ None => Ready(Ok(())),
+ }
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
+ Ready(Ok(()))
+ }
+}
+
+impl<'a> From<&'a [u8]> for Op {
+ fn from(src: &'a [u8]) -> Op {
+ Op::Data(src.into())
+ }
+}
+
+impl From<Vec<u8>> for Op {
+ fn from(src: Vec<u8>) -> Op {
+ Op::Data(src)
+ }
+}
+
+fn data(bytes: &[u8]) -> Poll<io::Result<Op>> {
+ Ready(Ok(bytes.into()))
+}
+
+fn flush() -> Poll<io::Result<Op>> {
+ Ready(Ok(Flush))
+}