1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
extern crate env_logger;
extern crate futures;
extern crate tokio;
extern crate tokio_codec;
extern crate tokio_io;
extern crate tokio_threadpool;
extern crate bytes;
use std::io;
use std::net::Shutdown;
use bytes::{BytesMut, BufMut};
use futures::{Future, Stream, Sink};
use tokio::net::{TcpListener, TcpStream};
use tokio_codec::{Encoder, Decoder};
use tokio_io::io::{write_all, read};
use tokio_threadpool::Builder;
/// Codec that frames a byte stream into chunks ending at a `\n` byte.
///
/// Decoded frames keep their trailing newline; encoding copies a frame's bytes verbatim.
pub struct LineCodec;
impl Decoder for LineCodec {
    type Item = BytesMut;
    type Error = io::Error;

    /// Returns the next frame, trailing `\n` included, or `None` when `buf`
    /// does not yet contain a newline.
    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> {
        let newline_at = buf.iter().position(|&byte| byte == b'\n');
        // Split off everything up to and including the newline; the rest stays in `buf`.
        Ok(newline_at.map(|idx| buf.split_to(idx + 1)))
    }

    /// Hands back whatever is still buffered as one final frame, or `None`
    /// when the buffer is empty.
    fn decode_eof(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> {
        if buf.is_empty() {
            return Ok(None);
        }
        let remaining = buf.len();
        Ok(Some(buf.split_to(remaining)))
    }
}
impl Encoder for LineCodec {
    type Item = BytesMut;
    type Error = io::Error;

    /// Appends the bytes of `item` to `into` unchanged; no delimiter is added.
    ///
    /// Always returns `Ok(())`.
    fn encode(&mut self, item: BytesMut, into: &mut BytesMut) -> io::Result<()> {
        // NOTE(review): on the `bytes` release that pairs with this futures-0.1 stack,
        // `BufMut::put` does not grow a `BytesMut` and panics when the remaining
        // capacity is too small, so make room for the whole frame first.
        into.reserve(item.len());
        into.put(&item[..]);
        Ok(())
    }
}
/// Drives an echo server built on `LineCodec` over a loopback TCP socket and
/// checks that every chunk written by the client is read back unchanged.
#[test]
fn echo() {
    // The outcome of logger initialisation is deliberately discarded.
    let _ = env_logger::try_init();

    let pool = Builder::new().pool_size(1).build();

    // Port 0: the bound address is read back from the listener below.
    let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
    let addr = listener.local_addr().unwrap();

    // Server side: each accepted socket gets a task that forwards its decoded
    // frames straight back into its own sink.
    let sender = pool.sender().clone();
    let server = listener.incoming().for_each(move |conn| {
        let (sink, stream) = LineCodec.framed(conn).split();
        let echo_task = sink.send_all(stream).map(|_| ()).map_err(|_| ());
        sender.spawn(echo_task).unwrap();
        Ok(())
    });
    pool.sender()
        .spawn(server.map_err(|e| panic!("srv error: {}", e)))
        .unwrap();

    // Client side, driven synchronously with `wait()`.
    let peer = TcpStream::connect(&addr).wait().unwrap();

    // A complete line comes back as-is.
    let (peer, _) = write_all(peer, b"a\n").wait().unwrap();
    let (peer, scratch, n) = read(peer, vec![0; 1024]).wait().unwrap();
    assert_eq!(n, 2);
    assert_eq!(&scratch[..2], b"a\n");

    // A bare newline is a frame of its own.
    let (peer, _) = write_all(peer, b"\n").wait().unwrap();
    let (peer, scratch, n) = read(peer, scratch).wait().unwrap();
    assert_eq!(n, 1);
    assert_eq!(&scratch[..1], b"\n");

    // Unterminated data is echoed once the write half is shut down.
    let (peer, _) = write_all(peer, b"b").wait().unwrap();
    peer.shutdown(Shutdown::Write).unwrap();
    let (_peer, scratch, n) = read(peer, scratch).wait().unwrap();
    assert_eq!(n, 1);
    assert_eq!(&scratch[..1], b"b");
}
|