779 lines
19 KiB
Rust
779 lines
19 KiB
Rust
#![warn(rust_2018_idioms)]
|
|
|
|
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
|
use tokio_test::task;
|
|
use tokio_test::{
|
|
assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
|
|
};
|
|
use tokio_util::codec::*;
|
|
|
|
use bytes::{BufMut, Bytes, BytesMut};
|
|
use futures::{pin_mut, Sink, Stream};
|
|
use std::collections::VecDeque;
|
|
use std::io;
|
|
use std::pin::Pin;
|
|
use std::task::Poll::*;
|
|
use std::task::{Context, Poll};
|
|
|
|
macro_rules! mock {
|
|
($($x:expr,)*) => {{
|
|
let mut v = VecDeque::new();
|
|
v.extend(vec![$($x),*]);
|
|
Mock { calls: v }
|
|
}};
|
|
}
|
|
|
|
macro_rules! assert_next_eq {
|
|
($io:ident, $expect:expr) => {{
|
|
task::spawn(()).enter(|cx, _| {
|
|
let res = assert_ready!($io.as_mut().poll_next(cx));
|
|
match res {
|
|
Some(Ok(v)) => assert_eq!(v, $expect.as_ref()),
|
|
Some(Err(e)) => panic!("error = {:?}", e),
|
|
None => panic!("none"),
|
|
}
|
|
});
|
|
}};
|
|
}
|
|
|
|
macro_rules! assert_next_pending {
|
|
($io:ident) => {{
|
|
task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) {
|
|
Ready(Some(Ok(v))) => panic!("value = {:?}", v),
|
|
Ready(Some(Err(e))) => panic!("error = {:?}", e),
|
|
Ready(None) => panic!("done"),
|
|
Pending => {}
|
|
});
|
|
}};
|
|
}
|
|
|
|
macro_rules! assert_next_err {
|
|
($io:ident) => {{
|
|
task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) {
|
|
Ready(Some(Ok(v))) => panic!("value = {:?}", v),
|
|
Ready(Some(Err(_))) => {}
|
|
Ready(None) => panic!("done"),
|
|
Pending => panic!("pending"),
|
|
});
|
|
}};
|
|
}
|
|
|
|
macro_rules! assert_done {
|
|
($io:ident) => {{
|
|
task::spawn(()).enter(|cx, _| {
|
|
let res = assert_ready!($io.as_mut().poll_next(cx));
|
|
match res {
|
|
Some(Ok(v)) => panic!("value = {:?}", v),
|
|
Some(Err(e)) => panic!("error = {:?}", e),
|
|
None => {}
|
|
}
|
|
});
|
|
}};
|
|
}
|
|
|
|
#[test]
|
|
fn read_empty_io_yields_nothing() {
|
|
let io = Box::pin(FramedRead::new(mock!(), LengthDelimitedCodec::new()));
|
|
pin_mut!(io);
|
|
|
|
assert_done!(io);
|
|
}
|
|
|
|
#[test]
|
|
fn read_single_frame_one_packet() {
|
|
let io = FramedRead::new(
|
|
mock! {
|
|
data(b"\x00\x00\x00\x09abcdefghi"),
|
|
},
|
|
LengthDelimitedCodec::new(),
|
|
);
|
|
pin_mut!(io);
|
|
|
|
assert_next_eq!(io, b"abcdefghi");
|
|
assert_done!(io);
|
|
}
|
|
|
|
#[test]
|
|
fn read_single_frame_one_packet_little_endian() {
|
|
let io = length_delimited::Builder::new()
|
|
.little_endian()
|
|
.new_read(mock! {
|
|
data(b"\x09\x00\x00\x00abcdefghi"),
|
|
});
|
|
pin_mut!(io);
|
|
|
|
assert_next_eq!(io, b"abcdefghi");
|
|
assert_done!(io);
|
|
}
|
|
|
|
#[test]
|
|
fn read_single_frame_one_packet_native_endian() {
|
|
let d = if cfg!(target_endian = "big") {
|
|
b"\x00\x00\x00\x09abcdefghi"
|
|
} else {
|
|
b"\x09\x00\x00\x00abcdefghi"
|
|
};
|
|
let io = length_delimited::Builder::new()
|
|
.native_endian()
|
|
.new_read(mock! {
|
|
data(d),
|
|
});
|
|
pin_mut!(io);
|
|
|
|
assert_next_eq!(io, b"abcdefghi");
|
|
assert_done!(io);
|
|
}
|
|
|
|
#[test]
|
|
fn read_single_multi_frame_one_packet() {
|
|
let mut d: Vec<u8> = vec![];
|
|
d.extend_from_slice(b"\x00\x00\x00\x09abcdefghi");
|
|
d.extend_from_slice(b"\x00\x00\x00\x03123");
|
|
d.extend_from_slice(b"\x00\x00\x00\x0bhello world");
|
|
|
|
let io = FramedRead::new(
|
|
mock! {
|
|
data(&d),
|
|
},
|
|
LengthDelimitedCodec::new(),
|
|
);
|
|
pin_mut!(io);
|
|
|
|
assert_next_eq!(io, b"abcdefghi");
|
|
assert_next_eq!(io, b"123");
|
|
assert_next_eq!(io, b"hello world");
|
|
assert_done!(io);
|
|
}
|
|
|
|
#[test]
|
|
fn read_single_frame_multi_packet() {
|
|
let io = FramedRead::new(
|
|
mock! {
|
|
data(b"\x00\x00"),
|
|
data(b"\x00\x09abc"),
|
|
data(b"defghi"),
|
|
},
|
|
LengthDelimitedCodec::new(),
|
|
);
|
|
pin_mut!(io);
|
|
|
|
assert_next_eq!(io, b"abcdefghi");
|
|
assert_done!(io);
|
|
}
|
|
|
|
#[test]
|
|
fn read_multi_frame_multi_packet() {
|
|
let io = FramedRead::new(
|
|
mock! {
|
|
data(b"\x00\x00"),
|
|
data(b"\x00\x09abc"),
|
|
data(b"defghi"),
|
|
data(b"\x00\x00\x00\x0312"),
|
|
data(b"3\x00\x00\x00\x0bhello world"),
|
|
},
|
|
LengthDelimitedCodec::new(),
|
|
);
|
|
pin_mut!(io);
|
|
|
|
assert_next_eq!(io, b"abcdefghi");
|
|
assert_next_eq!(io, b"123");
|
|
assert_next_eq!(io, b"hello world");
|
|
assert_done!(io);
|
|
}
|
|
|
|
#[test]
|
|
fn read_single_frame_multi_packet_wait() {
|
|
let io = FramedRead::new(
|
|
mock! {
|
|
data(b"\x00\x00"),
|
|
Pending,
|
|
data(b"\x00\x09abc"),
|
|
Pending,
|
|
data(b"defghi"),
|
|
Pending,
|
|
},
|
|
LengthDelimitedCodec::new(),
|
|
);
|
|
pin_mut!(io);
|
|
|
|
assert_next_pending!(io);
|
|
assert_next_pending!(io);
|
|
assert_next_eq!(io, b"abcdefghi");
|
|
assert_next_pending!(io);
|
|
assert_done!(io);
|
|
}
|
|
|
|
#[test]
|
|
fn read_multi_frame_multi_packet_wait() {
|
|
let io = FramedRead::new(
|
|
mock! {
|
|
data(b"\x00\x00"),
|
|
Pending,
|
|
data(b"\x00\x09abc"),
|
|
Pending,
|
|
data(b"defghi"),
|
|
Pending,
|
|
data(b"\x00\x00\x00\x0312"),
|
|
Pending,
|
|
data(b"3\x00\x00\x00\x0bhello world"),
|
|
Pending,
|
|
},
|
|
LengthDelimitedCodec::new(),
|
|
);
|
|
pin_mut!(io);
|
|
|
|
assert_next_pending!(io);
|
|
assert_next_pending!(io);
|
|
assert_next_eq!(io, b"abcdefghi");
|
|
assert_next_pending!(io);
|
|
assert_next_pending!(io);
|
|
assert_next_eq!(io, b"123");
|
|
assert_next_eq!(io, b"hello world");
|
|
assert_next_pending!(io);
|
|
assert_done!(io);
|
|
}
|
|
|
|
#[test]
|
|
fn read_incomplete_head() {
|
|
let io = FramedRead::new(
|
|
mock! {
|
|
data(b"\x00\x00"),
|
|
},
|
|
LengthDelimitedCodec::new(),
|
|
);
|
|
pin_mut!(io);
|
|
|
|
assert_next_err!(io);
|
|
}
|
|
|
|
#[test]
|
|
fn read_incomplete_head_multi() {
|
|
let io = FramedRead::new(
|
|
mock! {
|
|
Pending,
|
|
data(b"\x00"),
|
|
Pending,
|
|
},
|
|
LengthDelimitedCodec::new(),
|
|
);
|
|
pin_mut!(io);
|
|
|
|
assert_next_pending!(io);
|
|
assert_next_pending!(io);
|
|
assert_next_err!(io);
|
|
}
|
|
|
|
#[test]
|
|
fn read_incomplete_payload() {
|
|
let io = FramedRead::new(
|
|
mock! {
|
|
data(b"\x00\x00\x00\x09ab"),
|
|
Pending,
|
|
data(b"cd"),
|
|
Pending,
|
|
},
|
|
LengthDelimitedCodec::new(),
|
|
);
|
|
pin_mut!(io);
|
|
|
|
assert_next_pending!(io);
|
|
assert_next_pending!(io);
|
|
assert_next_err!(io);
|
|
}
|
|
|
|
#[test]
|
|
fn read_max_frame_len() {
|
|
let io = length_delimited::Builder::new()
|
|
.max_frame_length(5)
|
|
.new_read(mock! {
|
|
data(b"\x00\x00\x00\x09abcdefghi"),
|
|
});
|
|
pin_mut!(io);
|
|
|
|
assert_next_err!(io);
|
|
}
|
|
|
|
#[test]
|
|
fn read_update_max_frame_len_at_rest() {
|
|
let io = length_delimited::Builder::new().new_read(mock! {
|
|
data(b"\x00\x00\x00\x09abcdefghi"),
|
|
data(b"\x00\x00\x00\x09abcdefghi"),
|
|
});
|
|
pin_mut!(io);
|
|
|
|
assert_next_eq!(io, b"abcdefghi");
|
|
io.decoder_mut().set_max_frame_length(5);
|
|
assert_next_err!(io);
|
|
}
|
|
|
|
#[test]
|
|
fn read_update_max_frame_len_in_flight() {
|
|
let io = length_delimited::Builder::new().new_read(mock! {
|
|
data(b"\x00\x00\x00\x09abcd"),
|
|
Pending,
|
|
data(b"efghi"),
|
|
data(b"\x00\x00\x00\x09abcdefghi"),
|
|
});
|
|
pin_mut!(io);
|
|
|
|
assert_next_pending!(io);
|
|
io.decoder_mut().set_max_frame_length(5);
|
|
assert_next_eq!(io, b"abcdefghi");
|
|
assert_next_err!(io);
|
|
}
|
|
|
|
#[test]
|
|
fn read_one_byte_length_field() {
|
|
let io = length_delimited::Builder::new()
|
|
.length_field_length(1)
|
|
.new_read(mock! {
|
|
data(b"\x09abcdefghi"),
|
|
});
|
|
pin_mut!(io);
|
|
|
|
assert_next_eq!(io, b"abcdefghi");
|
|
assert_done!(io);
|
|
}
|
|
|
|
#[test]
|
|
fn read_header_offset() {
|
|
let io = length_delimited::Builder::new()
|
|
.length_field_length(2)
|
|
.length_field_offset(4)
|
|
.new_read(mock! {
|
|
data(b"zzzz\x00\x09abcdefghi"),
|
|
});
|
|
pin_mut!(io);
|
|
|
|
assert_next_eq!(io, b"abcdefghi");
|
|
assert_done!(io);
|
|
}
|
|
|
|
#[test]
|
|
fn read_single_multi_frame_one_packet_skip_none_adjusted() {
|
|
let mut d: Vec<u8> = vec![];
|
|
d.extend_from_slice(b"xx\x00\x09abcdefghi");
|
|
d.extend_from_slice(b"yy\x00\x03123");
|
|
d.extend_from_slice(b"zz\x00\x0bhello world");
|
|
|
|
let io = length_delimited::Builder::new()
|
|
.length_field_length(2)
|
|
.length_field_offset(2)
|
|
.num_skip(0)
|
|
.length_adjustment(4)
|
|
.new_read(mock! {
|
|
data(&d),
|
|
});
|
|
pin_mut!(io);
|
|
|
|
assert_next_eq!(io, b"xx\x00\x09abcdefghi");
|
|
assert_next_eq!(io, b"yy\x00\x03123");
|
|
assert_next_eq!(io, b"zz\x00\x0bhello world");
|
|
assert_done!(io);
|
|
}
|
|
|
|
#[test]
|
|
fn read_single_frame_length_adjusted() {
|
|
let mut d: Vec<u8> = vec![];
|
|
d.extend_from_slice(b"\x00\x00\x0b\x0cHello world");
|
|
|
|
let io = length_delimited::Builder::new()
|
|
.length_field_offset(0)
|
|
.length_field_length(3)
|
|
.length_adjustment(0)
|
|
.num_skip(4)
|
|
.new_read(mock! {
|
|
data(&d),
|
|
});
|
|
pin_mut!(io);
|
|
|
|
assert_next_eq!(io, b"Hello world");
|
|
assert_done!(io);
|
|
}
|
|
|
|
#[test]
|
|
fn read_single_multi_frame_one_packet_length_includes_head() {
|
|
let mut d: Vec<u8> = vec![];
|
|
d.extend_from_slice(b"\x00\x0babcdefghi");
|
|
d.extend_from_slice(b"\x00\x05123");
|
|
d.extend_from_slice(b"\x00\x0dhello world");
|
|
|
|
let io = length_delimited::Builder::new()
|
|
.length_field_length(2)
|
|
.length_adjustment(-2)
|
|
.new_read(mock! {
|
|
data(&d),
|
|
});
|
|
pin_mut!(io);
|
|
|
|
assert_next_eq!(io, b"abcdefghi");
|
|
assert_next_eq!(io, b"123");
|
|
assert_next_eq!(io, b"hello world");
|
|
assert_done!(io);
|
|
}
|
|
|
|
#[test]
|
|
fn write_single_frame_length_adjusted() {
|
|
let io = length_delimited::Builder::new()
|
|
.length_adjustment(-2)
|
|
.new_write(mock! {
|
|
data(b"\x00\x00\x00\x0b"),
|
|
data(b"abcdefghi"),
|
|
flush(),
|
|
});
|
|
pin_mut!(io);
|
|
|
|
task::spawn(()).enter(|cx, _| {
|
|
assert_ready_ok!(io.as_mut().poll_ready(cx));
|
|
assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
|
|
assert_ready_ok!(io.as_mut().poll_flush(cx));
|
|
assert!(io.get_ref().calls.is_empty());
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn write_nothing_yields_nothing() {
|
|
let io = FramedWrite::new(mock!(), LengthDelimitedCodec::new());
|
|
pin_mut!(io);
|
|
|
|
task::spawn(()).enter(|cx, _| {
|
|
assert_ready_ok!(io.poll_flush(cx));
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn write_single_frame_one_packet() {
|
|
let io = FramedWrite::new(
|
|
mock! {
|
|
data(b"\x00\x00\x00\x09"),
|
|
data(b"abcdefghi"),
|
|
flush(),
|
|
},
|
|
LengthDelimitedCodec::new(),
|
|
);
|
|
pin_mut!(io);
|
|
|
|
task::spawn(()).enter(|cx, _| {
|
|
assert_ready_ok!(io.as_mut().poll_ready(cx));
|
|
assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
|
|
assert_ready_ok!(io.as_mut().poll_flush(cx));
|
|
assert!(io.get_ref().calls.is_empty());
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn write_single_multi_frame_one_packet() {
|
|
let io = FramedWrite::new(
|
|
mock! {
|
|
data(b"\x00\x00\x00\x09"),
|
|
data(b"abcdefghi"),
|
|
data(b"\x00\x00\x00\x03"),
|
|
data(b"123"),
|
|
data(b"\x00\x00\x00\x0b"),
|
|
data(b"hello world"),
|
|
flush(),
|
|
},
|
|
LengthDelimitedCodec::new(),
|
|
);
|
|
pin_mut!(io);
|
|
|
|
task::spawn(()).enter(|cx, _| {
|
|
assert_ready_ok!(io.as_mut().poll_ready(cx));
|
|
assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
|
|
|
|
assert_ready_ok!(io.as_mut().poll_ready(cx));
|
|
assert_ok!(io.as_mut().start_send(Bytes::from("123")));
|
|
|
|
assert_ready_ok!(io.as_mut().poll_ready(cx));
|
|
assert_ok!(io.as_mut().start_send(Bytes::from("hello world")));
|
|
|
|
assert_ready_ok!(io.as_mut().poll_flush(cx));
|
|
assert!(io.get_ref().calls.is_empty());
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn write_single_multi_frame_multi_packet() {
|
|
let io = FramedWrite::new(
|
|
mock! {
|
|
data(b"\x00\x00\x00\x09"),
|
|
data(b"abcdefghi"),
|
|
flush(),
|
|
data(b"\x00\x00\x00\x03"),
|
|
data(b"123"),
|
|
flush(),
|
|
data(b"\x00\x00\x00\x0b"),
|
|
data(b"hello world"),
|
|
flush(),
|
|
},
|
|
LengthDelimitedCodec::new(),
|
|
);
|
|
pin_mut!(io);
|
|
|
|
task::spawn(()).enter(|cx, _| {
|
|
assert_ready_ok!(io.as_mut().poll_ready(cx));
|
|
assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
|
|
|
|
assert_ready_ok!(io.as_mut().poll_flush(cx));
|
|
|
|
assert_ready_ok!(io.as_mut().poll_ready(cx));
|
|
assert_ok!(io.as_mut().start_send(Bytes::from("123")));
|
|
|
|
assert_ready_ok!(io.as_mut().poll_flush(cx));
|
|
|
|
assert_ready_ok!(io.as_mut().poll_ready(cx));
|
|
assert_ok!(io.as_mut().start_send(Bytes::from("hello world")));
|
|
|
|
assert_ready_ok!(io.as_mut().poll_flush(cx));
|
|
assert!(io.get_ref().calls.is_empty());
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn write_single_frame_would_block() {
|
|
let io = FramedWrite::new(
|
|
mock! {
|
|
Pending,
|
|
data(b"\x00\x00"),
|
|
Pending,
|
|
data(b"\x00\x09"),
|
|
data(b"abcdefghi"),
|
|
flush(),
|
|
},
|
|
LengthDelimitedCodec::new(),
|
|
);
|
|
pin_mut!(io);
|
|
|
|
task::spawn(()).enter(|cx, _| {
|
|
assert_ready_ok!(io.as_mut().poll_ready(cx));
|
|
assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
|
|
|
|
assert_pending!(io.as_mut().poll_flush(cx));
|
|
assert_pending!(io.as_mut().poll_flush(cx));
|
|
assert_ready_ok!(io.as_mut().poll_flush(cx));
|
|
|
|
assert!(io.get_ref().calls.is_empty());
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn write_single_frame_little_endian() {
|
|
let io = length_delimited::Builder::new()
|
|
.little_endian()
|
|
.new_write(mock! {
|
|
data(b"\x09\x00\x00\x00"),
|
|
data(b"abcdefghi"),
|
|
flush(),
|
|
});
|
|
pin_mut!(io);
|
|
|
|
task::spawn(()).enter(|cx, _| {
|
|
assert_ready_ok!(io.as_mut().poll_ready(cx));
|
|
assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
|
|
|
|
assert_ready_ok!(io.as_mut().poll_flush(cx));
|
|
assert!(io.get_ref().calls.is_empty());
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn write_single_frame_with_short_length_field() {
|
|
let io = length_delimited::Builder::new()
|
|
.length_field_length(1)
|
|
.new_write(mock! {
|
|
data(b"\x09"),
|
|
data(b"abcdefghi"),
|
|
flush(),
|
|
});
|
|
pin_mut!(io);
|
|
|
|
task::spawn(()).enter(|cx, _| {
|
|
assert_ready_ok!(io.as_mut().poll_ready(cx));
|
|
assert_ok!(io.as_mut().start_send(Bytes::from("abcdefghi")));
|
|
|
|
assert_ready_ok!(io.as_mut().poll_flush(cx));
|
|
|
|
assert!(io.get_ref().calls.is_empty());
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn write_max_frame_len() {
|
|
let io = length_delimited::Builder::new()
|
|
.max_frame_length(5)
|
|
.new_write(mock! {});
|
|
pin_mut!(io);
|
|
|
|
task::spawn(()).enter(|cx, _| {
|
|
assert_ready_ok!(io.as_mut().poll_ready(cx));
|
|
assert_err!(io.as_mut().start_send(Bytes::from("abcdef")));
|
|
|
|
assert!(io.get_ref().calls.is_empty());
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn write_update_max_frame_len_at_rest() {
|
|
let io = length_delimited::Builder::new().new_write(mock! {
|
|
data(b"\x00\x00\x00\x06"),
|
|
data(b"abcdef"),
|
|
flush(),
|
|
});
|
|
pin_mut!(io);
|
|
|
|
task::spawn(()).enter(|cx, _| {
|
|
assert_ready_ok!(io.as_mut().poll_ready(cx));
|
|
assert_ok!(io.as_mut().start_send(Bytes::from("abcdef")));
|
|
|
|
assert_ready_ok!(io.as_mut().poll_flush(cx));
|
|
|
|
io.encoder_mut().set_max_frame_length(5);
|
|
|
|
assert_err!(io.as_mut().start_send(Bytes::from("abcdef")));
|
|
|
|
assert!(io.get_ref().calls.is_empty());
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn write_update_max_frame_len_in_flight() {
|
|
let io = length_delimited::Builder::new().new_write(mock! {
|
|
data(b"\x00\x00\x00\x06"),
|
|
data(b"ab"),
|
|
Pending,
|
|
data(b"cdef"),
|
|
flush(),
|
|
});
|
|
pin_mut!(io);
|
|
|
|
task::spawn(()).enter(|cx, _| {
|
|
assert_ready_ok!(io.as_mut().poll_ready(cx));
|
|
assert_ok!(io.as_mut().start_send(Bytes::from("abcdef")));
|
|
|
|
assert_pending!(io.as_mut().poll_flush(cx));
|
|
|
|
io.encoder_mut().set_max_frame_length(5);
|
|
|
|
assert_ready_ok!(io.as_mut().poll_flush(cx));
|
|
|
|
assert_err!(io.as_mut().start_send(Bytes::from("abcdef")));
|
|
assert!(io.get_ref().calls.is_empty());
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn write_zero() {
|
|
let io = length_delimited::Builder::new().new_write(mock! {});
|
|
pin_mut!(io);
|
|
|
|
task::spawn(()).enter(|cx, _| {
|
|
assert_ready_ok!(io.as_mut().poll_ready(cx));
|
|
assert_ok!(io.as_mut().start_send(Bytes::from("abcdef")));
|
|
|
|
assert_ready_err!(io.as_mut().poll_flush(cx));
|
|
|
|
assert!(io.get_ref().calls.is_empty());
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn encode_overflow() {
|
|
// Test reproducing tokio-rs/tokio#681.
|
|
let mut codec = length_delimited::Builder::new().new_codec();
|
|
let mut buf = BytesMut::with_capacity(1024);
|
|
|
|
// Put some data into the buffer without resizing it to hold more.
|
|
let some_as = std::iter::repeat(b'a').take(1024).collect::<Vec<_>>();
|
|
buf.put_slice(&some_as[..]);
|
|
|
|
// Trying to encode the length header should resize the buffer if it won't fit.
|
|
codec.encode(Bytes::from("hello"), &mut buf).unwrap();
|
|
}
|
|
|
|
// ===== Test utils =====
|
|
|
|
struct Mock {
|
|
calls: VecDeque<Poll<io::Result<Op>>>,
|
|
}
|
|
|
|
enum Op {
|
|
Data(Vec<u8>),
|
|
Flush,
|
|
}
|
|
|
|
use self::Op::*;
|
|
|
|
impl AsyncRead for Mock {
|
|
fn poll_read(
|
|
mut self: Pin<&mut Self>,
|
|
_cx: &mut Context<'_>,
|
|
dst: &mut ReadBuf<'_>,
|
|
) -> Poll<io::Result<()>> {
|
|
match self.calls.pop_front() {
|
|
Some(Ready(Ok(Op::Data(data)))) => {
|
|
debug_assert!(dst.remaining() >= data.len());
|
|
dst.put_slice(&data);
|
|
Ready(Ok(()))
|
|
}
|
|
Some(Ready(Ok(_))) => panic!(),
|
|
Some(Ready(Err(e))) => Ready(Err(e)),
|
|
Some(Pending) => Pending,
|
|
None => Ready(Ok(())),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl AsyncWrite for Mock {
|
|
fn poll_write(
|
|
mut self: Pin<&mut Self>,
|
|
_cx: &mut Context<'_>,
|
|
src: &[u8],
|
|
) -> Poll<Result<usize, io::Error>> {
|
|
match self.calls.pop_front() {
|
|
Some(Ready(Ok(Op::Data(data)))) => {
|
|
let len = data.len();
|
|
assert!(src.len() >= len, "expect={:?}; actual={:?}", data, src);
|
|
assert_eq!(&data[..], &src[..len]);
|
|
Ready(Ok(len))
|
|
}
|
|
Some(Ready(Ok(_))) => panic!(),
|
|
Some(Ready(Err(e))) => Ready(Err(e)),
|
|
Some(Pending) => Pending,
|
|
None => Ready(Ok(0)),
|
|
}
|
|
}
|
|
|
|
fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
|
|
match self.calls.pop_front() {
|
|
Some(Ready(Ok(Op::Flush))) => Ready(Ok(())),
|
|
Some(Ready(Ok(_))) => panic!(),
|
|
Some(Ready(Err(e))) => Ready(Err(e)),
|
|
Some(Pending) => Pending,
|
|
None => Ready(Ok(())),
|
|
}
|
|
}
|
|
|
|
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
|
|
Ready(Ok(()))
|
|
}
|
|
}
|
|
|
|
impl<'a> From<&'a [u8]> for Op {
|
|
fn from(src: &'a [u8]) -> Op {
|
|
Op::Data(src.into())
|
|
}
|
|
}
|
|
|
|
impl From<Vec<u8>> for Op {
|
|
fn from(src: Vec<u8>) -> Op {
|
|
Op::Data(src)
|
|
}
|
|
}
|
|
|
|
fn data(bytes: &[u8]) -> Poll<io::Result<Op>> {
|
|
Ready(Ok(bytes.into()))
|
|
}
|
|
|
|
fn flush() -> Poll<io::Result<Op>> {
|
|
Ready(Ok(Flush))
|
|
}
|