summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio/src/process/unix
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 09:22:09 +0000
commit43a97878ce14b72f0981164f87f2e35e14151312 (patch)
tree620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/tokio/src/process/unix
parentInitial commit. (diff)
downloadfirefox-43a97878ce14b72f0981164f87f2e35e14151312.tar.xz
firefox-43a97878ce14b72f0981164f87f2e35e14151312.zip
Adding upstream version 110.0.1.upstream/110.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio/src/process/unix')
-rw-r--r--third_party/rust/tokio/src/process/unix/driver.rs58
-rw-r--r--third_party/rust/tokio/src/process/unix/mod.rs250
-rw-r--r--third_party/rust/tokio/src/process/unix/orphan.rs321
-rw-r--r--third_party/rust/tokio/src/process/unix/reap.rs298
4 files changed, 927 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/process/unix/driver.rs b/third_party/rust/tokio/src/process/unix/driver.rs
new file mode 100644
index 0000000000..84dc8fbd02
--- /dev/null
+++ b/third_party/rust/tokio/src/process/unix/driver.rs
@@ -0,0 +1,58 @@
+#![cfg_attr(not(feature = "rt"), allow(dead_code))]
+
+//! Process driver.
+
+use crate::park::Park;
+use crate::process::unix::GlobalOrphanQueue;
+use crate::signal::unix::driver::{Driver as SignalDriver, Handle as SignalHandle};
+
+use std::io;
+use std::time::Duration;
+
+/// Responsible for cleaning up orphaned child processes on Unix platforms.
+#[derive(Debug)]
+pub(crate) struct Driver {
+ park: SignalDriver,
+ signal_handle: SignalHandle,
+}
+
+// ===== impl Driver =====
+
+impl Driver {
+ /// Creates a new signal `Driver` instance that delegates wakeups to `park`.
+ pub(crate) fn new(park: SignalDriver) -> Self {
+ let signal_handle = park.handle();
+
+ Self {
+ park,
+ signal_handle,
+ }
+ }
+}
+
+// ===== impl Park for Driver =====
+
+impl Park for Driver {
+ type Unpark = <SignalDriver as Park>::Unpark;
+ type Error = io::Error;
+
+ fn unpark(&self) -> Self::Unpark {
+ self.park.unpark()
+ }
+
+ fn park(&mut self) -> Result<(), Self::Error> {
+ self.park.park()?;
+ GlobalOrphanQueue::reap_orphans(&self.signal_handle);
+ Ok(())
+ }
+
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+ self.park.park_timeout(duration)?;
+ GlobalOrphanQueue::reap_orphans(&self.signal_handle);
+ Ok(())
+ }
+
+ fn shutdown(&mut self) {
+ self.park.shutdown()
+ }
+}
diff --git a/third_party/rust/tokio/src/process/unix/mod.rs b/third_party/rust/tokio/src/process/unix/mod.rs
new file mode 100644
index 0000000000..576fe6cb47
--- /dev/null
+++ b/third_party/rust/tokio/src/process/unix/mod.rs
@@ -0,0 +1,250 @@
+//! Unix handling of child processes.
+//!
+//! Right now the only "fancy" thing about this is how we implement the
+//! `Future` implementation on `Child` to get the exit status. Unix offers
+//! no way to register a child with epoll, and the only real way to get a
+//! notification when a process exits is the SIGCHLD signal.
+//!
+//! Signal handling in general is *super* hairy and complicated, and it's even
+//! more complicated here with the fact that signals are coalesced, so we may
+//! not get a SIGCHLD-per-child.
+//!
+//! Our best approximation here is to check *all spawned processes* for all
+//! SIGCHLD signals received. To do that we create a `Signal`, implemented in
+//! the `tokio-net` crate, which is a stream over signals being received.
+//!
+//! Later when we poll the process's exit status we simply check to see if a
+//! SIGCHLD has happened since we last checked, and while that returns "yes" we
+//! keep trying.
+//!
+//! Note that this means that this isn't really scalable, but then again
+//! processes in general aren't scalable (e.g. millions) so it shouldn't be that
+//! bad in theory...
+
+pub(crate) mod driver;
+
+pub(crate) mod orphan;
+use orphan::{OrphanQueue, OrphanQueueImpl, Wait};
+
+mod reap;
+use reap::Reaper;
+
+use crate::io::PollEvented;
+use crate::process::kill::Kill;
+use crate::process::SpawnedChild;
+use crate::signal::unix::driver::Handle as SignalHandle;
+use crate::signal::unix::{signal, Signal, SignalKind};
+
+use mio::event::Source;
+use mio::unix::SourceFd;
+use once_cell::sync::Lazy;
+use std::fmt;
+use std::fs::File;
+use std::future::Future;
+use std::io;
+use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
+use std::pin::Pin;
+use std::process::{Child as StdChild, ExitStatus, Stdio};
+use std::task::Context;
+use std::task::Poll;
+
+impl Wait for StdChild {
+ fn id(&self) -> u32 {
+ self.id()
+ }
+
+ fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
+ self.try_wait()
+ }
+}
+
+impl Kill for StdChild {
+ fn kill(&mut self) -> io::Result<()> {
+ self.kill()
+ }
+}
+
+static ORPHAN_QUEUE: Lazy<OrphanQueueImpl<StdChild>> = Lazy::new(OrphanQueueImpl::new);
+
+pub(crate) struct GlobalOrphanQueue;
+
+impl fmt::Debug for GlobalOrphanQueue {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ ORPHAN_QUEUE.fmt(fmt)
+ }
+}
+
+impl GlobalOrphanQueue {
+ fn reap_orphans(handle: &SignalHandle) {
+ ORPHAN_QUEUE.reap_orphans(handle)
+ }
+}
+
+impl OrphanQueue<StdChild> for GlobalOrphanQueue {
+ fn push_orphan(&self, orphan: StdChild) {
+ ORPHAN_QUEUE.push_orphan(orphan)
+ }
+}
+
+#[must_use = "futures do nothing unless polled"]
+pub(crate) struct Child {
+ inner: Reaper<StdChild, GlobalOrphanQueue, Signal>,
+}
+
+impl fmt::Debug for Child {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Child")
+ .field("pid", &self.inner.id())
+ .finish()
+ }
+}
+
+pub(crate) fn spawn_child(cmd: &mut std::process::Command) -> io::Result<SpawnedChild> {
+ let mut child = cmd.spawn()?;
+ let stdin = child.stdin.take().map(stdio).transpose()?;
+ let stdout = child.stdout.take().map(stdio).transpose()?;
+ let stderr = child.stderr.take().map(stdio).transpose()?;
+
+ let signal = signal(SignalKind::child())?;
+
+ Ok(SpawnedChild {
+ child: Child {
+ inner: Reaper::new(child, GlobalOrphanQueue, signal),
+ },
+ stdin,
+ stdout,
+ stderr,
+ })
+}
+
+impl Child {
+ pub(crate) fn id(&self) -> u32 {
+ self.inner.id()
+ }
+
+ pub(crate) fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
+ self.inner.inner_mut().try_wait()
+ }
+}
+
+impl Kill for Child {
+ fn kill(&mut self) -> io::Result<()> {
+ self.inner.kill()
+ }
+}
+
+impl Future for Child {
+ type Output = io::Result<ExitStatus>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ Pin::new(&mut self.inner).poll(cx)
+ }
+}
+
+#[derive(Debug)]
+pub(crate) struct Pipe {
+ // Actually a pipe and not a File. However, we are reusing `File` to get
+ // close on drop. This is a similar trick as `mio`.
+ fd: File,
+}
+
+impl<T: IntoRawFd> From<T> for Pipe {
+ fn from(fd: T) -> Self {
+ let fd = unsafe { File::from_raw_fd(fd.into_raw_fd()) };
+ Self { fd }
+ }
+}
+
+impl<'a> io::Read for &'a Pipe {
+ fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> {
+ (&self.fd).read(bytes)
+ }
+}
+
+impl<'a> io::Write for &'a Pipe {
+ fn write(&mut self, bytes: &[u8]) -> io::Result<usize> {
+ (&self.fd).write(bytes)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ (&self.fd).flush()
+ }
+}
+
+impl AsRawFd for Pipe {
+ fn as_raw_fd(&self) -> RawFd {
+ self.fd.as_raw_fd()
+ }
+}
+
+pub(crate) fn convert_to_stdio(io: PollEvented<Pipe>) -> io::Result<Stdio> {
+ let mut fd = io.into_inner()?.fd;
+
+ // Ensure that the fd to be inherited is set to *blocking* mode, as this
+ // is the default that virtually all programs expect to have. Those
+ // programs that know how to work with nonblocking stdio will know how to
+ // change it to nonblocking mode.
+ set_nonblocking(&mut fd, false)?;
+
+ Ok(Stdio::from(fd))
+}
+
+impl Source for Pipe {
+ fn register(
+ &mut self,
+ registry: &mio::Registry,
+ token: mio::Token,
+ interest: mio::Interest,
+ ) -> io::Result<()> {
+ SourceFd(&self.as_raw_fd()).register(registry, token, interest)
+ }
+
+ fn reregister(
+ &mut self,
+ registry: &mio::Registry,
+ token: mio::Token,
+ interest: mio::Interest,
+ ) -> io::Result<()> {
+ SourceFd(&self.as_raw_fd()).reregister(registry, token, interest)
+ }
+
+ fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> {
+ SourceFd(&self.as_raw_fd()).deregister(registry)
+ }
+}
+
+pub(crate) type ChildStdio = PollEvented<Pipe>;
+
+fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()> {
+ unsafe {
+ let fd = fd.as_raw_fd();
+ let previous = libc::fcntl(fd, libc::F_GETFL);
+ if previous == -1 {
+ return Err(io::Error::last_os_error());
+ }
+
+ let new = if nonblocking {
+ previous | libc::O_NONBLOCK
+ } else {
+ previous & !libc::O_NONBLOCK
+ };
+
+ let r = libc::fcntl(fd, libc::F_SETFL, new);
+ if r == -1 {
+ return Err(io::Error::last_os_error());
+ }
+ }
+
+ Ok(())
+}
+
+pub(super) fn stdio<T>(io: T) -> io::Result<PollEvented<Pipe>>
+where
+ T: IntoRawFd,
+{
+ // Set the fd to nonblocking before we pass it to the event loop
+ let mut pipe = Pipe::from(io);
+ set_nonblocking(&mut pipe, true)?;
+
+ PollEvented::new(pipe)
+}
diff --git a/third_party/rust/tokio/src/process/unix/orphan.rs b/third_party/rust/tokio/src/process/unix/orphan.rs
new file mode 100644
index 0000000000..0e52530c37
--- /dev/null
+++ b/third_party/rust/tokio/src/process/unix/orphan.rs
@@ -0,0 +1,321 @@
+use crate::loom::sync::{Mutex, MutexGuard};
+use crate::signal::unix::driver::Handle as SignalHandle;
+use crate::signal::unix::{signal_with_handle, SignalKind};
+use crate::sync::watch;
+use std::io;
+use std::process::ExitStatus;
+
+/// An interface for waiting on a process to exit.
+pub(crate) trait Wait {
+ /// Get the identifier for this process or diagnostics.
+ fn id(&self) -> u32;
+ /// Try waiting for a process to exit in a non-blocking manner.
+ fn try_wait(&mut self) -> io::Result<Option<ExitStatus>>;
+}
+
+impl<T: Wait> Wait for &mut T {
+ fn id(&self) -> u32 {
+ (**self).id()
+ }
+
+ fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
+ (**self).try_wait()
+ }
+}
+
+/// An interface for queueing up an orphaned process so that it can be reaped.
+pub(crate) trait OrphanQueue<T> {
+ /// Adds an orphan to the queue.
+ fn push_orphan(&self, orphan: T);
+}
+
+impl<T, O: OrphanQueue<T>> OrphanQueue<T> for &O {
+ fn push_orphan(&self, orphan: T) {
+ (**self).push_orphan(orphan);
+ }
+}
+
+/// An implementation of `OrphanQueue`.
+#[derive(Debug)]
+pub(crate) struct OrphanQueueImpl<T> {
+ sigchild: Mutex<Option<watch::Receiver<()>>>,
+ queue: Mutex<Vec<T>>,
+}
+
+impl<T> OrphanQueueImpl<T> {
+ pub(crate) fn new() -> Self {
+ Self {
+ sigchild: Mutex::new(None),
+ queue: Mutex::new(Vec::new()),
+ }
+ }
+
+ #[cfg(test)]
+ fn len(&self) -> usize {
+ self.queue.lock().len()
+ }
+
+ pub(crate) fn push_orphan(&self, orphan: T)
+ where
+ T: Wait,
+ {
+ self.queue.lock().push(orphan)
+ }
+
+ /// Attempts to reap every process in the queue, ignoring any errors and
+ /// enqueueing any orphans which have not yet exited.
+ pub(crate) fn reap_orphans(&self, handle: &SignalHandle)
+ where
+ T: Wait,
+ {
+ // If someone else is holding the lock, they will be responsible for draining
+ // the queue as necessary, so we can safely bail if that happens
+ if let Some(mut sigchild_guard) = self.sigchild.try_lock() {
+ match &mut *sigchild_guard {
+ Some(sigchild) => {
+ if sigchild.try_has_changed().and_then(Result::ok).is_some() {
+ drain_orphan_queue(self.queue.lock());
+ }
+ }
+ None => {
+ let queue = self.queue.lock();
+
+ // Be lazy and only initialize the SIGCHLD listener if there
+ // are any orphaned processes in the queue.
+ if !queue.is_empty() {
+ // An errors shouldn't really happen here, but if it does it
+ // means that the signal driver isn't running, in
+ // which case there isn't anything we can
+ // register/initialize here, so we can try again later
+ if let Ok(sigchild) = signal_with_handle(SignalKind::child(), handle) {
+ *sigchild_guard = Some(sigchild);
+ drain_orphan_queue(queue);
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+fn drain_orphan_queue<T>(mut queue: MutexGuard<'_, Vec<T>>)
+where
+ T: Wait,
+{
+ for i in (0..queue.len()).rev() {
+ match queue[i].try_wait() {
+ Ok(None) => {}
+ Ok(Some(_)) | Err(_) => {
+ // The stdlib handles interruption errors (EINTR) when polling a child process.
+ // All other errors represent invalid inputs or pids that have already been
+ // reaped, so we can drop the orphan in case an error is raised.
+ queue.swap_remove(i);
+ }
+ }
+ }
+
+ drop(queue);
+}
+
+#[cfg(all(test, not(loom)))]
+pub(crate) mod test {
+ use super::*;
+ use crate::io::driver::Driver as IoDriver;
+ use crate::signal::unix::driver::{Driver as SignalDriver, Handle as SignalHandle};
+ use crate::sync::watch;
+ use std::cell::{Cell, RefCell};
+ use std::io;
+ use std::os::unix::process::ExitStatusExt;
+ use std::process::ExitStatus;
+ use std::rc::Rc;
+
+ pub(crate) struct MockQueue<W> {
+ pub(crate) all_enqueued: RefCell<Vec<W>>,
+ }
+
+ impl<W> MockQueue<W> {
+ pub(crate) fn new() -> Self {
+ Self {
+ all_enqueued: RefCell::new(Vec::new()),
+ }
+ }
+ }
+
+ impl<W> OrphanQueue<W> for MockQueue<W> {
+ fn push_orphan(&self, orphan: W) {
+ self.all_enqueued.borrow_mut().push(orphan);
+ }
+ }
+
+ struct MockWait {
+ total_waits: Rc<Cell<usize>>,
+ num_wait_until_status: usize,
+ return_err: bool,
+ }
+
+ impl MockWait {
+ fn new(num_wait_until_status: usize) -> Self {
+ Self {
+ total_waits: Rc::new(Cell::new(0)),
+ num_wait_until_status,
+ return_err: false,
+ }
+ }
+
+ fn with_err() -> Self {
+ Self {
+ total_waits: Rc::new(Cell::new(0)),
+ num_wait_until_status: 0,
+ return_err: true,
+ }
+ }
+ }
+
+ impl Wait for MockWait {
+ fn id(&self) -> u32 {
+ 42
+ }
+
+ fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
+ let waits = self.total_waits.get();
+
+ let ret = if self.num_wait_until_status == waits {
+ if self.return_err {
+ Ok(Some(ExitStatus::from_raw(0)))
+ } else {
+ Err(io::Error::new(io::ErrorKind::Other, "mock err"))
+ }
+ } else {
+ Ok(None)
+ };
+
+ self.total_waits.set(waits + 1);
+ ret
+ }
+ }
+
+ #[test]
+ fn drain_attempts_a_single_reap_of_all_queued_orphans() {
+ let first_orphan = MockWait::new(0);
+ let second_orphan = MockWait::new(1);
+ let third_orphan = MockWait::new(2);
+ let fourth_orphan = MockWait::with_err();
+
+ let first_waits = first_orphan.total_waits.clone();
+ let second_waits = second_orphan.total_waits.clone();
+ let third_waits = third_orphan.total_waits.clone();
+ let fourth_waits = fourth_orphan.total_waits.clone();
+
+ let orphanage = OrphanQueueImpl::new();
+ orphanage.push_orphan(first_orphan);
+ orphanage.push_orphan(third_orphan);
+ orphanage.push_orphan(second_orphan);
+ orphanage.push_orphan(fourth_orphan);
+
+ assert_eq!(orphanage.len(), 4);
+
+ drain_orphan_queue(orphanage.queue.lock());
+ assert_eq!(orphanage.len(), 2);
+ assert_eq!(first_waits.get(), 1);
+ assert_eq!(second_waits.get(), 1);
+ assert_eq!(third_waits.get(), 1);
+ assert_eq!(fourth_waits.get(), 1);
+
+ drain_orphan_queue(orphanage.queue.lock());
+ assert_eq!(orphanage.len(), 1);
+ assert_eq!(first_waits.get(), 1);
+ assert_eq!(second_waits.get(), 2);
+ assert_eq!(third_waits.get(), 2);
+ assert_eq!(fourth_waits.get(), 1);
+
+ drain_orphan_queue(orphanage.queue.lock());
+ assert_eq!(orphanage.len(), 0);
+ assert_eq!(first_waits.get(), 1);
+ assert_eq!(second_waits.get(), 2);
+ assert_eq!(third_waits.get(), 3);
+ assert_eq!(fourth_waits.get(), 1);
+
+ // Safe to reap when empty
+ drain_orphan_queue(orphanage.queue.lock());
+ }
+
+ #[test]
+ fn no_reap_if_no_signal_received() {
+ let (tx, rx) = watch::channel(());
+
+ let handle = SignalHandle::default();
+
+ let orphanage = OrphanQueueImpl::new();
+ *orphanage.sigchild.lock() = Some(rx);
+
+ let orphan = MockWait::new(2);
+ let waits = orphan.total_waits.clone();
+ orphanage.push_orphan(orphan);
+
+ orphanage.reap_orphans(&handle);
+ assert_eq!(waits.get(), 0);
+
+ orphanage.reap_orphans(&handle);
+ assert_eq!(waits.get(), 0);
+
+ tx.send(()).unwrap();
+ orphanage.reap_orphans(&handle);
+ assert_eq!(waits.get(), 1);
+ }
+
+ #[test]
+ fn no_reap_if_signal_lock_held() {
+ let handle = SignalHandle::default();
+
+ let orphanage = OrphanQueueImpl::new();
+ let signal_guard = orphanage.sigchild.lock();
+
+ let orphan = MockWait::new(2);
+ let waits = orphan.total_waits.clone();
+ orphanage.push_orphan(orphan);
+
+ orphanage.reap_orphans(&handle);
+ assert_eq!(waits.get(), 0);
+
+ drop(signal_guard);
+ }
+
+ #[cfg_attr(miri, ignore)] // Miri does not support epoll.
+ #[test]
+ fn does_not_register_signal_if_queue_empty() {
+ let signal_driver = IoDriver::new().and_then(SignalDriver::new).unwrap();
+ let handle = signal_driver.handle();
+
+ let orphanage = OrphanQueueImpl::new();
+ assert!(orphanage.sigchild.lock().is_none()); // Sanity
+
+ // No register when queue empty
+ orphanage.reap_orphans(&handle);
+ assert!(orphanage.sigchild.lock().is_none());
+
+ let orphan = MockWait::new(2);
+ let waits = orphan.total_waits.clone();
+ orphanage.push_orphan(orphan);
+
+ orphanage.reap_orphans(&handle);
+ assert!(orphanage.sigchild.lock().is_some());
+ assert_eq!(waits.get(), 1); // Eager reap when registering listener
+ }
+
+ #[test]
+ fn does_nothing_if_signal_could_not_be_registered() {
+ let handle = SignalHandle::default();
+
+ let orphanage = OrphanQueueImpl::new();
+ assert!(orphanage.sigchild.lock().is_none());
+
+ let orphan = MockWait::new(2);
+ let waits = orphan.total_waits.clone();
+ orphanage.push_orphan(orphan);
+
+ // Signal handler has "gone away", nothing to register or reap
+ orphanage.reap_orphans(&handle);
+ assert!(orphanage.sigchild.lock().is_none());
+ assert_eq!(waits.get(), 0);
+ }
+}
diff --git a/third_party/rust/tokio/src/process/unix/reap.rs b/third_party/rust/tokio/src/process/unix/reap.rs
new file mode 100644
index 0000000000..f7f4d3cc70
--- /dev/null
+++ b/third_party/rust/tokio/src/process/unix/reap.rs
@@ -0,0 +1,298 @@
+use crate::process::imp::orphan::{OrphanQueue, Wait};
+use crate::process::kill::Kill;
+use crate::signal::unix::InternalStream;
+
+use std::future::Future;
+use std::io;
+use std::ops::Deref;
+use std::pin::Pin;
+use std::process::ExitStatus;
+use std::task::Context;
+use std::task::Poll;
+
+/// Orchestrates between registering interest for receiving signals when a
+/// child process has exited, and attempting to poll for process completion.
+#[derive(Debug)]
+pub(crate) struct Reaper<W, Q, S>
+where
+ W: Wait,
+ Q: OrphanQueue<W>,
+{
+ inner: Option<W>,
+ orphan_queue: Q,
+ signal: S,
+}
+
+impl<W, Q, S> Deref for Reaper<W, Q, S>
+where
+ W: Wait,
+ Q: OrphanQueue<W>,
+{
+ type Target = W;
+
+ fn deref(&self) -> &Self::Target {
+ self.inner()
+ }
+}
+
+impl<W, Q, S> Reaper<W, Q, S>
+where
+ W: Wait,
+ Q: OrphanQueue<W>,
+{
+ pub(crate) fn new(inner: W, orphan_queue: Q, signal: S) -> Self {
+ Self {
+ inner: Some(inner),
+ orphan_queue,
+ signal,
+ }
+ }
+
+ fn inner(&self) -> &W {
+ self.inner.as_ref().expect("inner has gone away")
+ }
+
+ pub(crate) fn inner_mut(&mut self) -> &mut W {
+ self.inner.as_mut().expect("inner has gone away")
+ }
+}
+
+impl<W, Q, S> Future for Reaper<W, Q, S>
+where
+ W: Wait + Unpin,
+ Q: OrphanQueue<W> + Unpin,
+ S: InternalStream + Unpin,
+{
+ type Output = io::Result<ExitStatus>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ loop {
+ // If the child hasn't exited yet, then it's our responsibility to
+ // ensure the current task gets notified when it might be able to
+ // make progress. We can use the delivery of a SIGCHLD signal as a
+ // sign that we can potentially make progress.
+ //
+ // However, we will register for a notification on the next signal
+ // BEFORE we poll the child. Otherwise it is possible that the child
+ // can exit and the signal can arrive after we last polled the child,
+ // but before we've registered for a notification on the next signal
+ // (this can cause a deadlock if there are no more spawned children
+ // which can generate a different signal for us). A side effect of
+ // pre-registering for signal notifications is that when the child
+ // exits, we will have already registered for an additional
+ // notification we don't need to consume. If another signal arrives,
+ // this future's task will be notified/woken up again. Since the
+ // futures model allows for spurious wake ups this extra wakeup
+ // should not cause significant issues with parent futures.
+ let registered_interest = self.signal.poll_recv(cx).is_pending();
+
+ if let Some(status) = self.inner_mut().try_wait()? {
+ return Poll::Ready(Ok(status));
+ }
+
+ // If our attempt to poll for the next signal was not ready, then
+ // we've arranged for our task to get notified and we can bail out.
+ if registered_interest {
+ return Poll::Pending;
+ } else {
+ // Otherwise, if the signal stream delivered a signal to us, we
+ // won't get notified at the next signal, so we'll loop and try
+ // again.
+ continue;
+ }
+ }
+ }
+}
+
+impl<W, Q, S> Kill for Reaper<W, Q, S>
+where
+ W: Kill + Wait,
+ Q: OrphanQueue<W>,
+{
+ fn kill(&mut self) -> io::Result<()> {
+ self.inner_mut().kill()
+ }
+}
+
+impl<W, Q, S> Drop for Reaper<W, Q, S>
+where
+ W: Wait,
+ Q: OrphanQueue<W>,
+{
+ fn drop(&mut self) {
+ if let Ok(Some(_)) = self.inner_mut().try_wait() {
+ return;
+ }
+
+ let orphan = self.inner.take().unwrap();
+ self.orphan_queue.push_orphan(orphan);
+ }
+}
+
+#[cfg(all(test, not(loom)))]
+mod test {
+ use super::*;
+
+ use crate::process::unix::orphan::test::MockQueue;
+ use futures::future::FutureExt;
+ use std::os::unix::process::ExitStatusExt;
+ use std::process::ExitStatus;
+ use std::task::Context;
+ use std::task::Poll;
+
+ #[derive(Debug)]
+ struct MockWait {
+ total_kills: usize,
+ total_waits: usize,
+ num_wait_until_status: usize,
+ status: ExitStatus,
+ }
+
+ impl MockWait {
+ fn new(status: ExitStatus, num_wait_until_status: usize) -> Self {
+ Self {
+ total_kills: 0,
+ total_waits: 0,
+ num_wait_until_status,
+ status,
+ }
+ }
+ }
+
+ impl Wait for MockWait {
+ fn id(&self) -> u32 {
+ 0
+ }
+
+ fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
+ let ret = if self.num_wait_until_status == self.total_waits {
+ Some(self.status)
+ } else {
+ None
+ };
+
+ self.total_waits += 1;
+ Ok(ret)
+ }
+ }
+
+ impl Kill for MockWait {
+ fn kill(&mut self) -> io::Result<()> {
+ self.total_kills += 1;
+ Ok(())
+ }
+ }
+
+ struct MockStream {
+ total_polls: usize,
+ values: Vec<Option<()>>,
+ }
+
+ impl MockStream {
+ fn new(values: Vec<Option<()>>) -> Self {
+ Self {
+ total_polls: 0,
+ values,
+ }
+ }
+ }
+
+ impl InternalStream for MockStream {
+ fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>> {
+ self.total_polls += 1;
+ match self.values.remove(0) {
+ Some(()) => Poll::Ready(Some(())),
+ None => Poll::Pending,
+ }
+ }
+ }
+
+ #[test]
+ fn reaper() {
+ let exit = ExitStatus::from_raw(0);
+ let mock = MockWait::new(exit, 3);
+ let mut grim = Reaper::new(
+ mock,
+ MockQueue::new(),
+ MockStream::new(vec![None, Some(()), None, None, None]),
+ );
+
+ let waker = futures::task::noop_waker();
+ let mut context = Context::from_waker(&waker);
+
+ // Not yet exited, interest registered
+ assert!(grim.poll_unpin(&mut context).is_pending());
+ assert_eq!(1, grim.signal.total_polls);
+ assert_eq!(1, grim.total_waits);
+ assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
+
+ // Not yet exited, couldn't register interest the first time
+ // but managed to register interest the second time around
+ assert!(grim.poll_unpin(&mut context).is_pending());
+ assert_eq!(3, grim.signal.total_polls);
+ assert_eq!(3, grim.total_waits);
+ assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
+
+ // Exited
+ if let Poll::Ready(r) = grim.poll_unpin(&mut context) {
+ assert!(r.is_ok());
+ let exit_code = r.unwrap();
+ assert_eq!(exit_code, exit);
+ } else {
+ unreachable!();
+ }
+ assert_eq!(4, grim.signal.total_polls);
+ assert_eq!(4, grim.total_waits);
+ assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
+ }
+
+ #[test]
+ fn kill() {
+ let exit = ExitStatus::from_raw(0);
+ let mut grim = Reaper::new(
+ MockWait::new(exit, 0),
+ MockQueue::new(),
+ MockStream::new(vec![None]),
+ );
+
+ grim.kill().unwrap();
+ assert_eq!(1, grim.total_kills);
+ assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
+ }
+
+ #[test]
+ fn drop_reaps_if_possible() {
+ let exit = ExitStatus::from_raw(0);
+ let mut mock = MockWait::new(exit, 0);
+
+ {
+ let queue = MockQueue::new();
+
+ let grim = Reaper::new(&mut mock, &queue, MockStream::new(vec![]));
+
+ drop(grim);
+
+ assert!(queue.all_enqueued.borrow().is_empty());
+ }
+
+ assert_eq!(1, mock.total_waits);
+ assert_eq!(0, mock.total_kills);
+ }
+
+ #[test]
+ fn drop_enqueues_orphan_if_wait_fails() {
+ let exit = ExitStatus::from_raw(0);
+ let mut mock = MockWait::new(exit, 2);
+
+ {
+ let queue = MockQueue::<&mut MockWait>::new();
+ let grim = Reaper::new(&mut mock, &queue, MockStream::new(vec![]));
+ drop(grim);
+
+ assert_eq!(1, queue.all_enqueued.borrow().len());
+ }
+
+ assert_eq!(1, mock.total_waits);
+ assert_eq!(0, mock.total_kills);
+ }
+}