diff options
Diffstat (limited to '')
-rw-r--r-- | third_party/rust/tokio-uds/tests/datagram.rs | 83 | ||||
-rw-r--r-- | third_party/rust/tokio-uds/tests/stream.rs | 55 |
2 files changed, 138 insertions, 0 deletions
diff --git a/third_party/rust/tokio-uds/tests/datagram.rs b/third_party/rust/tokio-uds/tests/datagram.rs new file mode 100644 index 0000000000..6f58f7c077 --- /dev/null +++ b/third_party/rust/tokio-uds/tests/datagram.rs @@ -0,0 +1,83 @@ +#![cfg(unix)] + +extern crate bytes; +extern crate futures; +extern crate tempfile; +extern crate tokio; +extern crate tokio_codec; +extern crate tokio_uds; + +use tokio_uds::*; + +use std::str; + +use bytes::BytesMut; + +use tokio::io; +use tokio::runtime::current_thread::Runtime; + +use tokio_codec::{Decoder, Encoder}; + +use futures::{Future, Sink, Stream}; + +struct StringDatagramCodec; + +/// A codec to decode datagrams from a unix domain socket as utf-8 text messages. +impl Encoder for StringDatagramCodec { + type Item = String; + type Error = io::Error; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + dst.extend_from_slice(&item.into_bytes()); + Ok(()) + } +} + +/// A codec to decode datagrams from a unix domain socket as utf-8 text messages. +impl Decoder for StringDatagramCodec { + type Item = String; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> { + let decoded = str::from_utf8(buf) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))? + .to_string(); + + Ok(Some(decoded)) + } +} + +#[test] +fn framed_echo() { + let dir = tempfile::tempdir().unwrap(); + let server_path = dir.path().join("server.sock"); + let client_path = dir.path().join("client.sock"); + + let mut rt = Runtime::new().unwrap(); + + { + let socket = UnixDatagram::bind(&server_path).unwrap(); + let server = UnixDatagramFramed::new(socket, StringDatagramCodec); + + let (sink, stream) = server.split(); + + let echo_stream = stream.map(|(msg, addr)| (msg, addr.as_pathname().unwrap().to_path_buf())); + + // spawn echo server + rt.spawn(echo_stream.forward(sink) + .map_err(|e| panic!("err={:?}", e)) + .map(|_| ())); + } + + { + let socket = UnixDatagram::bind(&client_path).unwrap(); + let client = UnixDatagramFramed::new(socket, StringDatagramCodec); + + let (sink, stream) = client.split(); + + rt.block_on(sink.send(("ECHO".to_string(), server_path))).unwrap(); + + let response = rt.block_on(stream.take(1).collect()).unwrap(); + assert_eq!(response[0].0, "ECHO"); + } +} diff --git a/third_party/rust/tokio-uds/tests/stream.rs b/third_party/rust/tokio-uds/tests/stream.rs new file mode 100644 index 0000000000..ebb1835243 --- /dev/null +++ b/third_party/rust/tokio-uds/tests/stream.rs @@ -0,0 +1,55 @@ +#![cfg(unix)] + +extern crate futures; +extern crate tokio; +extern crate tokio_uds; + +extern crate tempfile; + +use tokio_uds::*; + +use tokio::io; +use tokio::runtime::current_thread::Runtime; + +use futures::{Future, Stream}; +use futures::sync::oneshot; +use tempfile::Builder; + +macro_rules! t { + ($e:expr) => (match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + }) +} + +#[test] +fn echo() { + let dir = Builder::new().prefix("tokio-uds-tests").tempdir().unwrap(); + let sock_path = dir.path().join("connect.sock"); + + let mut rt = Runtime::new().unwrap(); + + let server = t!(UnixListener::bind(&sock_path)); + let (tx, rx) = oneshot::channel(); + + rt.spawn({ + server.incoming() + .into_future() + .and_then(move |(sock, _)| { + tx.send(sock.unwrap()).unwrap(); + Ok(()) + }) + .map_err(|e| panic!("err={:?}", e)) + }); + + let client = rt.block_on(UnixStream::connect(&sock_path)).unwrap(); + let server = rt.block_on(rx).unwrap(); + + // Write to the client + rt.block_on(io::write_all(client, b"hello")).unwrap(); + + // Read from the server + let (_, buf) = rt.block_on(io::read_to_end(server, vec![])).unwrap(); + + assert_eq!(buf, b"hello"); +} |