1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
|
#![allow(deprecated)]
extern crate mio;
extern crate bytes;
extern crate net2;
#[macro_use]
extern crate log;
extern crate env_logger;
extern crate iovec;
extern crate slab;
extern crate tempdir;
#[cfg(target_os = "fuchsia")]
extern crate fuchsia_zircon as zircon;
pub use ports::localhost;
mod test_custom_evented;
mod test_close_on_drop;
mod test_double_register;
mod test_echo_server;
mod test_local_addr_ready;
mod test_multicast;
mod test_oneshot;
mod test_poll;
mod test_register_deregister;
mod test_register_multiple_event_loops;
mod test_reregister_without_poll;
mod test_smoke;
mod test_tcp;
mod test_tcp_level;
mod test_tcp_shutdown;
mod test_udp_level;
mod test_udp_socket;
mod test_write_then_drop;
#[cfg(feature = "with-deprecated")]
mod test_notify;
#[cfg(feature = "with-deprecated")]
mod test_poll_channel;
#[cfg(feature = "with-deprecated")]
mod test_tick;
// The following tests are for deprecated features. Only run these tests on
// platforms that were supported from before the features were deprecated
#[cfg(any(target_os = "macos", target_os = "linux", target_os = "windows"))]
#[cfg(feature = "with-deprecated")]
mod test_battery;
#[cfg(any(target_os = "macos", target_os = "linux"))]
#[cfg(feature = "with-deprecated")]
mod test_unix_echo_server;
#[cfg(any(target_os = "macos", target_os = "linux"))]
#[cfg(feature = "with-deprecated")]
mod test_unix_pass_fd;
#[cfg(any(target_os = "macos", target_os = "linux"))]
#[cfg(feature = "with-deprecated")]
mod test_uds_shutdown;
#[cfg(any(target_os = "macos", target_os = "linux"))]
#[cfg(feature = "with-deprecated")]
mod test_subprocess_pipe;
#[cfg(any(target_os = "macos", target_os = "linux"))]
#[cfg(feature = "with-deprecated")]
mod test_broken_pipe;
#[cfg(any(target_os = "fuchsia"))]
mod test_fuchsia_handles;
use bytes::{Buf, MutBuf};
use std::io::{self, Read, Write};
use std::time::Duration;
use mio::{Events, Poll};
use mio::event::Event;
pub trait TryRead {
fn try_read_buf<B: MutBuf>(&mut self, buf: &mut B) -> io::Result<Option<usize>>
where Self : Sized
{
// Reads the length of the slice supplied by buf.mut_bytes into the buffer
// This is not guaranteed to consume an entire datagram or segment.
// If your protocol is msg based (instead of continuous stream) you should
// ensure that your buffer is large enough to hold an entire segment (1532 bytes if not jumbo
// frames)
let res = self.try_read(unsafe { buf.mut_bytes() });
if let Ok(Some(cnt)) = res {
unsafe { buf.advance(cnt); }
}
res
}
fn try_read(&mut self, buf: &mut [u8]) -> io::Result<Option<usize>>;
}
pub trait TryWrite {
fn try_write_buf<B: Buf>(&mut self, buf: &mut B) -> io::Result<Option<usize>>
where Self : Sized
{
let res = self.try_write(buf.bytes());
if let Ok(Some(cnt)) = res {
buf.advance(cnt);
}
res
}
fn try_write(&mut self, buf: &[u8]) -> io::Result<Option<usize>>;
}
impl<T: Read> TryRead for T {
fn try_read(&mut self, dst: &mut [u8]) -> io::Result<Option<usize>> {
self.read(dst).map_non_block()
}
}
impl<T: Write> TryWrite for T {
fn try_write(&mut self, src: &[u8]) -> io::Result<Option<usize>> {
self.write(src).map_non_block()
}
}
/*
*
* ===== Helpers =====
*
*/
/// A helper trait to provide the map_non_block function on Results.
trait MapNonBlock<T> {
/// Maps a `Result<T>` to a `Result<Option<T>>` by converting
/// operation-would-block errors into `Ok(None)`.
fn map_non_block(self) -> io::Result<Option<T>>;
}
impl<T> MapNonBlock<T> for io::Result<T> {
fn map_non_block(self) -> io::Result<Option<T>> {
use std::io::ErrorKind::WouldBlock;
match self {
Ok(value) => Ok(Some(value)),
Err(err) => {
if let WouldBlock = err.kind() {
Ok(None)
} else {
Err(err)
}
}
}
}
}
mod ports {
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT};
use std::sync::atomic::Ordering::SeqCst;
// Helper for getting a unique port for the task run
// TODO: Reuse ports to not spam the system
static mut NEXT_PORT: AtomicUsize = ATOMIC_USIZE_INIT;
const FIRST_PORT: usize = 18080;
fn next_port() -> usize {
unsafe {
// If the atomic was never used, set it to the initial port
NEXT_PORT.compare_and_swap(0, FIRST_PORT, SeqCst);
// Get and increment the port list
NEXT_PORT.fetch_add(1, SeqCst)
}
}
pub fn localhost() -> SocketAddr {
let s = format!("127.0.0.1:{}", next_port());
FromStr::from_str(&s).unwrap()
}
}
pub fn sleep_ms(ms: u64) {
use std::thread;
thread::sleep(Duration::from_millis(ms));
}
pub fn expect_events(poll: &Poll,
event_buffer: &mut Events,
poll_try_count: usize,
mut expected: Vec<Event>)
{
const MS: u64 = 1_000;
for _ in 0..poll_try_count {
poll.poll(event_buffer, Some(Duration::from_millis(MS))).unwrap();
for event in event_buffer.iter() {
let pos_opt = match expected.iter().position(|exp_event| {
(event.token() == exp_event.token()) &&
event.readiness().contains(exp_event.readiness())
}) {
Some(x) => Some(x),
None => None,
};
if let Some(pos) = pos_opt { expected.remove(pos); }
}
if expected.is_empty() {
break;
}
}
assert!(expected.is_empty(), "The following expected events were not found: {:?}", expected);
}
|