summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-uds/src
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-uds/src')
-rw-r--r--third_party/rust/tokio-uds/src/datagram.rs209
-rw-r--r--third_party/rust/tokio-uds/src/frame.rs160
-rw-r--r--third_party/rust/tokio-uds/src/incoming.rs27
-rw-r--r--third_party/rust/tokio-uds/src/lib.rs38
-rw-r--r--third_party/rust/tokio-uds/src/listener.rs146
-rw-r--r--third_party/rust/tokio-uds/src/recv_dgram.rs82
-rw-r--r--third_party/rust/tokio-uds/src/send_dgram.rs81
-rw-r--r--third_party/rust/tokio-uds/src/stream.rs356
-rw-r--r--third_party/rust/tokio-uds/src/ucred.rs159
9 files changed, 1258 insertions, 0 deletions
diff --git a/third_party/rust/tokio-uds/src/datagram.rs b/third_party/rust/tokio-uds/src/datagram.rs
new file mode 100644
index 0000000000..da3f751a9f
--- /dev/null
+++ b/third_party/rust/tokio-uds/src/datagram.rs
@@ -0,0 +1,209 @@
+use {RecvDgram, SendDgram};
+
+use tokio_reactor::{Handle, PollEvented};
+
+use futures::{Async, Poll};
+use mio::Ready;
+use mio_uds;
+
+use std::fmt;
+use std::io;
+use std::net::Shutdown;
+use std::os::unix::io::{AsRawFd, RawFd};
+use std::os::unix::net::{self, SocketAddr};
+use std::path::Path;
+
+/// An I/O object representing a Unix datagram socket.
+pub struct UnixDatagram {
+ io: PollEvented<mio_uds::UnixDatagram>,
+}
+
+impl UnixDatagram {
+ /// Creates a new `UnixDatagram` bound to the specified path.
+ pub fn bind<P>(path: P) -> io::Result<UnixDatagram>
+ where
+ P: AsRef<Path>,
+ {
+ let socket = mio_uds::UnixDatagram::bind(path)?;
+ Ok(UnixDatagram::new(socket))
+ }
+
+ /// Creates an unnamed pair of connected sockets.
+ ///
+ /// This function will create a pair of interconnected Unix sockets for
+ /// communicating back and forth between one another. Each socket will
+ /// be associated with the default event loop's handle.
+ pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> {
+ let (a, b) = mio_uds::UnixDatagram::pair()?;
+ let a = UnixDatagram::new(a);
+ let b = UnixDatagram::new(b);
+
+ Ok((a, b))
+ }
+
+ /// Consumes a `UnixDatagram` in the standard library and returns a
+ /// nonblocking `UnixDatagram` from this crate.
+ ///
+ /// The returned datagram will be associated with the given event loop
+ /// specified by `handle` and is ready to perform I/O.
+ pub fn from_std(datagram: net::UnixDatagram, handle: &Handle) -> io::Result<UnixDatagram> {
+ let socket = mio_uds::UnixDatagram::from_datagram(datagram)?;
+ let io = PollEvented::new_with_handle(socket, handle)?;
+ Ok(UnixDatagram { io })
+ }
+
+ fn new(socket: mio_uds::UnixDatagram) -> UnixDatagram {
+ let io = PollEvented::new(socket);
+ UnixDatagram { io }
+ }
+
+ /// Creates a new `UnixDatagram` which is not bound to any address.
+ pub fn unbound() -> io::Result<UnixDatagram> {
+ let socket = mio_uds::UnixDatagram::unbound()?;
+ Ok(UnixDatagram::new(socket))
+ }
+
+ /// Connects the socket to the specified address.
+ ///
+ /// The `send` method may be used to send data to the specified address.
+ /// `recv` and `recv_from` will only receive data from that address.
+ pub fn connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()> {
+ self.io.get_ref().connect(path)
+ }
+
+ /// Test whether this socket is ready to be read or not.
+ pub fn poll_read_ready(&self, ready: Ready) -> Poll<Ready, io::Error> {
+ self.io.poll_read_ready(ready)
+ }
+
+ /// Test whether this socket is ready to be written to or not.
+ pub fn poll_write_ready(&self) -> Poll<Ready, io::Error> {
+ self.io.poll_write_ready()
+ }
+
+ /// Returns the local address that this socket is bound to.
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.io.get_ref().local_addr()
+ }
+
+ /// Returns the address of this socket's peer.
+ ///
+ /// The `connect` method will connect the socket to a peer.
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.io.get_ref().peer_addr()
+ }
+
+ /// Receives data from the socket.
+ ///
+ /// On success, returns the number of bytes read and the address from
+ /// whence the data came.
+ pub fn poll_recv_from(&self, buf: &mut [u8]) -> Poll<(usize, SocketAddr), io::Error> {
+ try_ready!(self.io.poll_read_ready(Ready::readable()));
+
+ match self.io.get_ref().recv_from(buf) {
+ Ok(ret) => Ok(ret.into()),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_read_ready(Ready::readable())?;
+ Ok(Async::NotReady)
+ }
+ Err(e) => Err(e),
+ }
+ }
+
+ /// Receives data from the socket.
+ ///
+ /// On success, returns the number of bytes read.
+ pub fn poll_recv(&self, buf: &mut [u8]) -> Poll<usize, io::Error> {
+ try_ready!(self.io.poll_read_ready(Ready::readable()));
+
+ match self.io.get_ref().recv(buf) {
+ Ok(ret) => Ok(ret.into()),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_read_ready(Ready::readable())?;
+ Ok(Async::NotReady)
+ }
+ Err(e) => Err(e),
+ }
+ }
+
+ /// Returns a future for receiving a datagram. See the documentation on RecvDgram for details.
+ pub fn recv_dgram<T>(self, buf: T) -> RecvDgram<T>
+ where
+ T: AsMut<[u8]>,
+ {
+ RecvDgram::new(self, buf)
+ }
+
+ /// Sends data on the socket to the specified address.
+ ///
+ /// On success, returns the number of bytes written.
+ pub fn poll_send_to<P>(&self, buf: &[u8], path: P) -> Poll<usize, io::Error>
+ where
+ P: AsRef<Path>,
+ {
+ try_ready!(self.io.poll_write_ready());
+
+ match self.io.get_ref().send_to(buf, path) {
+ Ok(ret) => Ok(ret.into()),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_write_ready()?;
+ Ok(Async::NotReady)
+ }
+ Err(e) => Err(e),
+ }
+ }
+
+ /// Sends data on the socket to the socket's peer.
+ ///
+ /// The peer address may be set by the `connect` method, and this method
+ /// will return an error if the socket has not already been connected.
+ ///
+ /// On success, returns the number of bytes written.
+ pub fn poll_send(&self, buf: &[u8]) -> Poll<usize, io::Error> {
+ try_ready!(self.io.poll_write_ready());
+
+ match self.io.get_ref().send(buf) {
+ Ok(ret) => Ok(ret.into()),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_write_ready()?;
+ Ok(Async::NotReady)
+ }
+ Err(e) => Err(e),
+ }
+ }
+
+ /// Returns a future sending the data in buf to the socket at path.
+ pub fn send_dgram<T, P>(self, buf: T, path: P) -> SendDgram<T, P>
+ where
+ T: AsRef<[u8]>,
+ P: AsRef<Path>,
+ {
+ SendDgram::new(self, buf, path)
+ }
+
+ /// Returns the value of the `SO_ERROR` option.
+ pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+ self.io.get_ref().take_error()
+ }
+
+ /// Shut down the read, write, or both halves of this connection.
+ ///
+ /// This function will cause all pending and future I/O calls on the
+ /// specified portions to immediately return with an appropriate value
+ /// (see the documentation of `Shutdown`).
+ pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
+ self.io.get_ref().shutdown(how)
+ }
+}
+
+impl fmt::Debug for UnixDatagram {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ self.io.get_ref().fmt(f)
+ }
+}
+
+impl AsRawFd for UnixDatagram {
+ fn as_raw_fd(&self) -> RawFd {
+ self.io.get_ref().as_raw_fd()
+ }
+}
diff --git a/third_party/rust/tokio-uds/src/frame.rs b/third_party/rust/tokio-uds/src/frame.rs
new file mode 100644
index 0000000000..49c8948c64
--- /dev/null
+++ b/third_party/rust/tokio-uds/src/frame.rs
@@ -0,0 +1,160 @@
+use std::io;
+use std::os::unix::net::SocketAddr;
+use std::path::Path;
+
+use futures::{Async, Poll, Stream, Sink, StartSend, AsyncSink};
+
+use super::UnixDatagram;
+
+use tokio_codec::{Decoder, Encoder};
+use bytes::{BytesMut, BufMut};
+
+/// A unified `Stream` and `Sink` interface to an underlying `UnixDatagram`, using
+/// the `Encoder` and `Decoder` traits to encode and decode frames.
+///
+/// Unix datagram sockets work with datagrams, but higher-level code may wants to
+/// batch these into meaningful chunks, called "frames". This method layers
+/// framing on top of this socket by using the `Encoder` and `Decoder` traits to
+/// handle encoding and decoding of messages frames. Note that the incoming and
+/// outgoing frame types may be distinct.
+///
+/// This function returns a *single* object that is both `Stream` and `Sink`;
+/// grouping this into a single object is often useful for layering things which
+/// require both read and write access to the underlying object.
+///
+/// If you want to work more directly with the streams and sink, consider
+/// calling `split` on the `UnixDatagramFramed` returned by this method, which will break
+/// them into separate objects, allowing them to interact more easily.
+#[must_use = "sinks do nothing unless polled"]
+#[derive(Debug)]
+pub struct UnixDatagramFramed<A, C> {
+ socket: UnixDatagram,
+ codec: C,
+ rd: BytesMut,
+ wr: BytesMut,
+ out_addr: Option<A>,
+ flushed: bool,
+}
+
+impl<A, C: Decoder> Stream for UnixDatagramFramed<A, C> {
+ type Item = (C::Item, SocketAddr);
+ type Error = C::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ self.rd.reserve(INITIAL_RD_CAPACITY);
+
+ let (n, addr) = unsafe {
+ let (n, addr) = try_ready!(self.socket.poll_recv_from(self.rd.bytes_mut()));
+ self.rd.advance_mut(n);
+ (n, addr)
+ };
+ trace!("received {} bytes, decoding", n);
+ let frame_res = self.codec.decode(&mut self.rd);
+ self.rd.clear();
+ let frame = frame_res?;
+ let result = frame.map(|frame| (frame, addr));
+ trace!("frame decoded from buffer");
+ Ok(Async::Ready(result))
+ }
+}
+
+impl<A: AsRef<Path>, C: Encoder> Sink for UnixDatagramFramed<A, C> {
+ type SinkItem = (C::Item, A);
+ type SinkError = C::Error;
+
+ fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
+ trace!("sending frame");
+
+ if !self.flushed {
+ match try!(self.poll_complete()) {
+ Async::Ready(()) => {},
+ Async::NotReady => return Ok(AsyncSink::NotReady(item)),
+ }
+ }
+
+ let (frame, out_addr) = item;
+ self.codec.encode(frame, &mut self.wr)?;
+ self.out_addr = Some(out_addr);
+ self.flushed = false;
+ trace!("frame encoded; length={}", self.wr.len());
+
+ Ok(AsyncSink::Ready)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), C::Error> {
+ if self.flushed {
+ return Ok(Async::Ready(()))
+ }
+
+ let n = {
+ let out_path = match self.out_addr {
+ Some(ref out_path) => out_path.as_ref(),
+ None => return Err(io::Error::new(io::ErrorKind::Other,
+ "internal error: addr not available while data not flushed").into()),
+ };
+
+ trace!("flushing frame; length={}", self.wr.len());
+ try_ready!(self.socket.poll_send_to(&self.wr, out_path))
+ };
+
+ trace!("written {}", n);
+
+ let wrote_all = n == self.wr.len();
+ self.wr.clear();
+ self.flushed = true;
+
+ if wrote_all {
+ self.out_addr = None;
+ Ok(Async::Ready(()))
+ } else {
+ Err(io::Error::new(io::ErrorKind::Other,
+ "failed to write entire datagram to socket").into())
+ }
+ }
+
+ fn close(&mut self) -> Poll<(), C::Error> {
+ self.poll_complete()
+ }
+}
+
+const INITIAL_RD_CAPACITY: usize = 64 * 1024;
+const INITIAL_WR_CAPACITY: usize = 8 * 1024;
+
+impl<A, C> UnixDatagramFramed<A, C> {
+ /// Create a new `UnixDatagramFramed` backed by the given socket and codec.
+ ///
+ /// See struct level documentation for more details.
+ pub fn new(socket: UnixDatagram, codec: C) -> UnixDatagramFramed<A, C> {
+ UnixDatagramFramed {
+ socket: socket,
+ codec: codec,
+ out_addr: None,
+ rd: BytesMut::with_capacity(INITIAL_RD_CAPACITY),
+ wr: BytesMut::with_capacity(INITIAL_WR_CAPACITY),
+ flushed: true,
+ }
+ }
+
+ /// Returns a reference to the underlying I/O stream wrapped by `Framed`.
+ ///
+ /// # Note
+ ///
+ /// Care should be taken to not tamper with the underlying stream of data
+ /// coming in as it may corrupt the stream of frames otherwise being worked
+ /// with.
+ pub fn get_ref(&self) -> &UnixDatagram {
+ &self.socket
+ }
+
+ /// Returns a mutable reference to the underlying I/O stream wrapped by
+ /// `Framed`.
+ ///
+ /// # Note
+ ///
+ /// Care should be taken to not tamper with the underlying stream of data
+ /// coming in as it may corrupt the stream of frames otherwise being worked
+ /// with.
+ pub fn get_mut(&mut self) -> &mut UnixDatagram {
+ &mut self.socket
+ }
+}
diff --git a/third_party/rust/tokio-uds/src/incoming.rs b/third_party/rust/tokio-uds/src/incoming.rs
new file mode 100644
index 0000000000..28d4d76837
--- /dev/null
+++ b/third_party/rust/tokio-uds/src/incoming.rs
@@ -0,0 +1,27 @@
+use {UnixListener, UnixStream};
+
+use futures::{Stream, Poll};
+
+use std::io;
+
+/// Stream of listeners
+#[derive(Debug)]
+pub struct Incoming {
+ inner: UnixListener,
+}
+
+impl Incoming {
+ pub(crate) fn new(listener: UnixListener) -> Incoming {
+ Incoming { inner: listener }
+ }
+}
+
+impl Stream for Incoming {
+ type Item = UnixStream;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
+ Ok(Some(try_ready!(self.inner.poll_accept()).0).into())
+ }
+}
+
diff --git a/third_party/rust/tokio-uds/src/lib.rs b/third_party/rust/tokio-uds/src/lib.rs
new file mode 100644
index 0000000000..4c02185a8e
--- /dev/null
+++ b/third_party/rust/tokio-uds/src/lib.rs
@@ -0,0 +1,38 @@
+#![cfg(unix)]
+#![doc(html_root_url = "https://docs.rs/tokio-uds/0.2.5")]
+#![deny(missing_docs, warnings, missing_debug_implementations)]
+
+//! Unix Domain Sockets for Tokio.
+//!
+//! This crate provides APIs for using Unix Domain Sockets with Tokio.
+
+extern crate bytes;
+#[macro_use]
+extern crate futures;
+extern crate iovec;
+extern crate libc;
+#[macro_use]
+extern crate log;
+extern crate mio;
+extern crate mio_uds;
+extern crate tokio_codec;
+extern crate tokio_io;
+extern crate tokio_reactor;
+
+mod datagram;
+mod frame;
+mod incoming;
+mod listener;
+mod recv_dgram;
+mod send_dgram;
+mod stream;
+mod ucred;
+
+pub use datagram::UnixDatagram;
+pub use frame::UnixDatagramFramed;
+pub use incoming::Incoming;
+pub use listener::UnixListener;
+pub use recv_dgram::RecvDgram;
+pub use send_dgram::SendDgram;
+pub use stream::{UnixStream, ConnectFuture};
+pub use ucred::UCred;
diff --git a/third_party/rust/tokio-uds/src/listener.rs b/third_party/rust/tokio-uds/src/listener.rs
new file mode 100644
index 0000000000..c63b4a8432
--- /dev/null
+++ b/third_party/rust/tokio-uds/src/listener.rs
@@ -0,0 +1,146 @@
+use {Incoming, UnixStream};
+
+use tokio_reactor::{Handle, PollEvented};
+
+use futures::{Async, Poll};
+use mio::Ready;
+use mio_uds;
+
+use std::fmt;
+use std::io;
+use std::os::unix::io::{AsRawFd, RawFd};
+use std::os::unix::net::{self, SocketAddr};
+use std::path::Path;
+
+/// A Unix socket which can accept connections from other Unix sockets.
+pub struct UnixListener {
+ io: PollEvented<mio_uds::UnixListener>,
+}
+
+impl UnixListener {
+ /// Creates a new `UnixListener` bound to the specified path.
+ pub fn bind<P>(path: P) -> io::Result<UnixListener>
+ where
+ P: AsRef<Path>,
+ {
+ let listener = mio_uds::UnixListener::bind(path)?;
+ let io = PollEvented::new(listener);
+ Ok(UnixListener { io })
+ }
+
+ /// Consumes a `UnixListener` in the standard library and returns a
+ /// nonblocking `UnixListener` from this crate.
+ ///
+ /// The returned listener will be associated with the given event loop
+ /// specified by `handle` and is ready to perform I/O.
+ pub fn from_std(listener: net::UnixListener, handle: &Handle) -> io::Result<UnixListener> {
+ let listener = mio_uds::UnixListener::from_listener(listener)?;
+ let io = PollEvented::new_with_handle(listener, handle)?;
+ Ok(UnixListener { io })
+ }
+
+ /// Returns the local socket address of this listener.
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.io.get_ref().local_addr()
+ }
+
+ /// Test whether this socket is ready to be read or not.
+ pub fn poll_read_ready(&self, ready: Ready) -> Poll<Ready, io::Error> {
+ self.io.poll_read_ready(ready)
+ }
+
+ /// Returns the value of the `SO_ERROR` option.
+ pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+ self.io.get_ref().take_error()
+ }
+
+ /// Attempt to accept a connection and create a new connected `UnixStream`
+ /// if successful.
+ ///
+ /// This function will attempt an accept operation, but will not block
+ /// waiting for it to complete. If the operation would block then a "would
+ /// block" error is returned. Additionally, if this method would block, it
+ /// registers the current task to receive a notification when it would
+ /// otherwise not block.
+ ///
+ /// Note that typically for simple usage it's easier to treat incoming
+ /// connections as a `Stream` of `UnixStream`s with the `incoming` method
+ /// below.
+ ///
+ /// # Panics
+ ///
+ /// This function will panic if it is called outside the context of a
+ /// future's task. It's recommended to only call this from the
+ /// implementation of a `Future::poll`, if necessary.
+ pub fn poll_accept(&self) -> Poll<(UnixStream, SocketAddr), io::Error> {
+ let (io, addr) = try_ready!(self.poll_accept_std());
+
+ let io = mio_uds::UnixStream::from_stream(io)?;
+ Ok((UnixStream::new(io), addr).into())
+ }
+
+ /// Attempt to accept a connection and create a new connected `UnixStream`
+ /// if successful.
+ ///
+ /// This function is the same as `poll_accept` above except that it returns a
+ /// `mio_uds::UnixStream` instead of a `tokio_udp::UnixStream`. This in turn
+ /// can then allow for the stream to be associated with a different reactor
+ /// than the one this `UnixListener` is associated with.
+ ///
+ /// This function will attempt an accept operation, but will not block
+ /// waiting for it to complete. If the operation would block then a "would
+ /// block" error is returned. Additionally, if this method would block, it
+ /// registers the current task to receive a notification when it would
+ /// otherwise not block.
+ ///
+ /// Note that typically for simple usage it's easier to treat incoming
+ /// connections as a `Stream` of `UnixStream`s with the `incoming` method
+ /// below.
+ ///
+ /// # Panics
+ ///
+ /// This function will panic if it is called outside the context of a
+ /// future's task. It's recommended to only call this from the
+ /// implementation of a `Future::poll`, if necessary.
+ pub fn poll_accept_std(&self) -> Poll<(net::UnixStream, SocketAddr), io::Error> {
+ loop {
+ try_ready!(self.io.poll_read_ready(Ready::readable()));
+
+ match self.io.get_ref().accept_std() {
+ Ok(None) => {
+ self.io.clear_read_ready(Ready::readable())?;
+ return Ok(Async::NotReady);
+ }
+ Ok(Some((sock, addr))) => {
+ return Ok(Async::Ready((sock, addr)));
+ }
+ Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_read_ready(Ready::readable())?;
+ return Ok(Async::NotReady);
+ }
+ Err(err) => return Err(err),
+ }
+ }
+ }
+
+ /// Consumes this listener, returning a stream of the sockets this listener
+ /// accepts.
+ ///
+ /// This method returns an implementation of the `Stream` trait which
+ /// resolves to the sockets the are accepted on this listener.
+ pub fn incoming(self) -> Incoming {
+ Incoming::new(self)
+ }
+}
+
+impl fmt::Debug for UnixListener {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ self.io.get_ref().fmt(f)
+ }
+}
+
+impl AsRawFd for UnixListener {
+ fn as_raw_fd(&self) -> RawFd {
+ self.io.get_ref().as_raw_fd()
+ }
+}
diff --git a/third_party/rust/tokio-uds/src/recv_dgram.rs b/third_party/rust/tokio-uds/src/recv_dgram.rs
new file mode 100644
index 0000000000..390202f38b
--- /dev/null
+++ b/third_party/rust/tokio-uds/src/recv_dgram.rs
@@ -0,0 +1,82 @@
+use UnixDatagram;
+
+use futures::{Async, Future, Poll};
+
+use std::io;
+use std::mem;
+
+/// A future for receiving datagrams from a Unix datagram socket.
+///
+/// An example that uses UDP sockets but is still applicable can be found at
+/// https://gist.github.com/dermesser/e331094c2ab28fc7f6ba8a16183fe4d5.
+#[derive(Debug)]
+pub struct RecvDgram<T> {
+ st: State<T>,
+}
+
+/// A future similar to RecvDgram, but without allocating and returning the peer's address.
+///
+/// This can be used if the peer's address is of no interest, so the allocation overhead can be
+/// avoided.
+#[derive(Debug)]
+enum State<T> {
+ Receiving {
+ sock: UnixDatagram,
+ buf: T,
+ },
+ Empty,
+}
+
+impl<T> RecvDgram<T>
+where
+ T: AsMut<[u8]>
+{
+ pub(crate) fn new(sock: UnixDatagram, buf: T) -> RecvDgram<T> {
+ RecvDgram {
+ st: State::Receiving {
+ sock,
+ buf,
+ },
+ }
+ }
+}
+
+impl<T> Future for RecvDgram<T>
+where
+ T: AsMut<[u8]>,
+{
+ /// RecvDgram yields a tuple of the underlying socket, the receive buffer, how many bytes were
+ /// received, and the address (path) of the peer sending the datagram. If the buffer is too small, the
+ /// datagram is truncated.
+ type Item = (UnixDatagram, T, usize, String);
+ /// This future yields io::Error if an error occurred.
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ let received;
+ let peer;
+
+ if let State::Receiving {
+ ref mut sock,
+ ref mut buf,
+ } = self.st
+ {
+ let (n, p) = try_ready!(sock.poll_recv_from(buf.as_mut()));
+ received = n;
+
+ peer = p.as_pathname().map_or(String::new(), |p| {
+ p.to_str().map_or(String::new(), |s| s.to_string())
+ });
+ } else {
+ panic!()
+ }
+
+ if let State::Receiving { sock, buf } =
+ mem::replace(&mut self.st, State::Empty)
+ {
+ Ok(Async::Ready((sock, buf, received, peer)))
+ } else {
+ panic!()
+ }
+ }
+}
diff --git a/third_party/rust/tokio-uds/src/send_dgram.rs b/third_party/rust/tokio-uds/src/send_dgram.rs
new file mode 100644
index 0000000000..59d438b761
--- /dev/null
+++ b/third_party/rust/tokio-uds/src/send_dgram.rs
@@ -0,0 +1,81 @@
+use UnixDatagram;
+
+use futures::{Async, Future, Poll};
+
+use std::io;
+use std::mem;
+use std::path::Path;
+
+/// A future for writing a buffer to a Unix datagram socket.
+#[derive(Debug)]
+pub struct SendDgram<T, P> {
+ st: State<T, P>,
+}
+
+#[derive(Debug)]
+enum State<T, P> {
+ /// current state is Sending
+ Sending {
+ /// the underlying socket
+ sock: UnixDatagram,
+ /// the buffer to send
+ buf: T,
+ /// the destination
+ addr: P,
+ },
+ /// neutral state
+ Empty,
+}
+
+impl<T, P> SendDgram<T, P>
+where
+ T: AsRef<[u8]>,
+ P: AsRef<Path>,
+{
+ pub(crate) fn new(sock: UnixDatagram, buf: T, addr: P) -> SendDgram<T, P> {
+ SendDgram {
+ st: State::Sending {
+ sock,
+ buf,
+ addr,
+ }
+ }
+ }
+}
+
+impl<T, P> Future for SendDgram<T, P>
+where
+ T: AsRef<[u8]>,
+ P: AsRef<Path>,
+{
+ /// Returns the underlying socket and the buffer that was sent.
+ type Item = (UnixDatagram, T);
+ /// The error that is returned when sending failed.
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ if let State::Sending {
+ ref mut sock,
+ ref buf,
+ ref addr,
+ } = self.st
+ {
+ let n = try_ready!(sock.poll_send_to(buf.as_ref(), addr));
+ if n < buf.as_ref().len() {
+ return Err(io::Error::new(
+ io::ErrorKind::Other,
+ "Couldn't send whole buffer".to_string(),
+ ));
+ }
+ } else {
+ panic!()
+ }
+ if let State::Sending { sock, buf, addr: _ } =
+ mem::replace(&mut self.st, State::Empty)
+ {
+ Ok(Async::Ready((sock, buf)))
+ } else {
+ panic!()
+ }
+ }
+}
diff --git a/third_party/rust/tokio-uds/src/stream.rs b/third_party/rust/tokio-uds/src/stream.rs
new file mode 100644
index 0000000000..7098c85fe1
--- /dev/null
+++ b/third_party/rust/tokio-uds/src/stream.rs
@@ -0,0 +1,356 @@
+use ucred::{self, UCred};
+
+use tokio_io::{AsyncRead, AsyncWrite};
+use tokio_reactor::{Handle, PollEvented};
+
+use bytes::{Buf, BufMut};
+use futures::{Async, Future, Poll};
+use iovec::{self, IoVec};
+use libc;
+use mio::Ready;
+use mio_uds;
+
+use std::fmt;
+use std::io::{self, Read, Write};
+use std::net::Shutdown;
+use std::os::unix::io::{AsRawFd, RawFd};
+use std::os::unix::net::{self, SocketAddr};
+use std::path::Path;
+
+/// A structure representing a connected Unix socket.
+///
+/// This socket can be connected directly with `UnixStream::connect` or accepted
+/// from a listener with `UnixListener::incoming`. Additionally, a pair of
+/// anonymous Unix sockets can be created with `UnixStream::pair`.
+pub struct UnixStream {
+ io: PollEvented<mio_uds::UnixStream>,
+}
+
+/// Future returned by `UnixStream::connect` which will resolve to a
+/// `UnixStream` when the stream is connected.
+#[derive(Debug)]
+pub struct ConnectFuture {
+ inner: State,
+}
+
+#[derive(Debug)]
+enum State {
+ Waiting(UnixStream),
+ Error(io::Error),
+ Empty,
+}
+
+impl UnixStream {
+ /// Connects to the socket named by `path`.
+ ///
+ /// This function will create a new Unix socket and connect to the path
+ /// specified, associating the returned stream with the default event loop's
+ /// handle.
+ pub fn connect<P>(path: P) -> ConnectFuture
+ where
+ P: AsRef<Path>,
+ {
+ let res = mio_uds::UnixStream::connect(path)
+ .map(UnixStream::new);
+
+ let inner = match res {
+ Ok(stream) => State::Waiting(stream),
+ Err(e) => State::Error(e),
+ };
+
+ ConnectFuture { inner }
+ }
+
+ /// Consumes a `UnixStream` in the standard library and returns a
+ /// nonblocking `UnixStream` from this crate.
+ ///
+ /// The returned stream will be associated with the given event loop
+ /// specified by `handle` and is ready to perform I/O.
+ pub fn from_std(stream: net::UnixStream, handle: &Handle) -> io::Result<UnixStream> {
+ let stream = mio_uds::UnixStream::from_stream(stream)?;
+ let io = PollEvented::new_with_handle(stream, handle)?;
+
+ Ok(UnixStream { io })
+ }
+
+ /// Creates an unnamed pair of connected sockets.
+ ///
+ /// This function will create a pair of interconnected Unix sockets for
+ /// communicating back and forth between one another. Each socket will
+ /// be associated with the default event loop's handle.
+ pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
+ let (a, b) = try!(mio_uds::UnixStream::pair());
+ let a = UnixStream::new(a);
+ let b = UnixStream::new(b);
+
+ Ok((a, b))
+ }
+
+ pub(crate) fn new(stream: mio_uds::UnixStream) -> UnixStream {
+ let io = PollEvented::new(stream);
+ UnixStream { io }
+ }
+
+ /// Test whether this socket is ready to be read or not.
+ pub fn poll_read_ready(&self, ready: Ready) -> Poll<Ready, io::Error> {
+ self.io.poll_read_ready(ready)
+ }
+
+ /// Test whether this socket is ready to be written to or not.
+ pub fn poll_write_ready(&self) -> Poll<Ready, io::Error> {
+ self.io.poll_write_ready()
+ }
+
+ /// Returns the socket address of the local half of this connection.
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.io.get_ref().local_addr()
+ }
+
+ /// Returns the socket address of the remote half of this connection.
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.io.get_ref().peer_addr()
+ }
+
+ /// Returns effective credentials of the process which called `connect` or `pair`.
+ pub fn peer_cred(&self) -> io::Result<UCred> {
+ ucred::get_peer_cred(self)
+ }
+
+ /// Returns the value of the `SO_ERROR` option.
+ pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+ self.io.get_ref().take_error()
+ }
+
+ /// Shuts down the read, write, or both halves of this connection.
+ ///
+ /// This function will cause all pending and future I/O calls on the
+ /// specified portions to immediately return with an appropriate value
+ /// (see the documentation of `Shutdown`).
+ pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
+ self.io.get_ref().shutdown(how)
+ }
+}
+
+impl Read for UnixStream {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ self.io.read(buf)
+ }
+}
+
+impl Write for UnixStream {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.io.write(buf)
+ }
+ fn flush(&mut self) -> io::Result<()> {
+ self.io.flush()
+ }
+}
+
+impl AsyncRead for UnixStream {
+ unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
+ false
+ }
+
+ fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
+ <&UnixStream>::read_buf(&mut &*self, buf)
+ }
+}
+
+impl AsyncWrite for UnixStream {
+ fn shutdown(&mut self) -> Poll<(), io::Error> {
+ <&UnixStream>::shutdown(&mut &*self)
+ }
+
+ fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
+ <&UnixStream>::write_buf(&mut &*self, buf)
+ }
+}
+
+impl<'a> Read for &'a UnixStream {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ (&self.io).read(buf)
+ }
+}
+
+impl<'a> Write for &'a UnixStream {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ (&self.io).write(buf)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ (&self.io).flush()
+ }
+}
+
+impl<'a> AsyncRead for &'a UnixStream {
+ unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
+ false
+ }
+
+ fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
+ if let Async::NotReady = <UnixStream>::poll_read_ready(self, Ready::readable())? {
+ return Ok(Async::NotReady);
+ }
+ unsafe {
+ let r = read_ready(buf, self.as_raw_fd());
+ if r == -1 {
+ let e = io::Error::last_os_error();
+ if e.kind() == io::ErrorKind::WouldBlock {
+ self.io.clear_read_ready(Ready::readable())?;
+ Ok(Async::NotReady)
+ } else {
+ Err(e)
+ }
+ } else {
+ let r = r as usize;
+ buf.advance_mut(r);
+ Ok(r.into())
+ }
+ }
+ }
+}
+
+impl<'a> AsyncWrite for &'a UnixStream {
+ fn shutdown(&mut self) -> Poll<(), io::Error> {
+ Ok(().into())
+ }
+
+ fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
+ if let Async::NotReady = <UnixStream>::poll_write_ready(self)? {
+ return Ok(Async::NotReady);
+ }
+ unsafe {
+ let r = write_ready(buf, self.as_raw_fd());
+ if r == -1 {
+ let e = io::Error::last_os_error();
+ if e.kind() == io::ErrorKind::WouldBlock {
+ self.io.clear_write_ready()?;
+ Ok(Async::NotReady)
+ } else {
+ Err(e)
+ }
+ } else {
+ let r = r as usize;
+ buf.advance(r);
+ Ok(r.into())
+ }
+ }
+ }
+}
+
+impl fmt::Debug for UnixStream {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ self.io.get_ref().fmt(f)
+ }
+}
+
+impl AsRawFd for UnixStream {
+ fn as_raw_fd(&self) -> RawFd {
+ self.io.get_ref().as_raw_fd()
+ }
+}
+
+impl Future for ConnectFuture {
+ type Item = UnixStream;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<UnixStream, io::Error> {
+ use std::mem;
+
+ match self.inner {
+ State::Waiting(ref mut stream) => {
+ if let Async::NotReady = stream.io.poll_write_ready()? {
+ return Ok(Async::NotReady)
+ }
+
+ if let Some(e) = try!(stream.io.get_ref().take_error()) {
+ return Err(e)
+ }
+ }
+ State::Error(_) => {
+ let e = match mem::replace(&mut self.inner, State::Empty) {
+ State::Error(e) => e,
+ _ => unreachable!(),
+ };
+
+ return Err(e)
+ },
+ State::Empty => panic!("can't poll stream twice"),
+ }
+
+ match mem::replace(&mut self.inner, State::Empty) {
+ State::Waiting(stream) => Ok(Async::Ready(stream)),
+ _ => unreachable!(),
+ }
+ }
+}
+
+unsafe fn read_ready<B: BufMut>(buf: &mut B, raw_fd: RawFd) -> isize {
+ // The `IoVec` type can't have a 0-length size, so we create a bunch
+ // of dummy versions on the stack with 1 length which we'll quickly
+ // overwrite.
+ let b1: &mut [u8] = &mut [0];
+ let b2: &mut [u8] = &mut [0];
+ let b3: &mut [u8] = &mut [0];
+ let b4: &mut [u8] = &mut [0];
+ let b5: &mut [u8] = &mut [0];
+ let b6: &mut [u8] = &mut [0];
+ let b7: &mut [u8] = &mut [0];
+ let b8: &mut [u8] = &mut [0];
+ let b9: &mut [u8] = &mut [0];
+ let b10: &mut [u8] = &mut [0];
+ let b11: &mut [u8] = &mut [0];
+ let b12: &mut [u8] = &mut [0];
+ let b13: &mut [u8] = &mut [0];
+ let b14: &mut [u8] = &mut [0];
+ let b15: &mut [u8] = &mut [0];
+ let b16: &mut [u8] = &mut [0];
+ let mut bufs: [&mut IoVec; 16] = [
+ b1.into(),
+ b2.into(),
+ b3.into(),
+ b4.into(),
+ b5.into(),
+ b6.into(),
+ b7.into(),
+ b8.into(),
+ b9.into(),
+ b10.into(),
+ b11.into(),
+ b12.into(),
+ b13.into(),
+ b14.into(),
+ b15.into(),
+ b16.into(),
+ ];
+
+ let n = buf.bytes_vec_mut(&mut bufs);
+ read_ready_vecs(&mut bufs[..n], raw_fd)
+}
+
+unsafe fn read_ready_vecs(bufs: &mut [&mut IoVec], raw_fd: RawFd) -> isize {
+ let iovecs = iovec::unix::as_os_slice_mut(bufs);
+
+ libc::readv(raw_fd, iovecs.as_ptr(), iovecs.len() as i32)
+}
+
+unsafe fn write_ready<B: Buf>(buf: &mut B, raw_fd: RawFd) -> isize {
+ // The `IoVec` type can't have a zero-length size, so create a dummy
+ // version from a 1-length slice which we'll overwrite with the
+ // `bytes_vec` method.
+ static DUMMY: &[u8] = &[0];
+ let iovec = <&IoVec>::from(DUMMY);
+ let mut bufs = [
+ iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec,
+ iovec, iovec, iovec,
+ ];
+
+ let n = buf.bytes_vec(&mut bufs);
+ write_ready_vecs(&bufs[..n], raw_fd)
+}
+
+unsafe fn write_ready_vecs(bufs: &[&IoVec], raw_fd: RawFd) -> isize {
+ let iovecs = iovec::unix::as_os_slice(bufs);
+
+ libc::writev(raw_fd, iovecs.as_ptr(), iovecs.len() as i32)
+}
diff --git a/third_party/rust/tokio-uds/src/ucred.rs b/third_party/rust/tokio-uds/src/ucred.rs
new file mode 100644
index 0000000000..bc53ea1717
--- /dev/null
+++ b/third_party/rust/tokio-uds/src/ucred.rs
@@ -0,0 +1,159 @@
+use libc::{gid_t, uid_t};
+
+/// Credentials of a process
+#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
+pub struct UCred {
+ /// UID (user ID) of the process
+ pub uid: uid_t,
+ /// GID (group ID) of the process
+ pub gid: gid_t,
+}
+
+#[cfg(any(target_os = "linux", target_os = "android"))]
+pub use self::impl_linux::get_peer_cred;
+
+#[cfg(any(target_os = "dragonfly", target_os = "macos", target_os = "ios", target_os = "freebsd", target_os = "netbsd", target_os = "openbsd"))]
+pub use self::impl_macos::get_peer_cred;
+
+#[cfg(any(target_os = "solaris"))]
+pub use self::impl_solaris::get_peer_cred;
+
+#[cfg(any(target_os = "linux", target_os = "android"))]
+pub mod impl_linux {
+ use libc::{c_void, getsockopt, socklen_t, SOL_SOCKET, SO_PEERCRED};
+ use std::{io, mem};
+ use UnixStream;
+ use std::os::unix::io::AsRawFd;
+
+ use libc::ucred;
+
+ pub fn get_peer_cred(sock: &UnixStream) -> io::Result<super::UCred> {
+ unsafe {
+ let raw_fd = sock.as_raw_fd();
+
+ let mut ucred = ucred {
+ pid: 0,
+ uid: 0,
+ gid: 0,
+ };
+
+ let ucred_size = mem::size_of::<ucred>();
+
+ // These paranoid checks should be optimized-out
+ assert!(mem::size_of::<u32>() <= mem::size_of::<usize>());
+ assert!(ucred_size <= u32::max_value() as usize);
+
+ let mut ucred_size = ucred_size as socklen_t;
+
+ let ret = getsockopt(
+ raw_fd,
+ SOL_SOCKET,
+ SO_PEERCRED,
+ &mut ucred as *mut ucred as *mut c_void,
+ &mut ucred_size,
+ );
+ if ret == 0 && ucred_size as usize == mem::size_of::<ucred>() {
+ Ok(super::UCred {
+ uid: ucred.uid,
+ gid: ucred.gid,
+ })
+ } else {
+ Err(io::Error::last_os_error())
+ }
+ }
+ }
+}
+
+#[cfg(any(target_os = "dragonfly", target_os = "macos", target_os = "ios", target_os = "freebsd", target_os = "netbsd", target_os = "openbsd"))]
+pub mod impl_macos {
+ use libc::getpeereid;
+ use std::{io, mem};
+ use UnixStream;
+ use std::os::unix::io::AsRawFd;
+
+ pub fn get_peer_cred(sock: &UnixStream) -> io::Result<super::UCred> {
+ unsafe {
+ let raw_fd = sock.as_raw_fd();
+
+ let mut cred: super::UCred = mem::uninitialized();
+
+ let ret = getpeereid(raw_fd, &mut cred.uid, &mut cred.gid);
+
+ if ret == 0 {
+ Ok(cred)
+ } else {
+ Err(io::Error::last_os_error())
+ }
+ }
+ }
+}
+
+
+#[cfg(any(target_os = "solaris"))]
+pub mod impl_solaris {
+ use std::io;
+ use std::os::unix::io::AsRawFd;
+ use UnixStream;
+ use std::ptr;
+
+ #[allow(non_camel_case_types)]
+ enum ucred_t {}
+
+ extern "C" {
+ fn ucred_free(cred: *mut ucred_t);
+ fn ucred_geteuid(cred: *const ucred_t) -> super::uid_t;
+ fn ucred_getegid(cred: *const ucred_t) -> super::gid_t;
+
+ fn getpeerucred(fd: ::std::os::raw::c_int, cred: *mut *mut ucred_t) -> ::std::os::raw::c_int;
+ }
+
+ pub fn get_peer_cred(sock: &UnixStream) -> io::Result<super::UCred> {
+ unsafe {
+ let raw_fd = sock.as_raw_fd();
+
+ let mut cred = ptr::null_mut::<*mut ucred_t>() as *mut ucred_t;
+
+ let ret = getpeerucred(raw_fd, &mut cred);
+
+ if ret == 0 {
+ let uid = ucred_geteuid(cred);
+ let gid = ucred_getegid(cred);
+
+ ucred_free(cred);
+
+ Ok(super::UCred {
+ uid,
+ gid,
+ })
+ } else {
+ Err(io::Error::last_os_error())
+ }
+ }
+ }
+}
+
+
+// Note that LOCAL_PEERCRED is not supported on DragonFly (yet). So do not run tests.
+#[cfg(not(target_os = "dragonfly"))]
+#[cfg(test)]
+mod test {
+ use UnixStream;
+ use libc::geteuid;
+ use libc::getegid;
+
+ #[test]
+ #[cfg_attr(target_os = "freebsd", ignore = "Requires FreeBSD 12.0 or later. https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=176419")]
+ #[cfg_attr(target_os = "netbsd", ignore = "NetBSD does not support getpeereid() for sockets created by socketpair()")]
+ fn test_socket_pair() {
+ let (a, b) = UnixStream::pair().unwrap();
+ let cred_a = a.peer_cred().unwrap();
+ let cred_b = b.peer_cred().unwrap();
+ assert_eq!(cred_a, cred_b);
+
+ let uid = unsafe { geteuid() };
+ let gid = unsafe { getegid() };
+
+ assert_eq!(cred_a.uid, uid);
+ assert_eq!(cred_a.gid, gid);
+ }
+}