summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-uds/tests
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--third_party/rust/tokio-uds/tests/datagram.rs83
-rw-r--r--third_party/rust/tokio-uds/tests/stream.rs55
2 files changed, 138 insertions, 0 deletions
diff --git a/third_party/rust/tokio-uds/tests/datagram.rs b/third_party/rust/tokio-uds/tests/datagram.rs
new file mode 100644
index 0000000000..6f58f7c077
--- /dev/null
+++ b/third_party/rust/tokio-uds/tests/datagram.rs
@@ -0,0 +1,83 @@
+#![cfg(unix)]
+
+extern crate bytes;
+extern crate futures;
+extern crate tempfile;
+extern crate tokio;
+extern crate tokio_codec;
+extern crate tokio_uds;
+
+use tokio_uds::*;
+
+use std::str;
+
+use bytes::BytesMut;
+
+use tokio::io;
+use tokio::runtime::current_thread::Runtime;
+
+use tokio_codec::{Decoder, Encoder};
+
+use futures::{Future, Sink, Stream};
+
+struct StringDatagramCodec;
+
+/// A codec to decode datagrams from a unix domain socket as utf-8 text messages.
+impl Encoder for StringDatagramCodec {
+ type Item = String;
+ type Error = io::Error;
+
+ fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
+ dst.extend_from_slice(&item.into_bytes());
+ Ok(())
+ }
+}
+
+/// A codec to decode datagrams from a unix domain socket as utf-8 text messages.
+impl Decoder for StringDatagramCodec {
+ type Item = String;
+ type Error = io::Error;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
+ let decoded = str::from_utf8(buf)
+ .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?
+ .to_string();
+
+ Ok(Some(decoded))
+ }
+}
+
+#[test]
+fn framed_echo() {
+ let dir = tempfile::tempdir().unwrap();
+ let server_path = dir.path().join("server.sock");
+ let client_path = dir.path().join("client.sock");
+
+ let mut rt = Runtime::new().unwrap();
+
+ {
+ let socket = UnixDatagram::bind(&server_path).unwrap();
+ let server = UnixDatagramFramed::new(socket, StringDatagramCodec);
+
+ let (sink, stream) = server.split();
+
+ let echo_stream = stream.map(|(msg, addr)| (msg, addr.as_pathname().unwrap().to_path_buf()));
+
+ // spawn echo server
+ rt.spawn(echo_stream.forward(sink)
+ .map_err(|e| panic!("err={:?}", e))
+ .map(|_| ()));
+ }
+
+ {
+ let socket = UnixDatagram::bind(&client_path).unwrap();
+ let client = UnixDatagramFramed::new(socket, StringDatagramCodec);
+
+ let (sink, stream) = client.split();
+
+ rt.block_on(sink.send(("ECHO".to_string(), server_path))).unwrap();
+
+ let response = rt.block_on(stream.take(1).collect()).unwrap();
+ assert_eq!(response[0].0, "ECHO");
+ }
+}
diff --git a/third_party/rust/tokio-uds/tests/stream.rs b/third_party/rust/tokio-uds/tests/stream.rs
new file mode 100644
index 0000000000..ebb1835243
--- /dev/null
+++ b/third_party/rust/tokio-uds/tests/stream.rs
@@ -0,0 +1,55 @@
+#![cfg(unix)]
+
+extern crate futures;
+extern crate tokio;
+extern crate tokio_uds;
+
+extern crate tempfile;
+
+use tokio_uds::*;
+
+use tokio::io;
+use tokio::runtime::current_thread::Runtime;
+
+use futures::{Future, Stream};
+use futures::sync::oneshot;
+use tempfile::Builder;
+
+macro_rules! t {
+ ($e:expr) => (match $e {
+ Ok(e) => e,
+ Err(e) => panic!("{} failed with {:?}", stringify!($e), e),
+ })
+}
+
+#[test]
+fn echo() {
+ let dir = Builder::new().prefix("tokio-uds-tests").tempdir().unwrap();
+ let sock_path = dir.path().join("connect.sock");
+
+ let mut rt = Runtime::new().unwrap();
+
+ let server = t!(UnixListener::bind(&sock_path));
+ let (tx, rx) = oneshot::channel();
+
+ rt.spawn({
+ server.incoming()
+ .into_future()
+ .and_then(move |(sock, _)| {
+ tx.send(sock.unwrap()).unwrap();
+ Ok(())
+ })
+ .map_err(|e| panic!("err={:?}", e))
+ });
+
+ let client = rt.block_on(UnixStream::connect(&sock_path)).unwrap();
+ let server = rt.block_on(rx).unwrap();
+
+ // Write to the client
+ rt.block_on(io::write_all(client, b"hello")).unwrap();
+
+ // Read from the server
+ let (_, buf) = rt.block_on(io::read_to_end(server, vec![])).unwrap();
+
+ assert_eq!(buf, b"hello");
+}