summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-0.1.11/tests/line-frames.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-0.1.11/tests/line-frames.rs')
-rw-r--r--third_party/rust/tokio-0.1.11/tests/line-frames.rs88
1 files changed, 88 insertions, 0 deletions
diff --git a/third_party/rust/tokio-0.1.11/tests/line-frames.rs b/third_party/rust/tokio-0.1.11/tests/line-frames.rs
new file mode 100644
index 0000000000..e36d5a73ee
--- /dev/null
+++ b/third_party/rust/tokio-0.1.11/tests/line-frames.rs
@@ -0,0 +1,88 @@
+extern crate env_logger;
+extern crate futures;
+extern crate tokio;
+extern crate tokio_codec;
+extern crate tokio_io;
+extern crate tokio_threadpool;
+extern crate bytes;
+
+use std::io;
+use std::net::Shutdown;
+
+use bytes::{BytesMut, BufMut};
+use futures::{Future, Stream, Sink};
+use tokio::net::{TcpListener, TcpStream};
+use tokio_codec::{Encoder, Decoder};
+use tokio_io::io::{write_all, read};
+use tokio_threadpool::Builder;
+
+pub struct LineCodec;
+
+impl Decoder for LineCodec {
+ type Item = BytesMut;
+ type Error = io::Error;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> {
+ match buf.iter().position(|&b| b == b'\n') {
+ Some(i) => Ok(Some(buf.split_to(i + 1).into())),
+ None => Ok(None),
+ }
+ }
+
+ fn decode_eof(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> {
+ if buf.len() == 0 {
+ Ok(None)
+ } else {
+ let amt = buf.len();
+ Ok(Some(buf.split_to(amt)))
+ }
+ }
+}
+
+impl Encoder for LineCodec {
+ type Item = BytesMut;
+ type Error = io::Error;
+
+ fn encode(&mut self, item: BytesMut, into: &mut BytesMut) -> io::Result<()> {
+ into.put(&item[..]);
+ Ok(())
+ }
+}
+
+#[test]
+fn echo() {
+ drop(env_logger::try_init());
+
+ let pool = Builder::new()
+ .pool_size(1)
+ .build();
+
+ let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
+ let addr = listener.local_addr().unwrap();
+ let sender = pool.sender().clone();
+ let srv = listener.incoming().for_each(move |socket| {
+ let (sink, stream) = LineCodec.framed(socket).split();
+ sender.spawn(sink.send_all(stream).map(|_| ()).map_err(|_| ())).unwrap();
+ Ok(())
+ });
+
+ pool.sender().spawn(srv.map_err(|e| panic!("srv error: {}", e))).unwrap();
+
+ let client = TcpStream::connect(&addr);
+ let client = client.wait().unwrap();
+ let (client, _) = write_all(client, b"a\n").wait().unwrap();
+ let (client, buf, amt) = read(client, vec![0; 1024]).wait().unwrap();
+ assert_eq!(amt, 2);
+ assert_eq!(&buf[..2], b"a\n");
+
+ let (client, _) = write_all(client, b"\n").wait().unwrap();
+ let (client, buf, amt) = read(client, buf).wait().unwrap();
+ assert_eq!(amt, 1);
+ assert_eq!(&buf[..1], b"\n");
+
+ let (client, _) = write_all(client, b"b").wait().unwrap();
+ client.shutdown(Shutdown::Write).unwrap();
+ let (_client, buf, amt) = read(client, buf).wait().unwrap();
+ assert_eq!(amt, 1);
+ assert_eq!(&buf[..1], b"b");
+}