diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
commit | 2aa4a82499d4becd2284cdb482213d541b8804dd (patch) | |
tree | b80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/tokio-uds/src | |
parent | Initial commit. (diff) | |
download | firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.tar.xz firefox-2aa4a82499d4becd2284cdb482213d541b8804dd.zip |
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio-uds/src')
-rw-r--r-- | third_party/rust/tokio-uds/src/datagram.rs | 209 | ||||
-rw-r--r-- | third_party/rust/tokio-uds/src/frame.rs | 160 | ||||
-rw-r--r-- | third_party/rust/tokio-uds/src/incoming.rs | 27 | ||||
-rw-r--r-- | third_party/rust/tokio-uds/src/lib.rs | 38 | ||||
-rw-r--r-- | third_party/rust/tokio-uds/src/listener.rs | 146 | ||||
-rw-r--r-- | third_party/rust/tokio-uds/src/recv_dgram.rs | 82 | ||||
-rw-r--r-- | third_party/rust/tokio-uds/src/send_dgram.rs | 81 | ||||
-rw-r--r-- | third_party/rust/tokio-uds/src/stream.rs | 356 | ||||
-rw-r--r-- | third_party/rust/tokio-uds/src/ucred.rs | 159 |
9 files changed, 1258 insertions, 0 deletions
diff --git a/third_party/rust/tokio-uds/src/datagram.rs b/third_party/rust/tokio-uds/src/datagram.rs new file mode 100644 index 0000000000..da3f751a9f --- /dev/null +++ b/third_party/rust/tokio-uds/src/datagram.rs @@ -0,0 +1,209 @@ +use {RecvDgram, SendDgram}; + +use tokio_reactor::{Handle, PollEvented}; + +use futures::{Async, Poll}; +use mio::Ready; +use mio_uds; + +use std::fmt; +use std::io; +use std::net::Shutdown; +use std::os::unix::io::{AsRawFd, RawFd}; +use std::os::unix::net::{self, SocketAddr}; +use std::path::Path; + +/// An I/O object representing a Unix datagram socket. +pub struct UnixDatagram { + io: PollEvented<mio_uds::UnixDatagram>, +} + +impl UnixDatagram { + /// Creates a new `UnixDatagram` bound to the specified path. + pub fn bind<P>(path: P) -> io::Result<UnixDatagram> + where + P: AsRef<Path>, + { + let socket = mio_uds::UnixDatagram::bind(path)?; + Ok(UnixDatagram::new(socket)) + } + + /// Creates an unnamed pair of connected sockets. + /// + /// This function will create a pair of interconnected Unix sockets for + /// communicating back and forth between one another. Each socket will + /// be associated with the default event loop's handle. + pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> { + let (a, b) = mio_uds::UnixDatagram::pair()?; + let a = UnixDatagram::new(a); + let b = UnixDatagram::new(b); + + Ok((a, b)) + } + + /// Consumes a `UnixDatagram` in the standard library and returns a + /// nonblocking `UnixDatagram` from this crate. + /// + /// The returned datagram will be associated with the given event loop + /// specified by `handle` and is ready to perform I/O. + pub fn from_std(datagram: net::UnixDatagram, handle: &Handle) -> io::Result<UnixDatagram> { + let socket = mio_uds::UnixDatagram::from_datagram(datagram)?; + let io = PollEvented::new_with_handle(socket, handle)?; + Ok(UnixDatagram { io }) + } + + fn new(socket: mio_uds::UnixDatagram) -> UnixDatagram { + let io = PollEvented::new(socket); + UnixDatagram { io } + } + + /// Creates a new `UnixDatagram` which is not bound to any address. + pub fn unbound() -> io::Result<UnixDatagram> { + let socket = mio_uds::UnixDatagram::unbound()?; + Ok(UnixDatagram::new(socket)) + } + + /// Connects the socket to the specified address. + /// + /// The `send` method may be used to send data to the specified address. + /// `recv` and `recv_from` will only receive data from that address. + pub fn connect<P: AsRef<Path>>(&self, path: P) -> io::Result<()> { + self.io.get_ref().connect(path) + } + + /// Test whether this socket is ready to be read or not. + pub fn poll_read_ready(&self, ready: Ready) -> Poll<Ready, io::Error> { + self.io.poll_read_ready(ready) + } + + /// Test whether this socket is ready to be written to or not. + pub fn poll_write_ready(&self) -> Poll<Ready, io::Error> { + self.io.poll_write_ready() + } + + /// Returns the local address that this socket is bound to. + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.io.get_ref().local_addr() + } + + /// Returns the address of this socket's peer. + /// + /// The `connect` method will connect the socket to a peer. + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + self.io.get_ref().peer_addr() + } + + /// Receives data from the socket. + /// + /// On success, returns the number of bytes read and the address from + /// whence the data came. + pub fn poll_recv_from(&self, buf: &mut [u8]) -> Poll<(usize, SocketAddr), io::Error> { + try_ready!(self.io.poll_read_ready(Ready::readable())); + + match self.io.get_ref().recv_from(buf) { + Ok(ret) => Ok(ret.into()), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_read_ready(Ready::readable())?; + Ok(Async::NotReady) + } + Err(e) => Err(e), + } + } + + /// Receives data from the socket. + /// + /// On success, returns the number of bytes read. + pub fn poll_recv(&self, buf: &mut [u8]) -> Poll<usize, io::Error> { + try_ready!(self.io.poll_read_ready(Ready::readable())); + + match self.io.get_ref().recv(buf) { + Ok(ret) => Ok(ret.into()), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_read_ready(Ready::readable())?; + Ok(Async::NotReady) + } + Err(e) => Err(e), + } + } + + /// Returns a future for receiving a datagram. See the documentation on RecvDgram for details. + pub fn recv_dgram<T>(self, buf: T) -> RecvDgram<T> + where + T: AsMut<[u8]>, + { + RecvDgram::new(self, buf) + } + + /// Sends data on the socket to the specified address. + /// + /// On success, returns the number of bytes written. + pub fn poll_send_to<P>(&self, buf: &[u8], path: P) -> Poll<usize, io::Error> + where + P: AsRef<Path>, + { + try_ready!(self.io.poll_write_ready()); + + match self.io.get_ref().send_to(buf, path) { + Ok(ret) => Ok(ret.into()), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_write_ready()?; + Ok(Async::NotReady) + } + Err(e) => Err(e), + } + } + + /// Sends data on the socket to the socket's peer. + /// + /// The peer address may be set by the `connect` method, and this method + /// will return an error if the socket has not already been connected. + /// + /// On success, returns the number of bytes written. + pub fn poll_send(&self, buf: &[u8]) -> Poll<usize, io::Error> { + try_ready!(self.io.poll_write_ready()); + + match self.io.get_ref().send(buf) { + Ok(ret) => Ok(ret.into()), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_write_ready()?; + Ok(Async::NotReady) + } + Err(e) => Err(e), + } + } + + /// Returns a future sending the data in buf to the socket at path. + pub fn send_dgram<T, P>(self, buf: T, path: P) -> SendDgram<T, P> + where + T: AsRef<[u8]>, + P: AsRef<Path>, + { + SendDgram::new(self, buf, path) + } + + /// Returns the value of the `SO_ERROR` option. + pub fn take_error(&self) -> io::Result<Option<io::Error>> { + self.io.get_ref().take_error() + } + + /// Shut down the read, write, or both halves of this connection. + /// + /// This function will cause all pending and future I/O calls on the + /// specified portions to immediately return with an appropriate value + /// (see the documentation of `Shutdown`). + pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { + self.io.get_ref().shutdown(how) + } +} + +impl fmt::Debug for UnixDatagram { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.io.get_ref().fmt(f) + } +} + +impl AsRawFd for UnixDatagram { + fn as_raw_fd(&self) -> RawFd { + self.io.get_ref().as_raw_fd() + } +} diff --git a/third_party/rust/tokio-uds/src/frame.rs b/third_party/rust/tokio-uds/src/frame.rs new file mode 100644 index 0000000000..49c8948c64 --- /dev/null +++ b/third_party/rust/tokio-uds/src/frame.rs @@ -0,0 +1,160 @@ +use std::io; +use std::os::unix::net::SocketAddr; +use std::path::Path; + +use futures::{Async, Poll, Stream, Sink, StartSend, AsyncSink}; + +use super::UnixDatagram; + +use tokio_codec::{Decoder, Encoder}; +use bytes::{BytesMut, BufMut}; + +/// A unified `Stream` and `Sink` interface to an underlying `UnixDatagram`, using +/// the `Encoder` and `Decoder` traits to encode and decode frames. +/// +/// Unix datagram sockets work with datagrams, but higher-level code may wants to +/// batch these into meaningful chunks, called "frames". This method layers +/// framing on top of this socket by using the `Encoder` and `Decoder` traits to +/// handle encoding and decoding of messages frames. Note that the incoming and +/// outgoing frame types may be distinct. +/// +/// This function returns a *single* object that is both `Stream` and `Sink`; +/// grouping this into a single object is often useful for layering things which +/// require both read and write access to the underlying object. +/// +/// If you want to work more directly with the streams and sink, consider +/// calling `split` on the `UnixDatagramFramed` returned by this method, which will break +/// them into separate objects, allowing them to interact more easily. +#[must_use = "sinks do nothing unless polled"] +#[derive(Debug)] +pub struct UnixDatagramFramed<A, C> { + socket: UnixDatagram, + codec: C, + rd: BytesMut, + wr: BytesMut, + out_addr: Option<A>, + flushed: bool, +} + +impl<A, C: Decoder> Stream for UnixDatagramFramed<A, C> { + type Item = (C::Item, SocketAddr); + type Error = C::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + self.rd.reserve(INITIAL_RD_CAPACITY); + + let (n, addr) = unsafe { + let (n, addr) = try_ready!(self.socket.poll_recv_from(self.rd.bytes_mut())); + self.rd.advance_mut(n); + (n, addr) + }; + trace!("received {} bytes, decoding", n); + let frame_res = self.codec.decode(&mut self.rd); + self.rd.clear(); + let frame = frame_res?; + let result = frame.map(|frame| (frame, addr)); + trace!("frame decoded from buffer"); + Ok(Async::Ready(result)) + } +} + +impl<A: AsRef<Path>, C: Encoder> Sink for UnixDatagramFramed<A, C> { + type SinkItem = (C::Item, A); + type SinkError = C::Error; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { + trace!("sending frame"); + + if !self.flushed { + match try!(self.poll_complete()) { + Async::Ready(()) => {}, + Async::NotReady => return Ok(AsyncSink::NotReady(item)), + } + } + + let (frame, out_addr) = item; + self.codec.encode(frame, &mut self.wr)?; + self.out_addr = Some(out_addr); + self.flushed = false; + trace!("frame encoded; length={}", self.wr.len()); + + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), C::Error> { + if self.flushed { + return Ok(Async::Ready(())) + } + + let n = { + let out_path = match self.out_addr { + Some(ref out_path) => out_path.as_ref(), + None => return Err(io::Error::new(io::ErrorKind::Other, + "internal error: addr not available while data not flushed").into()), + }; + + trace!("flushing frame; length={}", self.wr.len()); + try_ready!(self.socket.poll_send_to(&self.wr, out_path)) + }; + + trace!("written {}", n); + + let wrote_all = n == self.wr.len(); + self.wr.clear(); + self.flushed = true; + + if wrote_all { + self.out_addr = None; + Ok(Async::Ready(())) + } else { + Err(io::Error::new(io::ErrorKind::Other, + "failed to write entire datagram to socket").into()) + } + } + + fn close(&mut self) -> Poll<(), C::Error> { + self.poll_complete() + } +} + +const INITIAL_RD_CAPACITY: usize = 64 * 1024; +const INITIAL_WR_CAPACITY: usize = 8 * 1024; + +impl<A, C> UnixDatagramFramed<A, C> { + /// Create a new `UnixDatagramFramed` backed by the given socket and codec. + /// + /// See struct level documentation for more details. + pub fn new(socket: UnixDatagram, codec: C) -> UnixDatagramFramed<A, C> { + UnixDatagramFramed { + socket: socket, + codec: codec, + out_addr: None, + rd: BytesMut::with_capacity(INITIAL_RD_CAPACITY), + wr: BytesMut::with_capacity(INITIAL_WR_CAPACITY), + flushed: true, + } + } + + /// Returns a reference to the underlying I/O stream wrapped by `Framed`. + /// + /// # Note + /// + /// Care should be taken to not tamper with the underlying stream of data + /// coming in as it may corrupt the stream of frames otherwise being worked + /// with. + pub fn get_ref(&self) -> &UnixDatagram { + &self.socket + } + + /// Returns a mutable reference to the underlying I/O stream wrapped by + /// `Framed`. + /// + /// # Note + /// + /// Care should be taken to not tamper with the underlying stream of data + /// coming in as it may corrupt the stream of frames otherwise being worked + /// with. + pub fn get_mut(&mut self) -> &mut UnixDatagram { + &mut self.socket + } +} diff --git a/third_party/rust/tokio-uds/src/incoming.rs b/third_party/rust/tokio-uds/src/incoming.rs new file mode 100644 index 0000000000..28d4d76837 --- /dev/null +++ b/third_party/rust/tokio-uds/src/incoming.rs @@ -0,0 +1,27 @@ +use {UnixListener, UnixStream}; + +use futures::{Stream, Poll}; + +use std::io; + +/// Stream of listeners +#[derive(Debug)] +pub struct Incoming { + inner: UnixListener, +} + +impl Incoming { + pub(crate) fn new(listener: UnixListener) -> Incoming { + Incoming { inner: listener } + } +} + +impl Stream for Incoming { + type Item = UnixStream; + type Error = io::Error; + + fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> { + Ok(Some(try_ready!(self.inner.poll_accept()).0).into()) + } +} + diff --git a/third_party/rust/tokio-uds/src/lib.rs b/third_party/rust/tokio-uds/src/lib.rs new file mode 100644 index 0000000000..4c02185a8e --- /dev/null +++ b/third_party/rust/tokio-uds/src/lib.rs @@ -0,0 +1,38 @@ +#![cfg(unix)] +#![doc(html_root_url = "https://docs.rs/tokio-uds/0.2.5")] +#![deny(missing_docs, warnings, missing_debug_implementations)] + +//! Unix Domain Sockets for Tokio. +//! +//! This crate provides APIs for using Unix Domain Sockets with Tokio. + +extern crate bytes; +#[macro_use] +extern crate futures; +extern crate iovec; +extern crate libc; +#[macro_use] +extern crate log; +extern crate mio; +extern crate mio_uds; +extern crate tokio_codec; +extern crate tokio_io; +extern crate tokio_reactor; + +mod datagram; +mod frame; +mod incoming; +mod listener; +mod recv_dgram; +mod send_dgram; +mod stream; +mod ucred; + +pub use datagram::UnixDatagram; +pub use frame::UnixDatagramFramed; +pub use incoming::Incoming; +pub use listener::UnixListener; +pub use recv_dgram::RecvDgram; +pub use send_dgram::SendDgram; +pub use stream::{UnixStream, ConnectFuture}; +pub use ucred::UCred; diff --git a/third_party/rust/tokio-uds/src/listener.rs b/third_party/rust/tokio-uds/src/listener.rs new file mode 100644 index 0000000000..c63b4a8432 --- /dev/null +++ b/third_party/rust/tokio-uds/src/listener.rs @@ -0,0 +1,146 @@ +use {Incoming, UnixStream}; + +use tokio_reactor::{Handle, PollEvented}; + +use futures::{Async, Poll}; +use mio::Ready; +use mio_uds; + +use std::fmt; +use std::io; +use std::os::unix::io::{AsRawFd, RawFd}; +use std::os::unix::net::{self, SocketAddr}; +use std::path::Path; + +/// A Unix socket which can accept connections from other Unix sockets. +pub struct UnixListener { + io: PollEvented<mio_uds::UnixListener>, +} + +impl UnixListener { + /// Creates a new `UnixListener` bound to the specified path. + pub fn bind<P>(path: P) -> io::Result<UnixListener> + where + P: AsRef<Path>, + { + let listener = mio_uds::UnixListener::bind(path)?; + let io = PollEvented::new(listener); + Ok(UnixListener { io }) + } + + /// Consumes a `UnixListener` in the standard library and returns a + /// nonblocking `UnixListener` from this crate. + /// + /// The returned listener will be associated with the given event loop + /// specified by `handle` and is ready to perform I/O. + pub fn from_std(listener: net::UnixListener, handle: &Handle) -> io::Result<UnixListener> { + let listener = mio_uds::UnixListener::from_listener(listener)?; + let io = PollEvented::new_with_handle(listener, handle)?; + Ok(UnixListener { io }) + } + + /// Returns the local socket address of this listener. + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.io.get_ref().local_addr() + } + + /// Test whether this socket is ready to be read or not. + pub fn poll_read_ready(&self, ready: Ready) -> Poll<Ready, io::Error> { + self.io.poll_read_ready(ready) + } + + /// Returns the value of the `SO_ERROR` option. + pub fn take_error(&self) -> io::Result<Option<io::Error>> { + self.io.get_ref().take_error() + } + + /// Attempt to accept a connection and create a new connected `UnixStream` + /// if successful. + /// + /// This function will attempt an accept operation, but will not block + /// waiting for it to complete. If the operation would block then a "would + /// block" error is returned. Additionally, if this method would block, it + /// registers the current task to receive a notification when it would + /// otherwise not block. + /// + /// Note that typically for simple usage it's easier to treat incoming + /// connections as a `Stream` of `UnixStream`s with the `incoming` method + /// below. + /// + /// # Panics + /// + /// This function will panic if it is called outside the context of a + /// future's task. It's recommended to only call this from the + /// implementation of a `Future::poll`, if necessary. + pub fn poll_accept(&self) -> Poll<(UnixStream, SocketAddr), io::Error> { + let (io, addr) = try_ready!(self.poll_accept_std()); + + let io = mio_uds::UnixStream::from_stream(io)?; + Ok((UnixStream::new(io), addr).into()) + } + + /// Attempt to accept a connection and create a new connected `UnixStream` + /// if successful. + /// + /// This function is the same as `poll_accept` above except that it returns a + /// `mio_uds::UnixStream` instead of a `tokio_udp::UnixStream`. This in turn + /// can then allow for the stream to be associated with a different reactor + /// than the one this `UnixListener` is associated with. + /// + /// This function will attempt an accept operation, but will not block + /// waiting for it to complete. If the operation would block then a "would + /// block" error is returned. Additionally, if this method would block, it + /// registers the current task to receive a notification when it would + /// otherwise not block. + /// + /// Note that typically for simple usage it's easier to treat incoming + /// connections as a `Stream` of `UnixStream`s with the `incoming` method + /// below. + /// + /// # Panics + /// + /// This function will panic if it is called outside the context of a + /// future's task. It's recommended to only call this from the + /// implementation of a `Future::poll`, if necessary. + pub fn poll_accept_std(&self) -> Poll<(net::UnixStream, SocketAddr), io::Error> { + loop { + try_ready!(self.io.poll_read_ready(Ready::readable())); + + match self.io.get_ref().accept_std() { + Ok(None) => { + self.io.clear_read_ready(Ready::readable())?; + return Ok(Async::NotReady); + } + Ok(Some((sock, addr))) => { + return Ok(Async::Ready((sock, addr))); + } + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { + self.io.clear_read_ready(Ready::readable())?; + return Ok(Async::NotReady); + } + Err(err) => return Err(err), + } + } + } + + /// Consumes this listener, returning a stream of the sockets this listener + /// accepts. + /// + /// This method returns an implementation of the `Stream` trait which + /// resolves to the sockets the are accepted on this listener. + pub fn incoming(self) -> Incoming { + Incoming::new(self) + } +} + +impl fmt::Debug for UnixListener { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.io.get_ref().fmt(f) + } +} + +impl AsRawFd for UnixListener { + fn as_raw_fd(&self) -> RawFd { + self.io.get_ref().as_raw_fd() + } +} diff --git a/third_party/rust/tokio-uds/src/recv_dgram.rs b/third_party/rust/tokio-uds/src/recv_dgram.rs new file mode 100644 index 0000000000..390202f38b --- /dev/null +++ b/third_party/rust/tokio-uds/src/recv_dgram.rs @@ -0,0 +1,82 @@ +use UnixDatagram; + +use futures::{Async, Future, Poll}; + +use std::io; +use std::mem; + +/// A future for receiving datagrams from a Unix datagram socket. +/// +/// An example that uses UDP sockets but is still applicable can be found at +/// https://gist.github.com/dermesser/e331094c2ab28fc7f6ba8a16183fe4d5. +#[derive(Debug)] +pub struct RecvDgram<T> { + st: State<T>, +} + +/// A future similar to RecvDgram, but without allocating and returning the peer's address. +/// +/// This can be used if the peer's address is of no interest, so the allocation overhead can be +/// avoided. +#[derive(Debug)] +enum State<T> { + Receiving { + sock: UnixDatagram, + buf: T, + }, + Empty, +} + +impl<T> RecvDgram<T> +where + T: AsMut<[u8]> +{ + pub(crate) fn new(sock: UnixDatagram, buf: T) -> RecvDgram<T> { + RecvDgram { + st: State::Receiving { + sock, + buf, + }, + } + } +} + +impl<T> Future for RecvDgram<T> +where + T: AsMut<[u8]>, +{ + /// RecvDgram yields a tuple of the underlying socket, the receive buffer, how many bytes were + /// received, and the address (path) of the peer sending the datagram. If the buffer is too small, the + /// datagram is truncated. + type Item = (UnixDatagram, T, usize, String); + /// This future yields io::Error if an error occurred. + type Error = io::Error; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + let received; + let peer; + + if let State::Receiving { + ref mut sock, + ref mut buf, + } = self.st + { + let (n, p) = try_ready!(sock.poll_recv_from(buf.as_mut())); + received = n; + + peer = p.as_pathname().map_or(String::new(), |p| { + p.to_str().map_or(String::new(), |s| s.to_string()) + }); + } else { + panic!() + } + + if let State::Receiving { sock, buf } = + mem::replace(&mut self.st, State::Empty) + { + Ok(Async::Ready((sock, buf, received, peer))) + } else { + panic!() + } + } +} diff --git a/third_party/rust/tokio-uds/src/send_dgram.rs b/third_party/rust/tokio-uds/src/send_dgram.rs new file mode 100644 index 0000000000..59d438b761 --- /dev/null +++ b/third_party/rust/tokio-uds/src/send_dgram.rs @@ -0,0 +1,81 @@ +use UnixDatagram; + +use futures::{Async, Future, Poll}; + +use std::io; +use std::mem; +use std::path::Path; + +/// A future for writing a buffer to a Unix datagram socket. +#[derive(Debug)] +pub struct SendDgram<T, P> { + st: State<T, P>, +} + +#[derive(Debug)] +enum State<T, P> { + /// current state is Sending + Sending { + /// the underlying socket + sock: UnixDatagram, + /// the buffer to send + buf: T, + /// the destination + addr: P, + }, + /// neutral state + Empty, +} + +impl<T, P> SendDgram<T, P> +where + T: AsRef<[u8]>, + P: AsRef<Path>, +{ + pub(crate) fn new(sock: UnixDatagram, buf: T, addr: P) -> SendDgram<T, P> { + SendDgram { + st: State::Sending { + sock, + buf, + addr, + } + } + } +} + +impl<T, P> Future for SendDgram<T, P> +where + T: AsRef<[u8]>, + P: AsRef<Path>, +{ + /// Returns the underlying socket and the buffer that was sent. + type Item = (UnixDatagram, T); + /// The error that is returned when sending failed. + type Error = io::Error; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + if let State::Sending { + ref mut sock, + ref buf, + ref addr, + } = self.st + { + let n = try_ready!(sock.poll_send_to(buf.as_ref(), addr)); + if n < buf.as_ref().len() { + return Err(io::Error::new( + io::ErrorKind::Other, + "Couldn't send whole buffer".to_string(), + )); + } + } else { + panic!() + } + if let State::Sending { sock, buf, addr: _ } = + mem::replace(&mut self.st, State::Empty) + { + Ok(Async::Ready((sock, buf))) + } else { + panic!() + } + } +} diff --git a/third_party/rust/tokio-uds/src/stream.rs b/third_party/rust/tokio-uds/src/stream.rs new file mode 100644 index 0000000000..7098c85fe1 --- /dev/null +++ b/third_party/rust/tokio-uds/src/stream.rs @@ -0,0 +1,356 @@ +use ucred::{self, UCred}; + +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_reactor::{Handle, PollEvented}; + +use bytes::{Buf, BufMut}; +use futures::{Async, Future, Poll}; +use iovec::{self, IoVec}; +use libc; +use mio::Ready; +use mio_uds; + +use std::fmt; +use std::io::{self, Read, Write}; +use std::net::Shutdown; +use std::os::unix::io::{AsRawFd, RawFd}; +use std::os::unix::net::{self, SocketAddr}; +use std::path::Path; + +/// A structure representing a connected Unix socket. +/// +/// This socket can be connected directly with `UnixStream::connect` or accepted +/// from a listener with `UnixListener::incoming`. Additionally, a pair of +/// anonymous Unix sockets can be created with `UnixStream::pair`. +pub struct UnixStream { + io: PollEvented<mio_uds::UnixStream>, +} + +/// Future returned by `UnixStream::connect` which will resolve to a +/// `UnixStream` when the stream is connected. +#[derive(Debug)] +pub struct ConnectFuture { + inner: State, +} + +#[derive(Debug)] +enum State { + Waiting(UnixStream), + Error(io::Error), + Empty, +} + +impl UnixStream { + /// Connects to the socket named by `path`. + /// + /// This function will create a new Unix socket and connect to the path + /// specified, associating the returned stream with the default event loop's + /// handle. + pub fn connect<P>(path: P) -> ConnectFuture + where + P: AsRef<Path>, + { + let res = mio_uds::UnixStream::connect(path) + .map(UnixStream::new); + + let inner = match res { + Ok(stream) => State::Waiting(stream), + Err(e) => State::Error(e), + }; + + ConnectFuture { inner } + } + + /// Consumes a `UnixStream` in the standard library and returns a + /// nonblocking `UnixStream` from this crate. + /// + /// The returned stream will be associated with the given event loop + /// specified by `handle` and is ready to perform I/O. + pub fn from_std(stream: net::UnixStream, handle: &Handle) -> io::Result<UnixStream> { + let stream = mio_uds::UnixStream::from_stream(stream)?; + let io = PollEvented::new_with_handle(stream, handle)?; + + Ok(UnixStream { io }) + } + + /// Creates an unnamed pair of connected sockets. + /// + /// This function will create a pair of interconnected Unix sockets for + /// communicating back and forth between one another. Each socket will + /// be associated with the default event loop's handle. + pub fn pair() -> io::Result<(UnixStream, UnixStream)> { + let (a, b) = try!(mio_uds::UnixStream::pair()); + let a = UnixStream::new(a); + let b = UnixStream::new(b); + + Ok((a, b)) + } + + pub(crate) fn new(stream: mio_uds::UnixStream) -> UnixStream { + let io = PollEvented::new(stream); + UnixStream { io } + } + + /// Test whether this socket is ready to be read or not. + pub fn poll_read_ready(&self, ready: Ready) -> Poll<Ready, io::Error> { + self.io.poll_read_ready(ready) + } + + /// Test whether this socket is ready to be written to or not. + pub fn poll_write_ready(&self) -> Poll<Ready, io::Error> { + self.io.poll_write_ready() + } + + /// Returns the socket address of the local half of this connection. + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.io.get_ref().local_addr() + } + + /// Returns the socket address of the remote half of this connection. + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + self.io.get_ref().peer_addr() + } + + /// Returns effective credentials of the process which called `connect` or `pair`. + pub fn peer_cred(&self) -> io::Result<UCred> { + ucred::get_peer_cred(self) + } + + /// Returns the value of the `SO_ERROR` option. + pub fn take_error(&self) -> io::Result<Option<io::Error>> { + self.io.get_ref().take_error() + } + + /// Shuts down the read, write, or both halves of this connection. + /// + /// This function will cause all pending and future I/O calls on the + /// specified portions to immediately return with an appropriate value + /// (see the documentation of `Shutdown`). + pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { + self.io.get_ref().shutdown(how) + } +} + +impl Read for UnixStream { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.io.read(buf) + } +} + +impl Write for UnixStream { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.io.write(buf) + } + fn flush(&mut self) -> io::Result<()> { + self.io.flush() + } +} + +impl AsyncRead for UnixStream { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { + false + } + + fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { + <&UnixStream>::read_buf(&mut &*self, buf) + } +} + +impl AsyncWrite for UnixStream { + fn shutdown(&mut self) -> Poll<(), io::Error> { + <&UnixStream>::shutdown(&mut &*self) + } + + fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { + <&UnixStream>::write_buf(&mut &*self, buf) + } +} + +impl<'a> Read for &'a UnixStream { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + (&self.io).read(buf) + } +} + +impl<'a> Write for &'a UnixStream { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + (&self.io).write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + (&self.io).flush() + } +} + +impl<'a> AsyncRead for &'a UnixStream { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { + false + } + + fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { + if let Async::NotReady = <UnixStream>::poll_read_ready(self, Ready::readable())? { + return Ok(Async::NotReady); + } + unsafe { + let r = read_ready(buf, self.as_raw_fd()); + if r == -1 { + let e = io::Error::last_os_error(); + if e.kind() == io::ErrorKind::WouldBlock { + self.io.clear_read_ready(Ready::readable())?; + Ok(Async::NotReady) + } else { + Err(e) + } + } else { + let r = r as usize; + buf.advance_mut(r); + Ok(r.into()) + } + } + } +} + +impl<'a> AsyncWrite for &'a UnixStream { + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(().into()) + } + + fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { + if let Async::NotReady = <UnixStream>::poll_write_ready(self)? { + return Ok(Async::NotReady); + } + unsafe { + let r = write_ready(buf, self.as_raw_fd()); + if r == -1 { + let e = io::Error::last_os_error(); + if e.kind() == io::ErrorKind::WouldBlock { + self.io.clear_write_ready()?; + Ok(Async::NotReady) + } else { + Err(e) + } + } else { + let r = r as usize; + buf.advance(r); + Ok(r.into()) + } + } + } +} + +impl fmt::Debug for UnixStream { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.io.get_ref().fmt(f) + } +} + +impl AsRawFd for UnixStream { + fn as_raw_fd(&self) -> RawFd { + self.io.get_ref().as_raw_fd() + } +} + +impl Future for ConnectFuture { + type Item = UnixStream; + type Error = io::Error; + + fn poll(&mut self) -> Poll<UnixStream, io::Error> { + use std::mem; + + match self.inner { + State::Waiting(ref mut stream) => { + if let Async::NotReady = stream.io.poll_write_ready()? { + return Ok(Async::NotReady) + } + + if let Some(e) = try!(stream.io.get_ref().take_error()) { + return Err(e) + } + } + State::Error(_) => { + let e = match mem::replace(&mut self.inner, State::Empty) { + State::Error(e) => e, + _ => unreachable!(), + }; + + return Err(e) + }, + State::Empty => panic!("can't poll stream twice"), + } + + match mem::replace(&mut self.inner, State::Empty) { + State::Waiting(stream) => Ok(Async::Ready(stream)), + _ => unreachable!(), + } + } +} + +unsafe fn read_ready<B: BufMut>(buf: &mut B, raw_fd: RawFd) -> isize { + // The `IoVec` type can't have a 0-length size, so we create a bunch + // of dummy versions on the stack with 1 length which we'll quickly + // overwrite. + let b1: &mut [u8] = &mut [0]; + let b2: &mut [u8] = &mut [0]; + let b3: &mut [u8] = &mut [0]; + let b4: &mut [u8] = &mut [0]; + let b5: &mut [u8] = &mut [0]; + let b6: &mut [u8] = &mut [0]; + let b7: &mut [u8] = &mut [0]; + let b8: &mut [u8] = &mut [0]; + let b9: &mut [u8] = &mut [0]; + let b10: &mut [u8] = &mut [0]; + let b11: &mut [u8] = &mut [0]; + let b12: &mut [u8] = &mut [0]; + let b13: &mut [u8] = &mut [0]; + let b14: &mut [u8] = &mut [0]; + let b15: &mut [u8] = &mut [0]; + let b16: &mut [u8] = &mut [0]; + let mut bufs: [&mut IoVec; 16] = [ + b1.into(), + b2.into(), + b3.into(), + b4.into(), + b5.into(), + b6.into(), + b7.into(), + b8.into(), + b9.into(), + b10.into(), + b11.into(), + b12.into(), + b13.into(), + b14.into(), + b15.into(), + b16.into(), + ]; + + let n = buf.bytes_vec_mut(&mut bufs); + read_ready_vecs(&mut bufs[..n], raw_fd) +} + +unsafe fn read_ready_vecs(bufs: &mut [&mut IoVec], raw_fd: RawFd) -> isize { + let iovecs = iovec::unix::as_os_slice_mut(bufs); + + libc::readv(raw_fd, iovecs.as_ptr(), iovecs.len() as i32) +} + +unsafe fn write_ready<B: Buf>(buf: &mut B, raw_fd: RawFd) -> isize { + // The `IoVec` type can't have a zero-length size, so create a dummy + // version from a 1-length slice which we'll overwrite with the + // `bytes_vec` method. + static DUMMY: &[u8] = &[0]; + let iovec = <&IoVec>::from(DUMMY); + let mut bufs = [ + iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, + iovec, iovec, iovec, + ]; + + let n = buf.bytes_vec(&mut bufs); + write_ready_vecs(&bufs[..n], raw_fd) +} + +unsafe fn write_ready_vecs(bufs: &[&IoVec], raw_fd: RawFd) -> isize { + let iovecs = iovec::unix::as_os_slice(bufs); + + libc::writev(raw_fd, iovecs.as_ptr(), iovecs.len() as i32) +} diff --git a/third_party/rust/tokio-uds/src/ucred.rs b/third_party/rust/tokio-uds/src/ucred.rs new file mode 100644 index 0000000000..bc53ea1717 --- /dev/null +++ b/third_party/rust/tokio-uds/src/ucred.rs @@ -0,0 +1,159 @@ +use libc::{gid_t, uid_t}; + +/// Credentials of a process +#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)] +pub struct UCred { + /// UID (user ID) of the process + pub uid: uid_t, + /// GID (group ID) of the process + pub gid: gid_t, +} + +#[cfg(any(target_os = "linux", target_os = "android"))] +pub use self::impl_linux::get_peer_cred; + +#[cfg(any(target_os = "dragonfly", target_os = "macos", target_os = "ios", target_os = "freebsd", target_os = "netbsd", target_os = "openbsd"))] +pub use self::impl_macos::get_peer_cred; + +#[cfg(any(target_os = "solaris"))] +pub use self::impl_solaris::get_peer_cred; + +#[cfg(any(target_os = "linux", target_os = "android"))] +pub mod impl_linux { + use libc::{c_void, getsockopt, socklen_t, SOL_SOCKET, SO_PEERCRED}; + use std::{io, mem}; + use UnixStream; + use std::os::unix::io::AsRawFd; + + use libc::ucred; + + pub fn get_peer_cred(sock: &UnixStream) -> io::Result<super::UCred> { + unsafe { + let raw_fd = sock.as_raw_fd(); + + let mut ucred = ucred { + pid: 0, + uid: 0, + gid: 0, + }; + + let ucred_size = mem::size_of::<ucred>(); + + // These paranoid checks should be optimized-out + assert!(mem::size_of::<u32>() <= mem::size_of::<usize>()); + assert!(ucred_size <= u32::max_value() as usize); + + let mut ucred_size = ucred_size as socklen_t; + + let ret = getsockopt( + raw_fd, + SOL_SOCKET, + SO_PEERCRED, + &mut ucred as *mut ucred as *mut c_void, + &mut ucred_size, + ); + if ret == 0 && ucred_size as usize == mem::size_of::<ucred>() { + Ok(super::UCred { + uid: ucred.uid, + gid: ucred.gid, + }) + } else { + Err(io::Error::last_os_error()) + } + } + } +} + +#[cfg(any(target_os = "dragonfly", target_os = "macos", target_os = "ios", target_os = "freebsd", target_os = "netbsd", target_os = "openbsd"))] +pub mod impl_macos { + use libc::getpeereid; + use std::{io, mem}; + use UnixStream; + use std::os::unix::io::AsRawFd; + + pub fn get_peer_cred(sock: &UnixStream) -> io::Result<super::UCred> { + unsafe { + let raw_fd = sock.as_raw_fd(); + + let mut cred: super::UCred = mem::uninitialized(); + + let ret = getpeereid(raw_fd, &mut cred.uid, &mut cred.gid); + + if ret == 0 { + Ok(cred) + } else { + Err(io::Error::last_os_error()) + } + } + } +} + + +#[cfg(any(target_os = "solaris"))] +pub mod impl_solaris { + use std::io; + use std::os::unix::io::AsRawFd; + use UnixStream; + use std::ptr; + + #[allow(non_camel_case_types)] + enum ucred_t {} + + extern "C" { + fn ucred_free(cred: *mut ucred_t); + fn ucred_geteuid(cred: *const ucred_t) -> super::uid_t; + fn ucred_getegid(cred: *const ucred_t) -> super::gid_t; + + fn getpeerucred(fd: ::std::os::raw::c_int, cred: *mut *mut ucred_t) -> ::std::os::raw::c_int; + } + + pub fn get_peer_cred(sock: &UnixStream) -> io::Result<super::UCred> { + unsafe { + let raw_fd = sock.as_raw_fd(); + + let mut cred = ptr::null_mut::<*mut ucred_t>() as *mut ucred_t; + + let ret = getpeerucred(raw_fd, &mut cred); + + if ret == 0 { + let uid = ucred_geteuid(cred); + let gid = ucred_getegid(cred); + + ucred_free(cred); + + Ok(super::UCred { + uid, + gid, + }) + } else { + Err(io::Error::last_os_error()) + } + } + } +} + + +// Note that LOCAL_PEERCRED is not supported on DragonFly (yet). So do not run tests. +#[cfg(not(target_os = "dragonfly"))] +#[cfg(test)] +mod test { + use UnixStream; + use libc::geteuid; + use libc::getegid; + + #[test] + #[cfg_attr(target_os = "freebsd", ignore = "Requires FreeBSD 12.0 or later. https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=176419")] + #[cfg_attr(target_os = "netbsd", ignore = "NetBSD does not support getpeereid() for sockets created by socketpair()")] + fn test_socket_pair() { + let (a, b) = UnixStream::pair().unwrap(); + let cred_a = a.peer_cred().unwrap(); + let cred_b = b.peer_cred().unwrap(); + assert_eq!(cred_a, cred_b); + + let uid = unsafe { geteuid() }; + let gid = unsafe { getegid() }; + + assert_eq!(cred_a.uid, uid); + assert_eq!(cred_a.gid, gid); + } +} |