diff options
Diffstat (limited to 'third_party/rust/tokio-0.1.11/tests/line-frames.rs')
-rw-r--r-- | third_party/rust/tokio-0.1.11/tests/line-frames.rs | 88 |
1 files changed, 88 insertions, 0 deletions
diff --git a/third_party/rust/tokio-0.1.11/tests/line-frames.rs b/third_party/rust/tokio-0.1.11/tests/line-frames.rs new file mode 100644 index 0000000000..e36d5a73ee --- /dev/null +++ b/third_party/rust/tokio-0.1.11/tests/line-frames.rs @@ -0,0 +1,88 @@ +extern crate env_logger; +extern crate futures; +extern crate tokio; +extern crate tokio_codec; +extern crate tokio_io; +extern crate tokio_threadpool; +extern crate bytes; + +use std::io; +use std::net::Shutdown; + +use bytes::{BytesMut, BufMut}; +use futures::{Future, Stream, Sink}; +use tokio::net::{TcpListener, TcpStream}; +use tokio_codec::{Encoder, Decoder}; +use tokio_io::io::{write_all, read}; +use tokio_threadpool::Builder; + +pub struct LineCodec; + +impl Decoder for LineCodec { + type Item = BytesMut; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> { + match buf.iter().position(|&b| b == b'\n') { + Some(i) => Ok(Some(buf.split_to(i + 1).into())), + None => Ok(None), + } + } + + fn decode_eof(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> { + if buf.len() == 0 { + Ok(None) + } else { + let amt = buf.len(); + Ok(Some(buf.split_to(amt))) + } + } +} + +impl Encoder for LineCodec { + type Item = BytesMut; + type Error = io::Error; + + fn encode(&mut self, item: BytesMut, into: &mut BytesMut) -> io::Result<()> { + into.put(&item[..]); + Ok(()) + } +} + +#[test] +fn echo() { + drop(env_logger::try_init()); + + let pool = Builder::new() + .pool_size(1) + .build(); + + let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let addr = listener.local_addr().unwrap(); + let sender = pool.sender().clone(); + let srv = listener.incoming().for_each(move |socket| { + let (sink, stream) = LineCodec.framed(socket).split(); + sender.spawn(sink.send_all(stream).map(|_| ()).map_err(|_| ())).unwrap(); + Ok(()) + }); + + pool.sender().spawn(srv.map_err(|e| panic!("srv error: {}", e))).unwrap(); + + let client = TcpStream::connect(&addr); + let client = client.wait().unwrap(); + let (client, _) = write_all(client, b"a\n").wait().unwrap(); + let (client, buf, amt) = read(client, vec![0; 1024]).wait().unwrap(); + assert_eq!(amt, 2); + assert_eq!(&buf[..2], b"a\n"); + + let (client, _) = write_all(client, b"\n").wait().unwrap(); + let (client, buf, amt) = read(client, buf).wait().unwrap(); + assert_eq!(amt, 1); + assert_eq!(&buf[..1], b"\n"); + + let (client, _) = write_all(client, b"b").wait().unwrap(); + client.shutdown(Shutdown::Write).unwrap(); + let (_client, buf, amt) = read(client, buf).wait().unwrap(); + assert_eq!(amt, 1); + assert_eq!(&buf[..1], b"b"); +} |