summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-uds
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 14:29:10 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-28 14:29:10 +0000
commit2aa4a82499d4becd2284cdb482213d541b8804dd (patch)
treeb80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/tokio-uds
parentInitial commit. (diff)
downloadfirefox-2aa4a82499d4becd2284cdb482213d541b8804dd.tar.xz
firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.zip
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio-uds')
-rw-r--r--third_party/rust/tokio-uds/.cargo-checksum.json1
-rw-r--r--third_party/rust/tokio-uds/CHANGELOG.md28
-rw-r--r--third_party/rust/tokio-uds/Cargo.toml56
-rw-r--r--third_party/rust/tokio-uds/LICENSE25
-rw-r--r--third_party/rust/tokio-uds/README.md15
-rw-r--r--third_party/rust/tokio-uds/src/datagram.rs209
-rw-r--r--third_party/rust/tokio-uds/src/frame.rs160
-rw-r--r--third_party/rust/tokio-uds/src/incoming.rs27
-rw-r--r--third_party/rust/tokio-uds/src/lib.rs38
-rw-r--r--third_party/rust/tokio-uds/src/listener.rs146
-rw-r--r--third_party/rust/tokio-uds/src/recv_dgram.rs82
-rw-r--r--third_party/rust/tokio-uds/src/send_dgram.rs81
-rw-r--r--third_party/rust/tokio-uds/src/stream.rs356
-rw-r--r--third_party/rust/tokio-uds/src/ucred.rs159
-rw-r--r--third_party/rust/tokio-uds/tests/datagram.rs83
-rw-r--r--third_party/rust/tokio-uds/tests/stream.rs55
16 files changed, 1521 insertions, 0 deletions
diff --git a/third_party/rust/tokio-uds/.cargo-checksum.json b/third_party/rust/tokio-uds/.cargo-checksum.json
new file mode 100644
index 0000000000..576acba475
--- /dev/null
+++ b/third_party/rust/tokio-uds/.cargo-checksum.json
@@ -0,0 +1 @@
+{"files":{"CHANGELOG.md":"8c6ee54bae6536cfa83cb02c200dbb935c3003eb12c6cab4b3d24a1cd9372efb","Cargo.toml":"6c9880a09d65a7bf1265ff6a3f3bc69bab9a83b520878e1bda67f5216f03581d","LICENSE":"898b1ae9821e98daf8964c8d6c7f61641f5f5aa78ad500020771c0939ee0dea1","README.md":"e4c179d7ce8bb49c57c6c03bfb74cc369028ae5ac26ddf90b9f6e86d5d02af0d","src/datagram.rs":"967a6bea9903190efd89165b64c2191716569b34b8d7e548c412cfeefcaf6ec4","src/frame.rs":"0b5b7bde4607a8e0d34b524ee877739aa3ce52234fe5836a9da7d74bc1570a6a","src/incoming.rs":"a8cfa5323427c489e4d482131b3d710e9b115ab5f028c2bbcec5aa39a4c23ff0","src/lib.rs":"f94d84d16173db18e769dd3ff3e60583971dd57590ad7ef8d3e11f8222873dad","src/listener.rs":"26ebf259c13bc878baf7cb92e0b9bd9a9f0b746a6152581d23bb9d84cba3e039","src/recv_dgram.rs":"38593020f3bf9b65853062096ef90ef6338753782806f2b173a48d0b6d445a55","src/send_dgram.rs":"dba06c866150df9e4d38ce40e97eec1c87778f86a178cdf653d27d48fdd57134","src/stream.rs":"8a7dc87891e8f7e66121e62ff4b4e1840edb8163fab07cafabf6cc94385ce7da","src/ucred.rs":"2d4ba4a488ebc43333a5a89dbf1bf614d157e916e97b0364c06e3056d5865913","tests/datagram.rs":"84390395e3f4e5428d636616c3137ab649daa83b776fee215b2e92cf6905ad67","tests/stream.rs":"c6d975cfa7368fde8b1f59fe8ca9987307e094d18b18cf856f5b4066439aebca"},"package":"037ffc3ba0e12a0ab4aca92e5234e0dedeb48fddf6ccd260f1f150a36a9f2445"} \ No newline at end of file
diff --git a/third_party/rust/tokio-uds/CHANGELOG.md b/third_party/rust/tokio-uds/CHANGELOG.md
new file mode 100644
index 0000000000..fcf798cad0
--- /dev/null
+++ b/third_party/rust/tokio-uds/CHANGELOG.md
@@ -0,0 +1,28 @@
+# 0.2.5 (January 6, 2019)
+
+* Fix bug in `UnixDatagram::send` (#782).
+
+# 0.2.4 (November 24, 2018)
+
+* Implement `UnixDatagramFramed`, providing a `Stream + Sink` layer for
+ unix domain sockets (#453).
+* Add solaris support for `ucred` (#733).
+* Documentation tweaks (#754).
+
+# 0.2.3 (October 23, 2018)
+
+* Fix build on NetBSD (#715).
+
+# 0.2.2 (September 27, 2018)
+
+* Fix bug in `UdsStream::read_buf` (#672).
+
+# 0.2.1 (August 19, 2018)
+
+* Re-export `ConnectFuture` (#430).
+* bug: Fix `recv_from` (#452).
+* bug: Fix build on FreeBSD.
+
+# 0.2.0 (June 6, 2018)
+
+* Initial 0.2 release.
diff --git a/third_party/rust/tokio-uds/Cargo.toml b/third_party/rust/tokio-uds/Cargo.toml
new file mode 100644
index 0000000000..e950fe54d4
--- /dev/null
+++ b/third_party/rust/tokio-uds/Cargo.toml
@@ -0,0 +1,56 @@
+# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO
+#
+# When uploading crates to the registry Cargo will automatically
+# "normalize" Cargo.toml files for maximal compatibility
+# with all versions of Cargo and also rewrite `path` dependencies
+# to registry (e.g. crates.io) dependencies
+#
+# If you believe there's an error in this file please file an
+# issue against the rust-lang/cargo repository. If you're
+# editing this file be aware that the upstream Cargo.toml
+# will likely look very different (and much more reasonable)
+
+[package]
+name = "tokio-uds"
+version = "0.2.5"
+authors = ["Carl Lerche <me@carllerche.com>"]
+description = "Unix Domain sockets for Tokio\n"
+homepage = "https://github.com/tokio-rs/tokio"
+documentation = "https://docs.rs/tokio-uds/0.2.5/tokio_uds/"
+categories = ["asynchronous"]
+license = "MIT"
+repository = "https://github.com/tokio-rs/tokio"
+[dependencies.bytes]
+version = "0.4.8"
+
+[dependencies.futures]
+version = "0.1.21"
+
+[dependencies.iovec]
+version = "0.1.2"
+
+[dependencies.libc]
+version = "0.2.42"
+
+[dependencies.log]
+version = "0.4.2"
+
+[dependencies.mio]
+version = "0.6.14"
+
+[dependencies.mio-uds]
+version = "0.6.5"
+
+[dependencies.tokio-codec]
+version = "0.1.0"
+
+[dependencies.tokio-io]
+version = "0.1.6"
+
+[dependencies.tokio-reactor]
+version = "0.1.1"
+[dev-dependencies.tempfile]
+version = "3"
+
+[dev-dependencies.tokio]
+version = "0.1.6"
diff --git a/third_party/rust/tokio-uds/LICENSE b/third_party/rust/tokio-uds/LICENSE
new file mode 100644
index 0000000000..cdb28b4b56
--- /dev/null
+++ b/third_party/rust/tokio-uds/LICENSE
@@ -0,0 +1,25 @@
+Copyright (c) 2019 Tokio Contributors
+
+Permission is hereby granted, free of charge, to any
+person obtaining a copy of this software and associated
+documentation files (the "Software"), to deal in the
+Software without restriction, including without
+limitation the rights to use, copy, modify, merge,
+publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software
+is furnished to do so, subject to the following
+conditions:
+
+The above copyright notice and this permission notice
+shall be included in all copies or substantial portions
+of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
diff --git a/third_party/rust/tokio-uds/README.md b/third_party/rust/tokio-uds/README.md
new file mode 100644
index 0000000000..1c6a80c04e
--- /dev/null
+++ b/third_party/rust/tokio-uds/README.md
@@ -0,0 +1,15 @@
+# tokio-uds
+
+An implementation of Unix Domain Sockets for Tokio
+
+[Documentation](https://docs.rs/tokio-uds/0.2.5/tokio_uds/)
+
+## License
+
+This project is licensed under the [MIT license](./LICENSE).
+
+### Contribution
+
+Unless you explicitly state otherwise, any contribution intentionally submitted
+for inclusion in Tokio by you, shall be licensed as MIT, without any additional
+terms or conditions.
diff --git a/third_party/rust/tokio-uds/src/datagram.rs b/third_party/rust/tokio-uds/src/datagram.rs
new file mode 100644
index 0000000000..da3f751a9f
--- /dev/null
+++ b/third_party/rust/tokio-uds/src/datagram.rs
@@ -0,0 +1,209 @@
+use {RecvDgram, SendDgram};
+
+use tokio_reactor::{Handle, PollEvented};
+
+use futures::{Async, Poll};
+use mio::Ready;
+use mio_uds;
+
+use std::fmt;
+use std::io;
+use std::net::Shutdown;
+use std::os::unix::io::{AsRawFd, RawFd};
+use std::os::unix::net::{self, SocketAddr};
+use std::path::Path;
+
+/// An I/O object representing a Unix datagram socket.
+pub struct UnixDatagram {
+ io: PollEvented<mio_uds::UnixDatagram>,
+}
+
+impl UnixDatagram {
+ /// Creates a new `UnixDatagram` bound to the specified path.
+ pub fn bind<P>(path: P) -> io::Result<UnixDatagram>
+ where
+ P: AsRef<Path>,
+ {
+ let socket = mio_uds::UnixDatagram::bind(path)?;
+ Ok(UnixDatagram::new(socket))
+ }
+
+ /// Creates an unnamed pair of connected sockets.
+ ///
+ /// This function will create a pair of interconnected Unix sockets for
+ /// communicating back and forth between one another. Each socket will
+ /// be associated with the default event loop's handle.
+ pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> {
+ let (a, b) = mio_uds::UnixDatagram::pair()?;
+ let a = UnixDatagram::new(a);
+ let b = UnixDatagram::new(b);
+
+ Ok((a, b))
+ }
+
+ /// Consumes a `UnixDatagram` in the standard library and returns a
+ /// nonblocking `UnixDatagram` from this crate.
+ ///
+ /// The returned datagram will be associated with the given event loop
+ /// specified by `handle` and is ready to perform I/O.
+ pub fn from_std(datagram: net::UnixDatagram, handle: &Handle) -> io::Result<UnixDatagram> {
+ let socket = mio_uds::UnixDatagram::from_datagram(datagram)?;
+ let io = PollEvented::new_with_handle(socket, handle)?;
+ Ok(UnixDatagram { io })
+ }
+
+ fn new(socket: mio_uds::UnixDatagram) -> UnixDatagram {
+ let io = PollEvented::new(socket);
+ UnixDatagram { io }
+ }
+
+ /// Creates a new `UnixDatagram` which is not bound to any address.
+ pub fn unbound() -> io::Result<UnixDatagram> {
+ let socket = mio_uds::UnixDatagram::unbound()?;
+ Ok(UnixDatagram::new(socket))
+ }
+
+ /// Connects the socket to the specified address.
+ ///
+ /// The `send` method may be used to send data to the specified address.
+ /// `recv` and `recv_from` will only receive data from that address.
+ pub fn connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()> {
+ self.io.get_ref().connect(path)
+ }
+
+ /// Test whether this socket is ready to be read or not.
+ pub fn poll_read_ready(&self, ready: Ready) -> Poll<Ready, io::Error> {
+ self.io.poll_read_ready(ready)
+ }
+
+ /// Test whether this socket is ready to be written to or not.
+ pub fn poll_write_ready(&self) -> Poll<Ready, io::Error> {
+ self.io.poll_write_ready()
+ }
+
+ /// Returns the local address that this socket is bound to.
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.io.get_ref().local_addr()
+ }
+
+ /// Returns the address of this socket's peer.
+ ///
+ /// The `connect` method will connect the socket to a peer.
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.io.get_ref().peer_addr()
+ }
+
+ /// Receives data from the socket.
+ ///
+ /// On success, returns the number of bytes read and the address from
+ /// whence the data came.
+ pub fn poll_recv_from(&self, buf: &mut [u8]) -> Poll<(usize, SocketAddr), io::Error> {
+ try_ready!(self.io.poll_read_ready(Ready::readable()));
+
+ match self.io.get_ref().recv_from(buf) {
+ Ok(ret) => Ok(ret.into()),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_read_ready(Ready::readable())?;
+ Ok(Async::NotReady)
+ }
+ Err(e) => Err(e),
+ }
+ }
+
+ /// Receives data from the socket.
+ ///
+ /// On success, returns the number of bytes read.
+ pub fn poll_recv(&self, buf: &mut [u8]) -> Poll<usize, io::Error> {
+ try_ready!(self.io.poll_read_ready(Ready::readable()));
+
+ match self.io.get_ref().recv(buf) {
+ Ok(ret) => Ok(ret.into()),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_read_ready(Ready::readable())?;
+ Ok(Async::NotReady)
+ }
+ Err(e) => Err(e),
+ }
+ }
+
+ /// Returns a future for receiving a datagram. See the documentation on RecvDgram for details.
+ pub fn recv_dgram<T>(self, buf: T) -> RecvDgram<T>
+ where
+ T: AsMut<[u8]>,
+ {
+ RecvDgram::new(self, buf)
+ }
+
+ /// Sends data on the socket to the specified address.
+ ///
+ /// On success, returns the number of bytes written.
+ pub fn poll_send_to<P>(&self, buf: &[u8], path: P) -> Poll<usize, io::Error>
+ where
+ P: AsRef<Path>,
+ {
+ try_ready!(self.io.poll_write_ready());
+
+ match self.io.get_ref().send_to(buf, path) {
+ Ok(ret) => Ok(ret.into()),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_write_ready()?;
+ Ok(Async::NotReady)
+ }
+ Err(e) => Err(e),
+ }
+ }
+
+ /// Sends data on the socket to the socket's peer.
+ ///
+ /// The peer address may be set by the `connect` method, and this method
+ /// will return an error if the socket has not already been connected.
+ ///
+ /// On success, returns the number of bytes written.
+ pub fn poll_send(&self, buf: &[u8]) -> Poll<usize, io::Error> {
+ try_ready!(self.io.poll_write_ready());
+
+ match self.io.get_ref().send(buf) {
+ Ok(ret) => Ok(ret.into()),
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_write_ready()?;
+ Ok(Async::NotReady)
+ }
+ Err(e) => Err(e),
+ }
+ }
+
+ /// Returns a future sending the data in buf to the socket at path.
+ pub fn send_dgram<T, P>(self, buf: T, path: P) -> SendDgram<T, P>
+ where
+ T: AsRef<[u8]>,
+ P: AsRef<Path>,
+ {
+ SendDgram::new(self, buf, path)
+ }
+
+ /// Returns the value of the `SO_ERROR` option.
+ pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+ self.io.get_ref().take_error()
+ }
+
+ /// Shut down the read, write, or both halves of this connection.
+ ///
+ /// This function will cause all pending and future I/O calls on the
+ /// specified portions to immediately return with an appropriate value
+ /// (see the documentation of `Shutdown`).
+ pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
+ self.io.get_ref().shutdown(how)
+ }
+}
+
+impl fmt::Debug for UnixDatagram {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ self.io.get_ref().fmt(f)
+ }
+}
+
+impl AsRawFd for UnixDatagram {
+ fn as_raw_fd(&self) -> RawFd {
+ self.io.get_ref().as_raw_fd()
+ }
+}
diff --git a/third_party/rust/tokio-uds/src/frame.rs b/third_party/rust/tokio-uds/src/frame.rs
new file mode 100644
index 0000000000..49c8948c64
--- /dev/null
+++ b/third_party/rust/tokio-uds/src/frame.rs
@@ -0,0 +1,160 @@
+use std::io;
+use std::os::unix::net::SocketAddr;
+use std::path::Path;
+
+use futures::{Async, Poll, Stream, Sink, StartSend, AsyncSink};
+
+use super::UnixDatagram;
+
+use tokio_codec::{Decoder, Encoder};
+use bytes::{BytesMut, BufMut};
+
+/// A unified `Stream` and `Sink` interface to an underlying `UnixDatagram`, using
+/// the `Encoder` and `Decoder` traits to encode and decode frames.
+///
+/// Unix datagram sockets work with datagrams, but higher-level code may wants to
+/// batch these into meaningful chunks, called "frames". This method layers
+/// framing on top of this socket by using the `Encoder` and `Decoder` traits to
+/// handle encoding and decoding of messages frames. Note that the incoming and
+/// outgoing frame types may be distinct.
+///
+/// This function returns a *single* object that is both `Stream` and `Sink`;
+/// grouping this into a single object is often useful for layering things which
+/// require both read and write access to the underlying object.
+///
+/// If you want to work more directly with the streams and sink, consider
+/// calling `split` on the `UnixDatagramFramed` returned by this method, which will break
+/// them into separate objects, allowing them to interact more easily.
+#[must_use = "sinks do nothing unless polled"]
+#[derive(Debug)]
+pub struct UnixDatagramFramed<A, C> {
+ socket: UnixDatagram,
+ codec: C,
+ rd: BytesMut,
+ wr: BytesMut,
+ out_addr: Option<A>,
+ flushed: bool,
+}
+
+impl<A, C: Decoder> Stream for UnixDatagramFramed<A, C> {
+ type Item = (C::Item, SocketAddr);
+ type Error = C::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ self.rd.reserve(INITIAL_RD_CAPACITY);
+
+ let (n, addr) = unsafe {
+ let (n, addr) = try_ready!(self.socket.poll_recv_from(self.rd.bytes_mut()));
+ self.rd.advance_mut(n);
+ (n, addr)
+ };
+ trace!("received {} bytes, decoding", n);
+ let frame_res = self.codec.decode(&mut self.rd);
+ self.rd.clear();
+ let frame = frame_res?;
+ let result = frame.map(|frame| (frame, addr));
+ trace!("frame decoded from buffer");
+ Ok(Async::Ready(result))
+ }
+}
+
+impl<A: AsRef<Path>, C: Encoder> Sink for UnixDatagramFramed<A, C> {
+ type SinkItem = (C::Item, A);
+ type SinkError = C::Error;
+
+ fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
+ trace!("sending frame");
+
+ if !self.flushed {
+ match try!(self.poll_complete()) {
+ Async::Ready(()) => {},
+ Async::NotReady => return Ok(AsyncSink::NotReady(item)),
+ }
+ }
+
+ let (frame, out_addr) = item;
+ self.codec.encode(frame, &mut self.wr)?;
+ self.out_addr = Some(out_addr);
+ self.flushed = false;
+ trace!("frame encoded; length={}", self.wr.len());
+
+ Ok(AsyncSink::Ready)
+ }
+
+ fn poll_complete(&mut self) -> Poll<(), C::Error> {
+ if self.flushed {
+ return Ok(Async::Ready(()))
+ }
+
+ let n = {
+ let out_path = match self.out_addr {
+ Some(ref out_path) => out_path.as_ref(),
+ None => return Err(io::Error::new(io::ErrorKind::Other,
+ "internal error: addr not available while data not flushed").into()),
+ };
+
+ trace!("flushing frame; length={}", self.wr.len());
+ try_ready!(self.socket.poll_send_to(&self.wr, out_path))
+ };
+
+ trace!("written {}", n);
+
+ let wrote_all = n == self.wr.len();
+ self.wr.clear();
+ self.flushed = true;
+
+ if wrote_all {
+ self.out_addr = None;
+ Ok(Async::Ready(()))
+ } else {
+ Err(io::Error::new(io::ErrorKind::Other,
+ "failed to write entire datagram to socket").into())
+ }
+ }
+
+ fn close(&mut self) -> Poll<(), C::Error> {
+ self.poll_complete()
+ }
+}
+
+const INITIAL_RD_CAPACITY: usize = 64 * 1024;
+const INITIAL_WR_CAPACITY: usize = 8 * 1024;
+
+impl<A, C> UnixDatagramFramed<A, C> {
+ /// Create a new `UnixDatagramFramed` backed by the given socket and codec.
+ ///
+ /// See struct level documentation for more details.
+ pub fn new(socket: UnixDatagram, codec: C) -> UnixDatagramFramed<A, C> {
+ UnixDatagramFramed {
+ socket: socket,
+ codec: codec,
+ out_addr: None,
+ rd: BytesMut::with_capacity(INITIAL_RD_CAPACITY),
+ wr: BytesMut::with_capacity(INITIAL_WR_CAPACITY),
+ flushed: true,
+ }
+ }
+
+ /// Returns a reference to the underlying I/O stream wrapped by `Framed`.
+ ///
+ /// # Note
+ ///
+ /// Care should be taken to not tamper with the underlying stream of data
+ /// coming in as it may corrupt the stream of frames otherwise being worked
+ /// with.
+ pub fn get_ref(&self) -> &UnixDatagram {
+ &self.socket
+ }
+
+ /// Returns a mutable reference to the underlying I/O stream wrapped by
+ /// `Framed`.
+ ///
+ /// # Note
+ ///
+ /// Care should be taken to not tamper with the underlying stream of data
+ /// coming in as it may corrupt the stream of frames otherwise being worked
+ /// with.
+ pub fn get_mut(&mut self) -> &mut UnixDatagram {
+ &mut self.socket
+ }
+}
diff --git a/third_party/rust/tokio-uds/src/incoming.rs b/third_party/rust/tokio-uds/src/incoming.rs
new file mode 100644
index 0000000000..28d4d76837
--- /dev/null
+++ b/third_party/rust/tokio-uds/src/incoming.rs
@@ -0,0 +1,27 @@
+use {UnixListener, UnixStream};
+
+use futures::{Stream, Poll};
+
+use std::io;
+
+/// Stream of listeners
+#[derive(Debug)]
+pub struct Incoming {
+ inner: UnixListener,
+}
+
+impl Incoming {
+ pub(crate) fn new(listener: UnixListener) -> Incoming {
+ Incoming { inner: listener }
+ }
+}
+
+impl Stream for Incoming {
+ type Item = UnixStream;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
+ Ok(Some(try_ready!(self.inner.poll_accept()).0).into())
+ }
+}
+
diff --git a/third_party/rust/tokio-uds/src/lib.rs b/third_party/rust/tokio-uds/src/lib.rs
new file mode 100644
index 0000000000..4c02185a8e
--- /dev/null
+++ b/third_party/rust/tokio-uds/src/lib.rs
@@ -0,0 +1,38 @@
+#![cfg(unix)]
+#![doc(html_root_url = "https://docs.rs/tokio-uds/0.2.5")]
+#![deny(missing_docs, warnings, missing_debug_implementations)]
+
+//! Unix Domain Sockets for Tokio.
+//!
+//! This crate provides APIs for using Unix Domain Sockets with Tokio.
+
+extern crate bytes;
+#[macro_use]
+extern crate futures;
+extern crate iovec;
+extern crate libc;
+#[macro_use]
+extern crate log;
+extern crate mio;
+extern crate mio_uds;
+extern crate tokio_codec;
+extern crate tokio_io;
+extern crate tokio_reactor;
+
+mod datagram;
+mod frame;
+mod incoming;
+mod listener;
+mod recv_dgram;
+mod send_dgram;
+mod stream;
+mod ucred;
+
+pub use datagram::UnixDatagram;
+pub use frame::UnixDatagramFramed;
+pub use incoming::Incoming;
+pub use listener::UnixListener;
+pub use recv_dgram::RecvDgram;
+pub use send_dgram::SendDgram;
+pub use stream::{UnixStream, ConnectFuture};
+pub use ucred::UCred;
diff --git a/third_party/rust/tokio-uds/src/listener.rs b/third_party/rust/tokio-uds/src/listener.rs
new file mode 100644
index 0000000000..c63b4a8432
--- /dev/null
+++ b/third_party/rust/tokio-uds/src/listener.rs
@@ -0,0 +1,146 @@
+use {Incoming, UnixStream};
+
+use tokio_reactor::{Handle, PollEvented};
+
+use futures::{Async, Poll};
+use mio::Ready;
+use mio_uds;
+
+use std::fmt;
+use std::io;
+use std::os::unix::io::{AsRawFd, RawFd};
+use std::os::unix::net::{self, SocketAddr};
+use std::path::Path;
+
+/// A Unix socket which can accept connections from other Unix sockets.
+pub struct UnixListener {
+ io: PollEvented<mio_uds::UnixListener>,
+}
+
+impl UnixListener {
+ /// Creates a new `UnixListener` bound to the specified path.
+ pub fn bind<P>(path: P) -> io::Result<UnixListener>
+ where
+ P: AsRef<Path>,
+ {
+ let listener = mio_uds::UnixListener::bind(path)?;
+ let io = PollEvented::new(listener);
+ Ok(UnixListener { io })
+ }
+
+ /// Consumes a `UnixListener` in the standard library and returns a
+ /// nonblocking `UnixListener` from this crate.
+ ///
+ /// The returned listener will be associated with the given event loop
+ /// specified by `handle` and is ready to perform I/O.
+ pub fn from_std(listener: net::UnixListener, handle: &Handle) -> io::Result<UnixListener> {
+ let listener = mio_uds::UnixListener::from_listener(listener)?;
+ let io = PollEvented::new_with_handle(listener, handle)?;
+ Ok(UnixListener { io })
+ }
+
+ /// Returns the local socket address of this listener.
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.io.get_ref().local_addr()
+ }
+
+ /// Test whether this socket is ready to be read or not.
+ pub fn poll_read_ready(&self, ready: Ready) -> Poll<Ready, io::Error> {
+ self.io.poll_read_ready(ready)
+ }
+
+ /// Returns the value of the `SO_ERROR` option.
+ pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+ self.io.get_ref().take_error()
+ }
+
+ /// Attempt to accept a connection and create a new connected `UnixStream`
+ /// if successful.
+ ///
+ /// This function will attempt an accept operation, but will not block
+ /// waiting for it to complete. If the operation would block then a "would
+ /// block" error is returned. Additionally, if this method would block, it
+ /// registers the current task to receive a notification when it would
+ /// otherwise not block.
+ ///
+ /// Note that typically for simple usage it's easier to treat incoming
+ /// connections as a `Stream` of `UnixStream`s with the `incoming` method
+ /// below.
+ ///
+ /// # Panics
+ ///
+ /// This function will panic if it is called outside the context of a
+ /// future's task. It's recommended to only call this from the
+ /// implementation of a `Future::poll`, if necessary.
+ pub fn poll_accept(&self) -> Poll<(UnixStream, SocketAddr), io::Error> {
+ let (io, addr) = try_ready!(self.poll_accept_std());
+
+ let io = mio_uds::UnixStream::from_stream(io)?;
+ Ok((UnixStream::new(io), addr).into())
+ }
+
+ /// Attempt to accept a connection and create a new connected `UnixStream`
+ /// if successful.
+ ///
+ /// This function is the same as `poll_accept` above except that it returns a
+ /// `mio_uds::UnixStream` instead of a `tokio_udp::UnixStream`. This in turn
+ /// can then allow for the stream to be associated with a different reactor
+ /// than the one this `UnixListener` is associated with.
+ ///
+ /// This function will attempt an accept operation, but will not block
+ /// waiting for it to complete. If the operation would block then a "would
+ /// block" error is returned. Additionally, if this method would block, it
+ /// registers the current task to receive a notification when it would
+ /// otherwise not block.
+ ///
+ /// Note that typically for simple usage it's easier to treat incoming
+ /// connections as a `Stream` of `UnixStream`s with the `incoming` method
+ /// below.
+ ///
+ /// # Panics
+ ///
+ /// This function will panic if it is called outside the context of a
+ /// future's task. It's recommended to only call this from the
+ /// implementation of a `Future::poll`, if necessary.
+ pub fn poll_accept_std(&self) -> Poll<(net::UnixStream, SocketAddr), io::Error> {
+ loop {
+ try_ready!(self.io.poll_read_ready(Ready::readable()));
+
+ match self.io.get_ref().accept_std() {
+ Ok(None) => {
+ self.io.clear_read_ready(Ready::readable())?;
+ return Ok(Async::NotReady);
+ }
+ Ok(Some((sock, addr))) => {
+ return Ok(Async::Ready((sock, addr)));
+ }
+ Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
+ self.io.clear_read_ready(Ready::readable())?;
+ return Ok(Async::NotReady);
+ }
+ Err(err) => return Err(err),
+ }
+ }
+ }
+
+ /// Consumes this listener, returning a stream of the sockets this listener
+ /// accepts.
+ ///
+ /// This method returns an implementation of the `Stream` trait which
+ /// resolves to the sockets the are accepted on this listener.
+ pub fn incoming(self) -> Incoming {
+ Incoming::new(self)
+ }
+}
+
+impl fmt::Debug for UnixListener {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ self.io.get_ref().fmt(f)
+ }
+}
+
+impl AsRawFd for UnixListener {
+ fn as_raw_fd(&self) -> RawFd {
+ self.io.get_ref().as_raw_fd()
+ }
+}
diff --git a/third_party/rust/tokio-uds/src/recv_dgram.rs b/third_party/rust/tokio-uds/src/recv_dgram.rs
new file mode 100644
index 0000000000..390202f38b
--- /dev/null
+++ b/third_party/rust/tokio-uds/src/recv_dgram.rs
@@ -0,0 +1,82 @@
+use UnixDatagram;
+
+use futures::{Async, Future, Poll};
+
+use std::io;
+use std::mem;
+
+/// A future for receiving datagrams from a Unix datagram socket.
+///
+/// An example that uses UDP sockets but is still applicable can be found at
+/// https://gist.github.com/dermesser/e331094c2ab28fc7f6ba8a16183fe4d5.
+#[derive(Debug)]
+pub struct RecvDgram<T> {
+ st: State<T>,
+}
+
+/// A future similar to RecvDgram, but without allocating and returning the peer's address.
+///
+/// This can be used if the peer's address is of no interest, so the allocation overhead can be
+/// avoided.
+#[derive(Debug)]
+enum State<T> {
+ Receiving {
+ sock: UnixDatagram,
+ buf: T,
+ },
+ Empty,
+}
+
+impl<T> RecvDgram<T>
+where
+ T: AsMut<[u8]>
+{
+ pub(crate) fn new(sock: UnixDatagram, buf: T) -> RecvDgram<T> {
+ RecvDgram {
+ st: State::Receiving {
+ sock,
+ buf,
+ },
+ }
+ }
+}
+
+impl<T> Future for RecvDgram<T>
+where
+ T: AsMut<[u8]>,
+{
+ /// RecvDgram yields a tuple of the underlying socket, the receive buffer, how many bytes were
+ /// received, and the address (path) of the peer sending the datagram. If the buffer is too small, the
+ /// datagram is truncated.
+ type Item = (UnixDatagram, T, usize, String);
+ /// This future yields io::Error if an error occurred.
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ let received;
+ let peer;
+
+ if let State::Receiving {
+ ref mut sock,
+ ref mut buf,
+ } = self.st
+ {
+ let (n, p) = try_ready!(sock.poll_recv_from(buf.as_mut()));
+ received = n;
+
+ peer = p.as_pathname().map_or(String::new(), |p| {
+ p.to_str().map_or(String::new(), |s| s.to_string())
+ });
+ } else {
+ panic!()
+ }
+
+ if let State::Receiving { sock, buf } =
+ mem::replace(&mut self.st, State::Empty)
+ {
+ Ok(Async::Ready((sock, buf, received, peer)))
+ } else {
+ panic!()
+ }
+ }
+}
diff --git a/third_party/rust/tokio-uds/src/send_dgram.rs b/third_party/rust/tokio-uds/src/send_dgram.rs
new file mode 100644
index 0000000000..59d438b761
--- /dev/null
+++ b/third_party/rust/tokio-uds/src/send_dgram.rs
@@ -0,0 +1,81 @@
+use UnixDatagram;
+
+use futures::{Async, Future, Poll};
+
+use std::io;
+use std::mem;
+use std::path::Path;
+
+/// A future for writing a buffer to a Unix datagram socket.
+#[derive(Debug)]
+pub struct SendDgram<T, P> {
+ st: State<T, P>,
+}
+
+#[derive(Debug)]
+enum State<T, P> {
+ /// current state is Sending
+ Sending {
+ /// the underlying socket
+ sock: UnixDatagram,
+ /// the buffer to send
+ buf: T,
+ /// the destination
+ addr: P,
+ },
+ /// neutral state
+ Empty,
+}
+
+impl<T, P> SendDgram<T, P>
+where
+ T: AsRef<[u8]>,
+ P: AsRef<Path>,
+{
+ pub(crate) fn new(sock: UnixDatagram, buf: T, addr: P) -> SendDgram<T, P> {
+ SendDgram {
+ st: State::Sending {
+ sock,
+ buf,
+ addr,
+ }
+ }
+ }
+}
+
+impl<T, P> Future for SendDgram<T, P>
+where
+ T: AsRef<[u8]>,
+ P: AsRef<Path>,
+{
+ /// Returns the underlying socket and the buffer that was sent.
+ type Item = (UnixDatagram, T);
+ /// The error that is returned when sending failed.
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ if let State::Sending {
+ ref mut sock,
+ ref buf,
+ ref addr,
+ } = self.st
+ {
+ let n = try_ready!(sock.poll_send_to(buf.as_ref(), addr));
+ if n < buf.as_ref().len() {
+ return Err(io::Error::new(
+ io::ErrorKind::Other,
+ "Couldn't send whole buffer".to_string(),
+ ));
+ }
+ } else {
+ panic!()
+ }
+ if let State::Sending { sock, buf, addr: _ } =
+ mem::replace(&mut self.st, State::Empty)
+ {
+ Ok(Async::Ready((sock, buf)))
+ } else {
+ panic!()
+ }
+ }
+}
diff --git a/third_party/rust/tokio-uds/src/stream.rs b/third_party/rust/tokio-uds/src/stream.rs
new file mode 100644
index 0000000000..7098c85fe1
--- /dev/null
+++ b/third_party/rust/tokio-uds/src/stream.rs
@@ -0,0 +1,356 @@
+use ucred::{self, UCred};
+
+use tokio_io::{AsyncRead, AsyncWrite};
+use tokio_reactor::{Handle, PollEvented};
+
+use bytes::{Buf, BufMut};
+use futures::{Async, Future, Poll};
+use iovec::{self, IoVec};
+use libc;
+use mio::Ready;
+use mio_uds;
+
+use std::fmt;
+use std::io::{self, Read, Write};
+use std::net::Shutdown;
+use std::os::unix::io::{AsRawFd, RawFd};
+use std::os::unix::net::{self, SocketAddr};
+use std::path::Path;
+
+/// A structure representing a connected Unix socket.
+///
+/// This socket can be connected directly with `UnixStream::connect` or accepted
+/// from a listener with `UnixListener::incoming`. Additionally, a pair of
+/// anonymous Unix sockets can be created with `UnixStream::pair`.
+pub struct UnixStream {
+ io: PollEvented<mio_uds::UnixStream>,
+}
+
+/// Future returned by `UnixStream::connect` which will resolve to a
+/// `UnixStream` when the stream is connected.
+#[derive(Debug)]
+pub struct ConnectFuture {
+ inner: State,
+}
+
+#[derive(Debug)]
+enum State {
+ Waiting(UnixStream),
+ Error(io::Error),
+ Empty,
+}
+
+impl UnixStream {
+ /// Connects to the socket named by `path`.
+ ///
+ /// This function will create a new Unix socket and connect to the path
+ /// specified, associating the returned stream with the default event loop's
+ /// handle.
+ pub fn connect<P>(path: P) -> ConnectFuture
+ where
+ P: AsRef<Path>,
+ {
+ let res = mio_uds::UnixStream::connect(path)
+ .map(UnixStream::new);
+
+ let inner = match res {
+ Ok(stream) => State::Waiting(stream),
+ Err(e) => State::Error(e),
+ };
+
+ ConnectFuture { inner }
+ }
+
+ /// Consumes a `UnixStream` in the standard library and returns a
+ /// nonblocking `UnixStream` from this crate.
+ ///
+ /// The returned stream will be associated with the given event loop
+ /// specified by `handle` and is ready to perform I/O.
+ pub fn from_std(stream: net::UnixStream, handle: &Handle) -> io::Result<UnixStream> {
+ let stream = mio_uds::UnixStream::from_stream(stream)?;
+ let io = PollEvented::new_with_handle(stream, handle)?;
+
+ Ok(UnixStream { io })
+ }
+
+ /// Creates an unnamed pair of connected sockets.
+ ///
+ /// This function will create a pair of interconnected Unix sockets for
+ /// communicating back and forth between one another. Each socket will
+ /// be associated with the default event loop's handle.
+ pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
+ let (a, b) = try!(mio_uds::UnixStream::pair());
+ let a = UnixStream::new(a);
+ let b = UnixStream::new(b);
+
+ Ok((a, b))
+ }
+
+ pub(crate) fn new(stream: mio_uds::UnixStream) -> UnixStream {
+ let io = PollEvented::new(stream);
+ UnixStream { io }
+ }
+
+ /// Test whether this socket is ready to be read or not.
+ pub fn poll_read_ready(&self, ready: Ready) -> Poll<Ready, io::Error> {
+ self.io.poll_read_ready(ready)
+ }
+
+ /// Test whether this socket is ready to be written to or not.
+ pub fn poll_write_ready(&self) -> Poll<Ready, io::Error> {
+ self.io.poll_write_ready()
+ }
+
+ /// Returns the socket address of the local half of this connection.
+ pub fn local_addr(&self) -> io::Result<SocketAddr> {
+ self.io.get_ref().local_addr()
+ }
+
+ /// Returns the socket address of the remote half of this connection.
+ pub fn peer_addr(&self) -> io::Result<SocketAddr> {
+ self.io.get_ref().peer_addr()
+ }
+
+ /// Returns effective credentials of the process which called `connect` or `pair`.
+ pub fn peer_cred(&self) -> io::Result<UCred> {
+ ucred::get_peer_cred(self)
+ }
+
+ /// Returns the value of the `SO_ERROR` option.
+ pub fn take_error(&self) -> io::Result<Option<io::Error>> {
+ self.io.get_ref().take_error()
+ }
+
+ /// Shuts down the read, write, or both halves of this connection.
+ ///
+ /// This function will cause all pending and future I/O calls on the
+ /// specified portions to immediately return with an appropriate value
+ /// (see the documentation of `Shutdown`).
+ pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
+ self.io.get_ref().shutdown(how)
+ }
+}
+
+impl Read for UnixStream {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ self.io.read(buf)
+ }
+}
+
+impl Write for UnixStream {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.io.write(buf)
+ }
+ fn flush(&mut self) -> io::Result<()> {
+ self.io.flush()
+ }
+}
+
+impl AsyncRead for UnixStream {
+ unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
+ false
+ }
+
+ fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
+ <&UnixStream>::read_buf(&mut &*self, buf)
+ }
+}
+
+impl AsyncWrite for UnixStream {
+ fn shutdown(&mut self) -> Poll<(), io::Error> {
+ <&UnixStream>::shutdown(&mut &*self)
+ }
+
+ fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
+ <&UnixStream>::write_buf(&mut &*self, buf)
+ }
+}
+
+impl<'a> Read for &'a UnixStream {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ (&self.io).read(buf)
+ }
+}
+
+impl<'a> Write for &'a UnixStream {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ (&self.io).write(buf)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ (&self.io).flush()
+ }
+}
+
+impl<'a> AsyncRead for &'a UnixStream {
+ unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
+ false
+ }
+
+ fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
+ if let Async::NotReady = <UnixStream>::poll_read_ready(self, Ready::readable())? {
+ return Ok(Async::NotReady);
+ }
+ unsafe {
+ let r = read_ready(buf, self.as_raw_fd());
+ if r == -1 {
+ let e = io::Error::last_os_error();
+ if e.kind() == io::ErrorKind::WouldBlock {
+ self.io.clear_read_ready(Ready::readable())?;
+ Ok(Async::NotReady)
+ } else {
+ Err(e)
+ }
+ } else {
+ let r = r as usize;
+ buf.advance_mut(r);
+ Ok(r.into())
+ }
+ }
+ }
+}
+
+impl<'a> AsyncWrite for &'a UnixStream {
+ fn shutdown(&mut self) -> Poll<(), io::Error> {
+ Ok(().into())
+ }
+
+ fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
+ if let Async::NotReady = <UnixStream>::poll_write_ready(self)? {
+ return Ok(Async::NotReady);
+ }
+ unsafe {
+ let r = write_ready(buf, self.as_raw_fd());
+ if r == -1 {
+ let e = io::Error::last_os_error();
+ if e.kind() == io::ErrorKind::WouldBlock {
+ self.io.clear_write_ready()?;
+ Ok(Async::NotReady)
+ } else {
+ Err(e)
+ }
+ } else {
+ let r = r as usize;
+ buf.advance(r);
+ Ok(r.into())
+ }
+ }
+ }
+}
+
+impl fmt::Debug for UnixStream {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ self.io.get_ref().fmt(f)
+ }
+}
+
+impl AsRawFd for UnixStream {
+ fn as_raw_fd(&self) -> RawFd {
+ self.io.get_ref().as_raw_fd()
+ }
+}
+
+impl Future for ConnectFuture {
+ type Item = UnixStream;
+ type Error = io::Error;
+
+ fn poll(&mut self) -> Poll<UnixStream, io::Error> {
+ use std::mem;
+
+ match self.inner {
+ State::Waiting(ref mut stream) => {
+ if let Async::NotReady = stream.io.poll_write_ready()? {
+ return Ok(Async::NotReady)
+ }
+
+ if let Some(e) = try!(stream.io.get_ref().take_error()) {
+ return Err(e)
+ }
+ }
+ State::Error(_) => {
+ let e = match mem::replace(&mut self.inner, State::Empty) {
+ State::Error(e) => e,
+ _ => unreachable!(),
+ };
+
+ return Err(e)
+ },
+ State::Empty => panic!("can't poll stream twice"),
+ }
+
+ match mem::replace(&mut self.inner, State::Empty) {
+ State::Waiting(stream) => Ok(Async::Ready(stream)),
+ _ => unreachable!(),
+ }
+ }
+}
+
+unsafe fn read_ready<B: BufMut>(buf: &mut B, raw_fd: RawFd) -> isize {
+ // The `IoVec` type can't have a 0-length size, so we create a bunch
+ // of dummy versions on the stack with 1 length which we'll quickly
+ // overwrite.
+ let b1: &mut [u8] = &mut [0];
+ let b2: &mut [u8] = &mut [0];
+ let b3: &mut [u8] = &mut [0];
+ let b4: &mut [u8] = &mut [0];
+ let b5: &mut [u8] = &mut [0];
+ let b6: &mut [u8] = &mut [0];
+ let b7: &mut [u8] = &mut [0];
+ let b8: &mut [u8] = &mut [0];
+ let b9: &mut [u8] = &mut [0];
+ let b10: &mut [u8] = &mut [0];
+ let b11: &mut [u8] = &mut [0];
+ let b12: &mut [u8] = &mut [0];
+ let b13: &mut [u8] = &mut [0];
+ let b14: &mut [u8] = &mut [0];
+ let b15: &mut [u8] = &mut [0];
+ let b16: &mut [u8] = &mut [0];
+ let mut bufs: [&mut IoVec; 16] = [
+ b1.into(),
+ b2.into(),
+ b3.into(),
+ b4.into(),
+ b5.into(),
+ b6.into(),
+ b7.into(),
+ b8.into(),
+ b9.into(),
+ b10.into(),
+ b11.into(),
+ b12.into(),
+ b13.into(),
+ b14.into(),
+ b15.into(),
+ b16.into(),
+ ];
+
+ let n = buf.bytes_vec_mut(&mut bufs);
+ read_ready_vecs(&mut bufs[..n], raw_fd)
+}
+
+unsafe fn read_ready_vecs(bufs: &mut [&mut IoVec], raw_fd: RawFd) -> isize {
+ let iovecs = iovec::unix::as_os_slice_mut(bufs);
+
+ libc::readv(raw_fd, iovecs.as_ptr(), iovecs.len() as i32)
+}
+
+unsafe fn write_ready<B: Buf>(buf: &mut B, raw_fd: RawFd) -> isize {
+ // The `IoVec` type can't have a zero-length size, so create a dummy
+ // version from a 1-length slice which we'll overwrite with the
+ // `bytes_vec` method.
+ static DUMMY: &[u8] = &[0];
+ let iovec = <&IoVec>::from(DUMMY);
+ let mut bufs = [
+ iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec,
+ iovec, iovec, iovec,
+ ];
+
+ let n = buf.bytes_vec(&mut bufs);
+ write_ready_vecs(&bufs[..n], raw_fd)
+}
+
+unsafe fn write_ready_vecs(bufs: &[&IoVec], raw_fd: RawFd) -> isize {
+ let iovecs = iovec::unix::as_os_slice(bufs);
+
+ libc::writev(raw_fd, iovecs.as_ptr(), iovecs.len() as i32)
+}
diff --git a/third_party/rust/tokio-uds/src/ucred.rs b/third_party/rust/tokio-uds/src/ucred.rs
new file mode 100644
index 0000000000..bc53ea1717
--- /dev/null
+++ b/third_party/rust/tokio-uds/src/ucred.rs
@@ -0,0 +1,159 @@
+use libc::{gid_t, uid_t};
+
+/// Credentials of a process
+#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
+pub struct UCred {
+ /// UID (user ID) of the process
+ pub uid: uid_t,
+ /// GID (group ID) of the process
+ pub gid: gid_t,
+}
+
+#[cfg(any(target_os = "linux", target_os = "android"))]
+pub use self::impl_linux::get_peer_cred;
+
+#[cfg(any(target_os = "dragonfly", target_os = "macos", target_os = "ios", target_os = "freebsd", target_os = "netbsd", target_os = "openbsd"))]
+pub use self::impl_macos::get_peer_cred;
+
+#[cfg(any(target_os = "solaris"))]
+pub use self::impl_solaris::get_peer_cred;
+
+#[cfg(any(target_os = "linux", target_os = "android"))]
+pub mod impl_linux {
+ use libc::{c_void, getsockopt, socklen_t, SOL_SOCKET, SO_PEERCRED};
+ use std::{io, mem};
+ use UnixStream;
+ use std::os::unix::io::AsRawFd;
+
+ use libc::ucred;
+
+ pub fn get_peer_cred(sock: &UnixStream) -> io::Result<super::UCred> {
+ unsafe {
+ let raw_fd = sock.as_raw_fd();
+
+ let mut ucred = ucred {
+ pid: 0,
+ uid: 0,
+ gid: 0,
+ };
+
+ let ucred_size = mem::size_of::<ucred>();
+
+ // These paranoid checks should be optimized-out
+ assert!(mem::size_of::<u32>() <= mem::size_of::<usize>());
+ assert!(ucred_size <= u32::max_value() as usize);
+
+ let mut ucred_size = ucred_size as socklen_t;
+
+ let ret = getsockopt(
+ raw_fd,
+ SOL_SOCKET,
+ SO_PEERCRED,
+ &mut ucred as *mut ucred as *mut c_void,
+ &mut ucred_size,
+ );
+ if ret == 0 && ucred_size as usize == mem::size_of::<ucred>() {
+ Ok(super::UCred {
+ uid: ucred.uid,
+ gid: ucred.gid,
+ })
+ } else {
+ Err(io::Error::last_os_error())
+ }
+ }
+ }
+}
+
+#[cfg(any(target_os = "dragonfly", target_os = "macos", target_os = "ios", target_os = "freebsd", target_os = "netbsd", target_os = "openbsd"))]
+pub mod impl_macos {
+ use libc::getpeereid;
+ use std::{io, mem};
+ use UnixStream;
+ use std::os::unix::io::AsRawFd;
+
+ pub fn get_peer_cred(sock: &UnixStream) -> io::Result<super::UCred> {
+ unsafe {
+ let raw_fd = sock.as_raw_fd();
+
+ let mut cred: super::UCred = mem::uninitialized();
+
+ let ret = getpeereid(raw_fd, &mut cred.uid, &mut cred.gid);
+
+ if ret == 0 {
+ Ok(cred)
+ } else {
+ Err(io::Error::last_os_error())
+ }
+ }
+ }
+}
+
+
+#[cfg(any(target_os = "solaris"))]
+pub mod impl_solaris {
+ use std::io;
+ use std::os::unix::io::AsRawFd;
+ use UnixStream;
+ use std::ptr;
+
+ #[allow(non_camel_case_types)]
+ enum ucred_t {}
+
+ extern "C" {
+ fn ucred_free(cred: *mut ucred_t);
+ fn ucred_geteuid(cred: *const ucred_t) -> super::uid_t;
+ fn ucred_getegid(cred: *const ucred_t) -> super::gid_t;
+
+ fn getpeerucred(fd: ::std::os::raw::c_int, cred: *mut *mut ucred_t) -> ::std::os::raw::c_int;
+ }
+
+ pub fn get_peer_cred(sock: &UnixStream) -> io::Result<super::UCred> {
+ unsafe {
+ let raw_fd = sock.as_raw_fd();
+
+ let mut cred = ptr::null_mut::<*mut ucred_t>() as *mut ucred_t;
+
+ let ret = getpeerucred(raw_fd, &mut cred);
+
+ if ret == 0 {
+ let uid = ucred_geteuid(cred);
+ let gid = ucred_getegid(cred);
+
+ ucred_free(cred);
+
+ Ok(super::UCred {
+ uid,
+ gid,
+ })
+ } else {
+ Err(io::Error::last_os_error())
+ }
+ }
+ }
+}
+
+
+// Note that LOCAL_PEERCRED is not supported on DragonFly (yet). So do not run tests.
+#[cfg(not(target_os = "dragonfly"))]
+#[cfg(test)]
+mod test {
+ use UnixStream;
+ use libc::geteuid;
+ use libc::getegid;
+
+ #[test]
+ #[cfg_attr(target_os = "freebsd", ignore = "Requires FreeBSD 12.0 or later. https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=176419")]
+ #[cfg_attr(target_os = "netbsd", ignore = "NetBSD does not support getpeereid() for sockets created by socketpair()")]
+ fn test_socket_pair() {
+ let (a, b) = UnixStream::pair().unwrap();
+ let cred_a = a.peer_cred().unwrap();
+ let cred_b = b.peer_cred().unwrap();
+ assert_eq!(cred_a, cred_b);
+
+ let uid = unsafe { geteuid() };
+ let gid = unsafe { getegid() };
+
+ assert_eq!(cred_a.uid, uid);
+ assert_eq!(cred_a.gid, gid);
+ }
+}
diff --git a/third_party/rust/tokio-uds/tests/datagram.rs b/third_party/rust/tokio-uds/tests/datagram.rs
new file mode 100644
index 0000000000..6f58f7c077
--- /dev/null
+++ b/third_party/rust/tokio-uds/tests/datagram.rs
@@ -0,0 +1,83 @@
+#![cfg(unix)]
+
+extern crate bytes;
+extern crate futures;
+extern crate tempfile;
+extern crate tokio;
+extern crate tokio_codec;
+extern crate tokio_uds;
+
+use tokio_uds::*;
+
+use std::str;
+
+use bytes::BytesMut;
+
+use tokio::io;
+use tokio::runtime::current_thread::Runtime;
+
+use tokio_codec::{Decoder, Encoder};
+
+use futures::{Future, Sink, Stream};
+
+struct StringDatagramCodec;
+
+/// A codec to decode datagrams from a unix domain socket as utf-8 text messages.
+impl Encoder for StringDatagramCodec {
+ type Item = String;
+ type Error = io::Error;
+
+ fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
+ dst.extend_from_slice(&item.into_bytes());
+ Ok(())
+ }
+}
+
+/// A codec to decode datagrams from a unix domain socket as utf-8 text messages.
+impl Decoder for StringDatagramCodec {
+ type Item = String;
+ type Error = io::Error;
+
+ fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
+ let decoded = str::from_utf8(buf)
+ .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?
+ .to_string();
+
+ Ok(Some(decoded))
+ }
+}
+
+#[test]
+fn framed_echo() {
+ let dir = tempfile::tempdir().unwrap();
+ let server_path = dir.path().join("server.sock");
+ let client_path = dir.path().join("client.sock");
+
+ let mut rt = Runtime::new().unwrap();
+
+ {
+ let socket = UnixDatagram::bind(&server_path).unwrap();
+ let server = UnixDatagramFramed::new(socket, StringDatagramCodec);
+
+ let (sink, stream) = server.split();
+
+ let echo_stream = stream.map(|(msg, addr)| (msg, addr.as_pathname().unwrap().to_path_buf()));
+
+ // spawn echo server
+ rt.spawn(echo_stream.forward(sink)
+ .map_err(|e| panic!("err={:?}", e))
+ .map(|_| ()));
+ }
+
+ {
+ let socket = UnixDatagram::bind(&client_path).unwrap();
+ let client = UnixDatagramFramed::new(socket, StringDatagramCodec);
+
+ let (sink, stream) = client.split();
+
+ rt.block_on(sink.send(("ECHO".to_string(), server_path))).unwrap();
+
+ let response = rt.block_on(stream.take(1).collect()).unwrap();
+ assert_eq!(response[0].0, "ECHO");
+ }
+}
diff --git a/third_party/rust/tokio-uds/tests/stream.rs b/third_party/rust/tokio-uds/tests/stream.rs
new file mode 100644
index 0000000000..ebb1835243
--- /dev/null
+++ b/third_party/rust/tokio-uds/tests/stream.rs
@@ -0,0 +1,55 @@
+#![cfg(unix)]
+
+extern crate futures;
+extern crate tokio;
+extern crate tokio_uds;
+
+extern crate tempfile;
+
+use tokio_uds::*;
+
+use tokio::io;
+use tokio::runtime::current_thread::Runtime;
+
+use futures::{Future, Stream};
+use futures::sync::oneshot;
+use tempfile::Builder;
+
+macro_rules! t {
+ ($e:expr) => (match $e {
+ Ok(e) => e,
+ Err(e) => panic!("{} failed with {:?}", stringify!($e), e),
+ })
+}
+
+#[test]
+fn echo() {
+ let dir = Builder::new().prefix("tokio-uds-tests").tempdir().unwrap();
+ let sock_path = dir.path().join("connect.sock");
+
+ let mut rt = Runtime::new().unwrap();
+
+ let server = t!(UnixListener::bind(&sock_path));
+ let (tx, rx) = oneshot::channel();
+
+ rt.spawn({
+ server.incoming()
+ .into_future()
+ .and_then(move |(sock, _)| {
+ tx.send(sock.unwrap()).unwrap();
+ Ok(())
+ })
+ .map_err(|e| panic!("err={:?}", e))
+ });
+
+ let client = rt.block_on(UnixStream::connect(&sock_path)).unwrap();
+ let server = rt.block_on(rx).unwrap();
+
+ // Write to the client
+ rt.block_on(io::write_all(client, b"hello")).unwrap();
+
+ // Read from the server
+ let (_, buf) = rt.block_on(io::read_to_end(server, vec![])).unwrap();
+
+ assert_eq!(buf, b"hello");
+}