1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
extern crate tokio_codec;
extern crate tokio_io;
extern crate bytes;
extern crate futures;
use tokio_io::AsyncWrite;
use tokio_codec::{Encoder, FramedWrite};
use futures::{Sink, Poll};
use bytes::{BytesMut, BufMut, BigEndian};
use std::io::{self, Write};
use std::collections::VecDeque;
macro_rules! mock {
($($x:expr,)*) => {{
let mut v = VecDeque::new();
v.extend(vec![$($x),*]);
Mock { calls: v }
}};
}
struct U32Encoder;
impl Encoder for U32Encoder {
type Item = u32;
type Error = io::Error;
fn encode(&mut self, item: u32, dst: &mut BytesMut) -> io::Result<()> {
// Reserve space
dst.reserve(4);
dst.put_u32_be(item);
Ok(())
}
}
#[test]
fn write_multi_frame_in_packet() {
let mock = mock! {
Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
};
let mut framed = FramedWrite::new(mock, U32Encoder);
assert!(framed.start_send(0).unwrap().is_ready());
assert!(framed.start_send(1).unwrap().is_ready());
assert!(framed.start_send(2).unwrap().is_ready());
// Nothing written yet
assert_eq!(1, framed.get_ref().calls.len());
// Flush the writes
assert!(framed.poll_complete().unwrap().is_ready());
assert_eq!(0, framed.get_ref().calls.len());
}
#[test]
fn write_hits_backpressure() {
const ITER: usize = 2 * 1024;
let mut mock = mock! {
// Block the `ITER`th write
Err(io::Error::new(io::ErrorKind::WouldBlock, "not ready")),
Ok(b"".to_vec()),
};
for i in 0..(ITER + 1) {
let mut b = BytesMut::with_capacity(4);
b.put_u32_be(i as u32);
// Append to the end
match mock.calls.back_mut().unwrap() {
&mut Ok(ref mut data) => {
// Write in 2kb chunks
if data.len() < ITER {
data.extend_from_slice(&b[..]);
continue;
}
}
_ => unreachable!(),
}
// Push a new new chunk
mock.calls.push_back(Ok(b[..].to_vec()));
}
let mut framed = FramedWrite::new(mock, U32Encoder);
for i in 0..ITER {
assert!(framed.start_send(i as u32).unwrap().is_ready());
}
// This should reject
assert!(!framed.start_send(ITER as u32).unwrap().is_ready());
// This should succeed and start flushing the buffer.
assert!(framed.start_send(ITER as u32).unwrap().is_ready());
// Flush the rest of the buffer
assert!(framed.poll_complete().unwrap().is_ready());
// Ensure the mock is empty
assert_eq!(0, framed.get_ref().calls.len());
}
// ===== Mock ======
struct Mock {
calls: VecDeque<io::Result<Vec<u8>>>,
}
impl Write for Mock {
fn write(&mut self, src: &[u8]) -> io::Result<usize> {
match self.calls.pop_front() {
Some(Ok(data)) => {
assert!(src.len() >= data.len());
assert_eq!(&data[..], &src[..data.len()]);
Ok(data.len())
}
Some(Err(e)) => Err(e),
None => panic!("unexpected write; {:?}", src),
}
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl AsyncWrite for Mock {
fn shutdown(&mut self) -> Poll<(), io::Error> {
Ok(().into())
}
}
|