1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
|
use std::io;
use futures_util::future;
use tokio::net::TcpStream;
use super::Client;
/// Verifies the `Uri` that the client passes to its connector.
///
/// The request targets `http://example.local/and/a/path`; the connector
/// asserts that it sees the HTTP scheme, the host `example.local`, no port,
/// and a path of `/`. The connector then always fails, so the request itself
/// is expected to resolve to an error.
#[tokio::test]
async fn client_connect_uri_argument() {
    // A connector that inspects the destination and then refuses to connect.
    let failing_connector = tower_util::service_fn(|target: http::Uri| {
        assert_eq!(target.scheme(), Some(&http::uri::Scheme::HTTP));
        assert_eq!(target.host(), Some("example.local"));
        assert_eq!(target.port(), None);
        assert_eq!(target.path(), "/", "path should be removed");

        let refusal = io::Error::new(io::ErrorKind::Other, "expect me");
        future::err::<TcpStream, _>(refusal)
    });

    let client = Client::builder().build::<_, crate::Body>(failing_connector);
    let request_uri = "http://example.local/and/a/path".parse().unwrap();

    // The connector never yields a stream, so the response must be an error.
    let outcome = client.get(request_uri).await;
    let _ = outcome.expect_err("response should fail");
}
/*
// FIXME: re-implement tests with `async/await`
#[test]
fn retryable_request() {
let _ = pretty_env_logger::try_init();
let mut rt = Runtime::new().expect("new rt");
let mut connector = MockConnector::new();
let sock1 = connector.mock("http://mock.local");
let sock2 = connector.mock("http://mock.local");
let client = Client::builder()
.build::<_, crate::Body>(connector);
client.pool.no_timer();
{
let req = Request::builder()
.uri("http://mock.local/a")
.body(Default::default())
.unwrap();
let res1 = client.request(req);
let srv1 = poll_fn(|| {
try_ready!(sock1.read(&mut [0u8; 512]));
try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"));
Ok(Async::Ready(()))
}).map_err(|e: std::io::Error| panic!("srv1 poll_fn error: {}", e));
rt.block_on(res1.join(srv1)).expect("res1");
}
drop(sock1);
let req = Request::builder()
.uri("http://mock.local/b")
.body(Default::default())
.unwrap();
let res2 = client.request(req)
.map(|res| {
assert_eq!(res.status().as_u16(), 222);
});
let srv2 = poll_fn(|| {
try_ready!(sock2.read(&mut [0u8; 512]));
try_ready!(sock2.write(b"HTTP/1.1 222 OK\r\nContent-Length: 0\r\n\r\n"));
Ok(Async::Ready(()))
}).map_err(|e: std::io::Error| panic!("srv2 poll_fn error: {}", e));
rt.block_on(res2.join(srv2)).expect("res2");
}
#[test]
fn conn_reset_after_write() {
let _ = pretty_env_logger::try_init();
let mut rt = Runtime::new().expect("new rt");
let mut connector = MockConnector::new();
let sock1 = connector.mock("http://mock.local");
let client = Client::builder()
.build::<_, crate::Body>(connector);
client.pool.no_timer();
{
let req = Request::builder()
.uri("http://mock.local/a")
.body(Default::default())
.unwrap();
let res1 = client.request(req);
let srv1 = poll_fn(|| {
try_ready!(sock1.read(&mut [0u8; 512]));
try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"));
Ok(Async::Ready(()))
}).map_err(|e: std::io::Error| panic!("srv1 poll_fn error: {}", e));
rt.block_on(res1.join(srv1)).expect("res1");
}
let req = Request::builder()
.uri("http://mock.local/a")
.body(Default::default())
.unwrap();
let res2 = client.request(req);
let mut sock1 = Some(sock1);
let srv2 = poll_fn(|| {
// We purposefully keep the socket open until the client
// has written the second request, and THEN disconnect.
//
// Not because we expect servers to be jerks, but to trigger the
// state where we write on a presumably good connection, and the
// connection is only reset/closed AFTER we wrote bytes.
try_ready!(sock1.as_mut().unwrap().read(&mut [0u8; 512]));
sock1.take();
Ok(Async::Ready(()))
}).map_err(|e: std::io::Error| panic!("srv2 poll_fn error: {}", e));
let err = rt.block_on(res2.join(srv2)).expect_err("res2");
assert!(err.is_incomplete_message(), "{:?}", err);
}
#[test]
fn checkout_win_allows_connect_future_to_be_pooled() {
let _ = pretty_env_logger::try_init();
let mut rt = Runtime::new().expect("new rt");
let mut connector = MockConnector::new();
let (tx, rx) = oneshot::channel::<()>();
let sock1 = connector.mock("http://mock.local");
let sock2 = connector.mock_fut("http://mock.local", rx);
let client = Client::builder()
.build::<_, crate::Body>(connector);
client.pool.no_timer();
let uri = "http://mock.local/a".parse::<crate::Uri>().expect("uri parse");
// First request just sets us up to have a connection able to be put
// back in the pool. *However*, it doesn't insert immediately. The
// body has 1 pending byte, and we will only drain in request 2, once
// the connect future has been started.
let mut body = {
let res1 = client.get(uri.clone())
.map(|res| res.into_body().concat2());
let srv1 = poll_fn(|| {
try_ready!(sock1.read(&mut [0u8; 512]));
// Chunked is used so as to force 2 body reads.
try_ready!(sock1.write(b"\
HTTP/1.1 200 OK\r\n\
transfer-encoding: chunked\r\n\
\r\n\
1\r\nx\r\n\
0\r\n\r\n\
"));
Ok(Async::Ready(()))
}).map_err(|e: std::io::Error| panic!("srv1 poll_fn error: {}", e));
rt.block_on(res1.join(srv1)).expect("res1").0
};
// The second request triggers the only mocked connect future, but then
// the drained body allows the first socket to go back to the pool,
// "winning" the checkout race.
{
let res2 = client.get(uri.clone());
let drain = poll_fn(move || {
body.poll()
});
let srv2 = poll_fn(|| {
try_ready!(sock1.read(&mut [0u8; 512]));
try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nConnection: close\r\n\r\nx"));
Ok(Async::Ready(()))
}).map_err(|e: std::io::Error| panic!("srv2 poll_fn error: {}", e));
rt.block_on(res2.join(drain).join(srv2)).expect("res2");
}
// "Release" the mocked connect future, and let the runtime spin once so
// it's all set up...
{
let mut tx = Some(tx);
let client = &client;
let key = client.pool.h1_key("http://mock.local");
let mut tick_cnt = 0;
let fut = poll_fn(move || {
tx.take();
if client.pool.idle_count(&key) == 0 {
tick_cnt += 1;
assert!(tick_cnt < 10, "ticked too many times waiting for idle");
trace!("no idle yet; tick count: {}", tick_cnt);
::futures::task::current().notify();
Ok(Async::NotReady)
} else {
Ok::<_, ()>(Async::Ready(()))
}
});
rt.block_on(fut).unwrap();
}
// Third request just tests out that the "loser" connection was pooled. If
// it isn't, this will panic since the MockConnector doesn't have any more
// mocks to give out.
{
let res3 = client.get(uri);
let srv3 = poll_fn(|| {
try_ready!(sock2.read(&mut [0u8; 512]));
try_ready!(sock2.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"));
Ok(Async::Ready(()))
}).map_err(|e: std::io::Error| panic!("srv3 poll_fn error: {}", e));
rt.block_on(res3.join(srv3)).expect("res3");
}
}
#[cfg(feature = "nightly")]
#[bench]
fn bench_http1_get_0b(b: &mut test::Bencher) {
let _ = pretty_env_logger::try_init();
let mut rt = Runtime::new().expect("new rt");
let mut connector = MockConnector::new();
let client = Client::builder()
.build::<_, crate::Body>(connector.clone());
client.pool.no_timer();
let uri = Uri::from_static("http://mock.local/a");
b.iter(move || {
let sock1 = connector.mock("http://mock.local");
let res1 = client
.get(uri.clone())
.and_then(|res| {
res.into_body().for_each(|_| Ok(()))
});
let srv1 = poll_fn(|| {
try_ready!(sock1.read(&mut [0u8; 512]));
try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"));
Ok(Async::Ready(()))
}).map_err(|e: std::io::Error| panic!("srv1 poll_fn error: {}", e));
rt.block_on(res1.join(srv1)).expect("res1");
});
}
#[cfg(feature = "nightly")]
#[bench]
fn bench_http1_get_10b(b: &mut test::Bencher) {
let _ = pretty_env_logger::try_init();
let mut rt = Runtime::new().expect("new rt");
let mut connector = MockConnector::new();
let client = Client::builder()
.build::<_, crate::Body>(connector.clone());
client.pool.no_timer();
let uri = Uri::from_static("http://mock.local/a");
b.iter(move || {
let sock1 = connector.mock("http://mock.local");
let res1 = client
.get(uri.clone())
.and_then(|res| {
res.into_body().for_each(|_| Ok(()))
});
let srv1 = poll_fn(|| {
try_ready!(sock1.read(&mut [0u8; 512]));
try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\n0123456789"));
Ok(Async::Ready(()))
}).map_err(|e: std::io::Error| panic!("srv1 poll_fn error: {}", e));
rt.block_on(res1.join(srv1)).expect("res1");
});
}
*/
|