diff options
Diffstat (limited to 'media/audioipc/audioipc/src/tokio_uds_stream.rs')
-rw-r--r-- | media/audioipc/audioipc/src/tokio_uds_stream.rs | 363 |
1 files changed, 363 insertions, 0 deletions
diff --git a/media/audioipc/audioipc/src/tokio_uds_stream.rs b/media/audioipc/audioipc/src/tokio_uds_stream.rs new file mode 100644 index 0000000000..db005d7287 --- /dev/null +++ b/media/audioipc/audioipc/src/tokio_uds_stream.rs @@ -0,0 +1,363 @@ +// Copied from tokio-uds/src/stream.rs revision 25e835c5b7e2cfeb9c22b1fd576844f6814a9477 (tokio-uds 0.2.5) +// License MIT per upstream: https://github.com/tokio-rs/tokio/blob/master/tokio-uds/LICENSE +// - Removed ucred for build simplicity +// - Added clear_{read,write}_ready per: https://github.com/tokio-rs/tokio/pull/1294 + +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_reactor::{Handle, PollEvented}; + +use bytes::{Buf, BufMut}; +use futures::{Async, Future, Poll}; +use iovec::{self, IoVec}; +use libc; +use mio::Ready; +use mio_uds; + +use std::fmt; +use std::io::{self, Read, Write}; +use std::net::Shutdown; +use std::os::unix::io::{AsRawFd, RawFd}; +use std::os::unix::net::{self, SocketAddr}; +use std::path::Path; + +/// A structure representing a connected Unix socket. +/// +/// This socket can be connected directly with `UnixStream::connect` or accepted +/// from a listener with `UnixListener::incoming`. Additionally, a pair of +/// anonymous Unix sockets can be created with `UnixStream::pair`. +pub struct UnixStream { + io: PollEvented<mio_uds::UnixStream>, +} + +/// Future returned by `UnixStream::connect` which will resolve to a +/// `UnixStream` when the stream is connected. +#[derive(Debug)] +pub struct ConnectFuture { + inner: State, +} + +#[derive(Debug)] +enum State { + Waiting(UnixStream), + Error(io::Error), + Empty, +} + +impl UnixStream { + /// Connects to the socket named by `path`. + /// + /// This function will create a new Unix socket and connect to the path + /// specified, associating the returned stream with the default event loop's + /// handle. + pub fn connect<P>(path: P) -> ConnectFuture + where + P: AsRef<Path>, + { + let res = mio_uds::UnixStream::connect(path).map(UnixStream::new); + + let inner = match res { + Ok(stream) => State::Waiting(stream), + Err(e) => State::Error(e), + }; + + ConnectFuture { inner } + } + + /// Consumes a `UnixStream` in the standard library and returns a + /// nonblocking `UnixStream` from this crate. + /// + /// The returned stream will be associated with the given event loop + /// specified by `handle` and is ready to perform I/O. + pub fn from_std(stream: net::UnixStream, handle: &Handle) -> io::Result<UnixStream> { + let stream = mio_uds::UnixStream::from_stream(stream)?; + let io = PollEvented::new_with_handle(stream, handle)?; + + Ok(UnixStream { io }) + } + + /// Creates an unnamed pair of connected sockets. + /// + /// This function will create a pair of interconnected Unix sockets for + /// communicating back and forth between one another. Each socket will + /// be associated with the default event loop's handle. + pub fn pair() -> io::Result<(UnixStream, UnixStream)> { + let (a, b) = mio_uds::UnixStream::pair()?; + let a = UnixStream::new(a); + let b = UnixStream::new(b); + + Ok((a, b)) + } + + pub(crate) fn new(stream: mio_uds::UnixStream) -> UnixStream { + let io = PollEvented::new(stream); + UnixStream { io } + } + + /// Test whether this socket is ready to be read or not. + pub fn poll_read_ready(&self, ready: Ready) -> Poll<Ready, io::Error> { + self.io.poll_read_ready(ready) + } + + /// Clear read ready state. + pub fn clear_read_ready(&self, ready: mio::Ready) -> io::Result<()> { + self.io.clear_read_ready(ready) + } + + /// Test whether this socket is ready to be written to or not. + pub fn poll_write_ready(&self) -> Poll<Ready, io::Error> { + self.io.poll_write_ready() + } + + /// Clear write ready state. + pub fn clear_write_ready(&self) -> io::Result<()> { + self.io.clear_write_ready() + } + + /// Returns the socket address of the local half of this connection. + pub fn local_addr(&self) -> io::Result<SocketAddr> { + self.io.get_ref().local_addr() + } + + /// Returns the socket address of the remote half of this connection. + pub fn peer_addr(&self) -> io::Result<SocketAddr> { + self.io.get_ref().peer_addr() + } + + /// Returns the value of the `SO_ERROR` option. + pub fn take_error(&self) -> io::Result<Option<io::Error>> { + self.io.get_ref().take_error() + } + + /// Shuts down the read, write, or both halves of this connection. + /// + /// This function will cause all pending and future I/O calls on the + /// specified portions to immediately return with an appropriate value + /// (see the documentation of `Shutdown`). + pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { + self.io.get_ref().shutdown(how) + } +} + +impl Read for UnixStream { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.io.read(buf) + } +} + +impl Write for UnixStream { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.io.write(buf) + } + fn flush(&mut self) -> io::Result<()> { + self.io.flush() + } +} + +impl AsyncRead for UnixStream { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { + false + } + + fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { + <&UnixStream>::read_buf(&mut &*self, buf) + } +} + +impl AsyncWrite for UnixStream { + fn shutdown(&mut self) -> Poll<(), io::Error> { + <&UnixStream>::shutdown(&mut &*self) + } + + fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { + <&UnixStream>::write_buf(&mut &*self, buf) + } +} + +impl<'a> Read for &'a UnixStream { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + (&self.io).read(buf) + } +} + +impl<'a> Write for &'a UnixStream { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + (&self.io).write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + (&self.io).flush() + } +} + +impl<'a> AsyncRead for &'a UnixStream { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { + false + } + + fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { + if let Async::NotReady = <UnixStream>::poll_read_ready(self, Ready::readable())? { + return Ok(Async::NotReady); + } + unsafe { + let r = read_ready(buf, self.as_raw_fd()); + if r == -1 { + let e = io::Error::last_os_error(); + if e.kind() == io::ErrorKind::WouldBlock { + self.io.clear_read_ready(Ready::readable())?; + Ok(Async::NotReady) + } else { + Err(e) + } + } else { + let r = r as usize; + buf.advance_mut(r); + Ok(r.into()) + } + } + } +} + +impl<'a> AsyncWrite for &'a UnixStream { + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(().into()) + } + + fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { + if let Async::NotReady = <UnixStream>::poll_write_ready(self)? { + return Ok(Async::NotReady); + } + unsafe { + let r = write_ready(buf, self.as_raw_fd()); + if r == -1 { + let e = io::Error::last_os_error(); + if e.kind() == io::ErrorKind::WouldBlock { + self.io.clear_write_ready()?; + Ok(Async::NotReady) + } else { + Err(e) + } + } else { + let r = r as usize; + buf.advance(r); + Ok(r.into()) + } + } + } +} + +impl fmt::Debug for UnixStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.io.get_ref().fmt(f) + } +} + +impl AsRawFd for UnixStream { + fn as_raw_fd(&self) -> RawFd { + self.io.get_ref().as_raw_fd() + } +} + +impl Future for ConnectFuture { + type Item = UnixStream; + type Error = io::Error; + + fn poll(&mut self) -> Poll<UnixStream, io::Error> { + use std::mem; + + match self.inner { + State::Waiting(ref mut stream) => { + if let Async::NotReady = stream.io.poll_write_ready()? { + return Ok(Async::NotReady); + } + + if let Some(e) = stream.io.get_ref().take_error()? { + return Err(e); + } + } + State::Error(_) => { + let e = match mem::replace(&mut self.inner, State::Empty) { + State::Error(e) => e, + _ => unreachable!(), + }; + + return Err(e); + } + State::Empty => panic!("can't poll stream twice"), + } + + match mem::replace(&mut self.inner, State::Empty) { + State::Waiting(stream) => Ok(Async::Ready(stream)), + _ => unreachable!(), + } + } +} + +unsafe fn read_ready<B: BufMut>(buf: &mut B, raw_fd: RawFd) -> isize { + // The `IoVec` type can't have a 0-length size, so we create a bunch + // of dummy versions on the stack with 1 length which we'll quickly + // overwrite. + let b1: &mut [u8] = &mut [0]; + let b2: &mut [u8] = &mut [0]; + let b3: &mut [u8] = &mut [0]; + let b4: &mut [u8] = &mut [0]; + let b5: &mut [u8] = &mut [0]; + let b6: &mut [u8] = &mut [0]; + let b7: &mut [u8] = &mut [0]; + let b8: &mut [u8] = &mut [0]; + let b9: &mut [u8] = &mut [0]; + let b10: &mut [u8] = &mut [0]; + let b11: &mut [u8] = &mut [0]; + let b12: &mut [u8] = &mut [0]; + let b13: &mut [u8] = &mut [0]; + let b14: &mut [u8] = &mut [0]; + let b15: &mut [u8] = &mut [0]; + let b16: &mut [u8] = &mut [0]; + let mut bufs: [&mut IoVec; 16] = [ + b1.into(), + b2.into(), + b3.into(), + b4.into(), + b5.into(), + b6.into(), + b7.into(), + b8.into(), + b9.into(), + b10.into(), + b11.into(), + b12.into(), + b13.into(), + b14.into(), + b15.into(), + b16.into(), + ]; + + let n = buf.bytes_vec_mut(&mut bufs); + read_ready_vecs(&mut bufs[..n], raw_fd) +} + +unsafe fn read_ready_vecs(bufs: &mut [&mut IoVec], raw_fd: RawFd) -> isize { + let iovecs = iovec::unix::as_os_slice_mut(bufs); + + libc::readv(raw_fd, iovecs.as_ptr(), iovecs.len() as i32) +} + +unsafe fn write_ready<B: Buf>(buf: &mut B, raw_fd: RawFd) -> isize { + // The `IoVec` type can't have a zero-length size, so create a dummy + // version from a 1-length slice which we'll overwrite with the + // `bytes_vec` method. + static DUMMY: &[u8] = &[0]; + let iovec = <&IoVec>::from(DUMMY); + let mut bufs = [ + iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, + iovec, iovec, iovec, + ]; + + let n = buf.bytes_vec(&mut bufs); + write_ready_vecs(&bufs[..n], raw_fd) +} + +unsafe fn write_ready_vecs(bufs: &[&IoVec], raw_fd: RawFd) -> isize { + let iovecs = iovec::unix::as_os_slice(bufs); + + libc::writev(raw_fd, iovecs.as_ptr(), iovecs.len() as i32) +} |