1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
#![feature(test)]
#![deny(warnings)]
extern crate test;
#[macro_use]
extern crate futures;
extern crate tokio;
use std::io;
use std::net::SocketAddr;
use std::thread;
use futures::sync::oneshot;
use futures::sync::mpsc;
use futures::{Future, Poll, Sink, Stream};
use test::Bencher;
use tokio::net::UdpSocket;
/// UDP echo server
struct EchoServer {
socket: UdpSocket,
buf: Vec<u8>,
to_send: Option<(usize, SocketAddr)>,
}
impl EchoServer {
fn new(s: UdpSocket) -> Self {
EchoServer {
socket: s,
to_send: None,
buf: vec![0u8; 1600],
}
}
}
impl Future for EchoServer {
type Item = ();
type Error = io::Error;
fn poll(&mut self) -> Poll<(), io::Error> {
loop {
if let Some(&(size, peer)) = self.to_send.as_ref() {
try_ready!(self.socket.poll_send_to(&self.buf[..size], &peer));
self.to_send = None;
}
self.to_send = Some(try_ready!(self.socket.poll_recv_from(&mut self.buf)));
}
}
}
#[bench]
fn udp_echo_latency(b: &mut Bencher) {
let any_addr = "127.0.0.1:0".to_string();
let any_addr = any_addr.parse::<SocketAddr>().unwrap();
let (stop_c, stop_p) = oneshot::channel::<()>();
let (tx, rx) = oneshot::channel();
let child = thread::spawn(move || {
let socket = tokio::net::UdpSocket::bind(&any_addr).unwrap();
tx.send(socket.local_addr().unwrap()).unwrap();
let server = EchoServer::new(socket);
let server = server.select(stop_p.map_err(|_| panic!()));
let server = server.map_err(|_| ());
server.wait().unwrap();
});
let client = std::net::UdpSocket::bind(&any_addr).unwrap();
let server_addr = rx.wait().unwrap();
let mut buf = [0u8; 1000];
// warmup phase; for some reason initial couple of
// runs are much slower
//
// TODO: Describe the exact reasons; caching? branch predictor? lazy closures?
for _ in 0..8 {
client.send_to(&buf, &server_addr).unwrap();
let _ = client.recv_from(&mut buf).unwrap();
}
b.iter(|| {
client.send_to(&buf, &server_addr).unwrap();
let _ = client.recv_from(&mut buf).unwrap();
});
stop_c.send(()).unwrap();
child.join().unwrap();
}
#[bench]
fn futures_channel_latency(b: &mut Bencher) {
let (mut in_tx, in_rx) = mpsc::channel(32);
let (out_tx, out_rx) = mpsc::channel::<_>(32);
let child = thread::spawn(|| out_tx.send_all(in_rx.then(|r| r.unwrap())).wait());
let mut rx_iter = out_rx.wait();
// warmup phase; for some reason initial couple of runs are much slower
//
// TODO: Describe the exact reasons; caching? branch predictor? lazy closures?
for _ in 0..8 {
in_tx.start_send(Ok(1usize)).unwrap();
let _ = rx_iter.next();
}
b.iter(|| {
in_tx.start_send(Ok(1usize)).unwrap();
let _ = rx_iter.next();
});
drop(in_tx);
child.join().unwrap().unwrap();
}
|