diff options
Diffstat (limited to 'third_party/rust/mio/src/sys/unix/pipe.rs')
-rw-r--r-- | third_party/rust/mio/src/sys/unix/pipe.rs | 431 |
1 files changed, 431 insertions, 0 deletions
diff --git a/third_party/rust/mio/src/sys/unix/pipe.rs b/third_party/rust/mio/src/sys/unix/pipe.rs new file mode 100644 index 0000000000..c899dfb2da --- /dev/null +++ b/third_party/rust/mio/src/sys/unix/pipe.rs @@ -0,0 +1,431 @@ +//! Unix pipe. +//! +//! See the [`new`] function for documentation. + +use std::fs::File; +use std::io::{self, IoSlice, IoSliceMut, Read, Write}; +use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; +use std::process::{ChildStderr, ChildStdin, ChildStdout}; + +use crate::io_source::IoSource; +use crate::{event, Interest, Registry, Token}; + +/// Create a new non-blocking Unix pipe. +/// +/// This is a wrapper around Unix's [`pipe(2)`] system call and can be used as +/// inter-process or thread communication channel. +/// +/// This channel may be created before forking the process and then one end used +/// in each process, e.g. the parent process has the sending end to send command +/// to the child process. +/// +/// [`pipe(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/pipe.html +/// +/// # Events +/// +/// The [`Sender`] can be registered with [`WRITABLE`] interest to receive +/// [writable events], the [`Receiver`] with [`READABLE`] interest. Once data is +/// written to the `Sender` the `Receiver` will receive an [readable event]. +/// +/// In addition to those events, events will also be generated if the other side +/// is dropped. To check if the `Sender` is dropped you'll need to check +/// [`is_read_closed`] on events for the `Receiver`, if it returns true the +/// `Sender` is dropped. On the `Sender` end check [`is_write_closed`], if it +/// returns true the `Receiver` was dropped. Also see the second example below. +/// +/// [`WRITABLE`]: Interest::WRITABLE +/// [writable events]: event::Event::is_writable +/// [`READABLE`]: Interest::READABLE +/// [readable event]: event::Event::is_readable +/// [`is_read_closed`]: event::Event::is_read_closed +/// [`is_write_closed`]: event::Event::is_write_closed +/// +/// # Deregistering +/// +/// Both `Sender` and `Receiver` will deregister themselves when dropped, +/// **iff** the file descriptors are not duplicated (via [`dup(2)`]). +/// +/// [`dup(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/dup.html +/// +/// # Examples +/// +/// Simple example that writes data into the sending end and read it from the +/// receiving end. +/// +/// ``` +/// use std::io::{self, Read, Write}; +/// +/// use mio::{Poll, Events, Interest, Token}; +/// use mio::unix::pipe; +/// +/// // Unique tokens for the two ends of the channel. +/// const PIPE_RECV: Token = Token(0); +/// const PIPE_SEND: Token = Token(1); +/// +/// # fn main() -> io::Result<()> { +/// // Create our `Poll` instance and the `Events` container. +/// let mut poll = Poll::new()?; +/// let mut events = Events::with_capacity(8); +/// +/// // Create a new pipe. +/// let (mut sender, mut receiver) = pipe::new()?; +/// +/// // Register both ends of the channel. +/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?; +/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?; +/// +/// const MSG: &[u8; 11] = b"Hello world"; +/// +/// loop { +/// poll.poll(&mut events, None)?; +/// +/// for event in events.iter() { +/// match event.token() { +/// PIPE_SEND => sender.write(MSG) +/// .and_then(|n| if n != MSG.len() { +/// // We'll consider a short write an error in this +/// // example. NOTE: we can't use `write_all` with +/// // non-blocking I/O. +/// Err(io::ErrorKind::WriteZero.into()) +/// } else { +/// Ok(()) +/// })?, +/// PIPE_RECV => { +/// let mut buf = [0; 11]; +/// let n = receiver.read(&mut buf)?; +/// println!("received: {:?}", &buf[0..n]); +/// assert_eq!(n, MSG.len()); +/// assert_eq!(&buf, &*MSG); +/// return Ok(()); +/// }, +/// _ => unreachable!(), +/// } +/// } +/// } +/// # } +/// ``` +/// +/// Example that receives an event once the `Sender` is dropped. +/// +/// ``` +/// # use std::io; +/// # +/// # use mio::{Poll, Events, Interest, Token}; +/// # use mio::unix::pipe; +/// # +/// # const PIPE_RECV: Token = Token(0); +/// # const PIPE_SEND: Token = Token(1); +/// # +/// # fn main() -> io::Result<()> { +/// // Same setup as in the example above. +/// let mut poll = Poll::new()?; +/// let mut events = Events::with_capacity(8); +/// +/// let (mut sender, mut receiver) = pipe::new()?; +/// +/// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?; +/// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?; +/// +/// // Drop the sender. +/// drop(sender); +/// +/// poll.poll(&mut events, None)?; +/// +/// for event in events.iter() { +/// match event.token() { +/// PIPE_RECV if event.is_read_closed() => { +/// // Detected that the sender was dropped. +/// println!("Sender dropped!"); +/// return Ok(()); +/// }, +/// _ => unreachable!(), +/// } +/// } +/// # unreachable!(); +/// # } +/// ``` +pub fn new() -> io::Result<(Sender, Receiver)> { + let mut fds: [RawFd; 2] = [-1, -1]; + + #[cfg(any( + target_os = "android", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "linux", + target_os = "netbsd", + target_os = "openbsd", + target_os = "illumos", + ))] + unsafe { + if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 { + return Err(io::Error::last_os_error()); + } + } + + #[cfg(any(target_os = "ios", target_os = "macos"))] + unsafe { + // For platforms that don't have `pipe2(2)` we need to manually set the + // correct flags on the file descriptor. + if libc::pipe(fds.as_mut_ptr()) != 0 { + return Err(io::Error::last_os_error()); + } + + for fd in &fds { + if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0 + || libc::fcntl(*fd, libc::F_SETFD, libc::FD_CLOEXEC) != 0 + { + let err = io::Error::last_os_error(); + // Don't leak file descriptors. Can't handle error though. + let _ = libc::close(fds[0]); + let _ = libc::close(fds[1]); + return Err(err); + } + } + } + + #[cfg(not(any( + target_os = "android", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "linux", + target_os = "netbsd", + target_os = "openbsd", + target_os = "ios", + target_os = "macos", + target_os = "illumos", + )))] + compile_error!("unsupported target for `mio::unix::pipe`"); + + // Safety: we just initialised the `fds` above. + let r = unsafe { Receiver::from_raw_fd(fds[0]) }; + let w = unsafe { Sender::from_raw_fd(fds[1]) }; + Ok((w, r)) +} + +/// Sending end of an Unix pipe. +/// +/// See [`new`] for documentation, including examples. +#[derive(Debug)] +pub struct Sender { + inner: IoSource<File>, +} + +impl Sender { + /// Set the `Sender` into or out of non-blocking mode. + pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { + set_nonblocking(self.inner.as_raw_fd(), nonblocking) + } +} + +impl event::Source for Sender { + fn register( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + self.inner.register(registry, token, interests) + } + + fn reregister( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + self.inner.reregister(registry, token, interests) + } + + fn deregister(&mut self, registry: &Registry) -> io::Result<()> { + self.inner.deregister(registry) + } +} + +impl Write for Sender { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.inner.do_io(|sender| (&*sender).write(buf)) + } + + fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { + self.inner.do_io(|sender| (&*sender).write_vectored(bufs)) + } + + fn flush(&mut self) -> io::Result<()> { + self.inner.do_io(|sender| (&*sender).flush()) + } +} + +impl Write for &Sender { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.inner.do_io(|sender| (&*sender).write(buf)) + } + + fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { + self.inner.do_io(|sender| (&*sender).write_vectored(bufs)) + } + + fn flush(&mut self) -> io::Result<()> { + self.inner.do_io(|sender| (&*sender).flush()) + } +} + +/// # Notes +/// +/// The underlying pipe is **not** set to non-blocking. +impl From<ChildStdin> for Sender { + fn from(stdin: ChildStdin) -> Sender { + // Safety: `ChildStdin` is guaranteed to be a valid file descriptor. + unsafe { Sender::from_raw_fd(stdin.into_raw_fd()) } + } +} + +impl FromRawFd for Sender { + unsafe fn from_raw_fd(fd: RawFd) -> Sender { + Sender { + inner: IoSource::new(File::from_raw_fd(fd)), + } + } +} + +impl AsRawFd for Sender { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } +} + +impl IntoRawFd for Sender { + fn into_raw_fd(self) -> RawFd { + self.inner.into_inner().into_raw_fd() + } +} + +/// Receiving end of an Unix pipe. +/// +/// See [`new`] for documentation, including examples. +#[derive(Debug)] +pub struct Receiver { + inner: IoSource<File>, +} + +impl Receiver { + /// Set the `Receiver` into or out of non-blocking mode. + pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { + set_nonblocking(self.inner.as_raw_fd(), nonblocking) + } +} + +impl event::Source for Receiver { + fn register( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + self.inner.register(registry, token, interests) + } + + fn reregister( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + ) -> io::Result<()> { + self.inner.reregister(registry, token, interests) + } + + fn deregister(&mut self, registry: &Registry) -> io::Result<()> { + self.inner.deregister(registry) + } +} + +impl Read for Receiver { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.inner.do_io(|sender| (&*sender).read(buf)) + } + + fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { + self.inner.do_io(|sender| (&*sender).read_vectored(bufs)) + } +} + +impl Read for &Receiver { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.inner.do_io(|sender| (&*sender).read(buf)) + } + + fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { + self.inner.do_io(|sender| (&*sender).read_vectored(bufs)) + } +} + +/// # Notes +/// +/// The underlying pipe is **not** set to non-blocking. +impl From<ChildStdout> for Receiver { + fn from(stdout: ChildStdout) -> Receiver { + // Safety: `ChildStdout` is guaranteed to be a valid file descriptor. + unsafe { Receiver::from_raw_fd(stdout.into_raw_fd()) } + } +} + +/// # Notes +/// +/// The underlying pipe is **not** set to non-blocking. +impl From<ChildStderr> for Receiver { + fn from(stderr: ChildStderr) -> Receiver { + // Safety: `ChildStderr` is guaranteed to be a valid file descriptor. + unsafe { Receiver::from_raw_fd(stderr.into_raw_fd()) } + } +} + +impl FromRawFd for Receiver { + unsafe fn from_raw_fd(fd: RawFd) -> Receiver { + Receiver { + inner: IoSource::new(File::from_raw_fd(fd)), + } + } +} + +impl AsRawFd for Receiver { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } +} + +impl IntoRawFd for Receiver { + fn into_raw_fd(self) -> RawFd { + self.inner.into_inner().into_raw_fd() + } +} + +#[cfg(not(target_os = "illumos"))] +fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> { + let value = nonblocking as libc::c_int; + if unsafe { libc::ioctl(fd, libc::FIONBIO, &value) } == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(()) + } +} + +#[cfg(target_os = "illumos")] +fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> { + let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) }; + if flags < 0 { + return Err(io::Error::last_os_error()); + } + + let nflags = if nonblocking { + flags | libc::O_NONBLOCK + } else { + flags & !libc::O_NONBLOCK + }; + + if flags != nflags { + if unsafe { libc::fcntl(fd, libc::F_SETFL, nflags) } < 0 { + return Err(io::Error::last_os_error()); + } + } + + Ok(()) +} |