diff options
Diffstat (limited to 'third_party/rust/tokio/src/process/unix')
-rw-r--r-- | third_party/rust/tokio/src/process/unix/driver.rs | 58 | ||||
-rw-r--r-- | third_party/rust/tokio/src/process/unix/mod.rs | 250 | ||||
-rw-r--r-- | third_party/rust/tokio/src/process/unix/orphan.rs | 321 | ||||
-rw-r--r-- | third_party/rust/tokio/src/process/unix/reap.rs | 298 |
4 files changed, 927 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/process/unix/driver.rs b/third_party/rust/tokio/src/process/unix/driver.rs new file mode 100644 index 0000000000..84dc8fbd02 --- /dev/null +++ b/third_party/rust/tokio/src/process/unix/driver.rs @@ -0,0 +1,58 @@ +#![cfg_attr(not(feature = "rt"), allow(dead_code))] + +//! Process driver. + +use crate::park::Park; +use crate::process::unix::GlobalOrphanQueue; +use crate::signal::unix::driver::{Driver as SignalDriver, Handle as SignalHandle}; + +use std::io; +use std::time::Duration; + +/// Responsible for cleaning up orphaned child processes on Unix platforms. +#[derive(Debug)] +pub(crate) struct Driver { + park: SignalDriver, + signal_handle: SignalHandle, +} + +// ===== impl Driver ===== + +impl Driver { + /// Creates a new signal `Driver` instance that delegates wakeups to `park`. + pub(crate) fn new(park: SignalDriver) -> Self { + let signal_handle = park.handle(); + + Self { + park, + signal_handle, + } + } +} + +// ===== impl Park for Driver ===== + +impl Park for Driver { + type Unpark = <SignalDriver as Park>::Unpark; + type Error = io::Error; + + fn unpark(&self) -> Self::Unpark { + self.park.unpark() + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.park.park()?; + GlobalOrphanQueue::reap_orphans(&self.signal_handle); + Ok(()) + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.park.park_timeout(duration)?; + GlobalOrphanQueue::reap_orphans(&self.signal_handle); + Ok(()) + } + + fn shutdown(&mut self) { + self.park.shutdown() + } +} diff --git a/third_party/rust/tokio/src/process/unix/mod.rs b/third_party/rust/tokio/src/process/unix/mod.rs new file mode 100644 index 0000000000..576fe6cb47 --- /dev/null +++ b/third_party/rust/tokio/src/process/unix/mod.rs @@ -0,0 +1,250 @@ +//! Unix handling of child processes. +//! +//! Right now the only "fancy" thing about this is how we implement the +//! `Future` implementation on `Child` to get the exit status. Unix offers +//! no way to register a child with epoll, and the only real way to get a +//! notification when a process exits is the SIGCHLD signal. +//! +//! Signal handling in general is *super* hairy and complicated, and it's even +//! more complicated here with the fact that signals are coalesced, so we may +//! not get a SIGCHLD-per-child. +//! +//! Our best approximation here is to check *all spawned processes* for all +//! SIGCHLD signals received. To do that we create a `Signal`, implemented in +//! the `tokio-net` crate, which is a stream over signals being received. +//! +//! Later when we poll the process's exit status we simply check to see if a +//! SIGCHLD has happened since we last checked, and while that returns "yes" we +//! keep trying. +//! +//! Note that this means that this isn't really scalable, but then again +//! processes in general aren't scalable (e.g. millions) so it shouldn't be that +//! bad in theory... + +pub(crate) mod driver; + +pub(crate) mod orphan; +use orphan::{OrphanQueue, OrphanQueueImpl, Wait}; + +mod reap; +use reap::Reaper; + +use crate::io::PollEvented; +use crate::process::kill::Kill; +use crate::process::SpawnedChild; +use crate::signal::unix::driver::Handle as SignalHandle; +use crate::signal::unix::{signal, Signal, SignalKind}; + +use mio::event::Source; +use mio::unix::SourceFd; +use once_cell::sync::Lazy; +use std::fmt; +use std::fs::File; +use std::future::Future; +use std::io; +use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; +use std::pin::Pin; +use std::process::{Child as StdChild, ExitStatus, Stdio}; +use std::task::Context; +use std::task::Poll; + +impl Wait for StdChild { + fn id(&self) -> u32 { + self.id() + } + + fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> { + self.try_wait() + } +} + +impl Kill for StdChild { + fn kill(&mut self) -> io::Result<()> { + self.kill() + } +} + +static ORPHAN_QUEUE: Lazy<OrphanQueueImpl<StdChild>> = Lazy::new(OrphanQueueImpl::new); + +pub(crate) struct GlobalOrphanQueue; + +impl fmt::Debug for GlobalOrphanQueue { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + ORPHAN_QUEUE.fmt(fmt) + } +} + +impl GlobalOrphanQueue { + fn reap_orphans(handle: &SignalHandle) { + ORPHAN_QUEUE.reap_orphans(handle) + } +} + +impl OrphanQueue<StdChild> for GlobalOrphanQueue { + fn push_orphan(&self, orphan: StdChild) { + ORPHAN_QUEUE.push_orphan(orphan) + } +} + +#[must_use = "futures do nothing unless polled"] +pub(crate) struct Child { + inner: Reaper<StdChild, GlobalOrphanQueue, Signal>, +} + +impl fmt::Debug for Child { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Child") + .field("pid", &self.inner.id()) + .finish() + } +} + +pub(crate) fn spawn_child(cmd: &mut std::process::Command) -> io::Result<SpawnedChild> { + let mut child = cmd.spawn()?; + let stdin = child.stdin.take().map(stdio).transpose()?; + let stdout = child.stdout.take().map(stdio).transpose()?; + let stderr = child.stderr.take().map(stdio).transpose()?; + + let signal = signal(SignalKind::child())?; + + Ok(SpawnedChild { + child: Child { + inner: Reaper::new(child, GlobalOrphanQueue, signal), + }, + stdin, + stdout, + stderr, + }) +} + +impl Child { + pub(crate) fn id(&self) -> u32 { + self.inner.id() + } + + pub(crate) fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> { + self.inner.inner_mut().try_wait() + } +} + +impl Kill for Child { + fn kill(&mut self) -> io::Result<()> { + self.inner.kill() + } +} + +impl Future for Child { + type Output = io::Result<ExitStatus>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + Pin::new(&mut self.inner).poll(cx) + } +} + +#[derive(Debug)] +pub(crate) struct Pipe { + // Actually a pipe and not a File. However, we are reusing `File` to get + // close on drop. This is a similar trick as `mio`. + fd: File, +} + +impl<T: IntoRawFd> From<T> for Pipe { + fn from(fd: T) -> Self { + let fd = unsafe { File::from_raw_fd(fd.into_raw_fd()) }; + Self { fd } + } +} + +impl<'a> io::Read for &'a Pipe { + fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> { + (&self.fd).read(bytes) + } +} + +impl<'a> io::Write for &'a Pipe { + fn write(&mut self, bytes: &[u8]) -> io::Result<usize> { + (&self.fd).write(bytes) + } + + fn flush(&mut self) -> io::Result<()> { + (&self.fd).flush() + } +} + +impl AsRawFd for Pipe { + fn as_raw_fd(&self) -> RawFd { + self.fd.as_raw_fd() + } +} + +pub(crate) fn convert_to_stdio(io: PollEvented<Pipe>) -> io::Result<Stdio> { + let mut fd = io.into_inner()?.fd; + + // Ensure that the fd to be inherited is set to *blocking* mode, as this + // is the default that virtually all programs expect to have. Those + // programs that know how to work with nonblocking stdio will know how to + // change it to nonblocking mode. + set_nonblocking(&mut fd, false)?; + + Ok(Stdio::from(fd)) +} + +impl Source for Pipe { + fn register( + &mut self, + registry: &mio::Registry, + token: mio::Token, + interest: mio::Interest, + ) -> io::Result<()> { + SourceFd(&self.as_raw_fd()).register(registry, token, interest) + } + + fn reregister( + &mut self, + registry: &mio::Registry, + token: mio::Token, + interest: mio::Interest, + ) -> io::Result<()> { + SourceFd(&self.as_raw_fd()).reregister(registry, token, interest) + } + + fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> { + SourceFd(&self.as_raw_fd()).deregister(registry) + } +} + +pub(crate) type ChildStdio = PollEvented<Pipe>; + +fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()> { + unsafe { + let fd = fd.as_raw_fd(); + let previous = libc::fcntl(fd, libc::F_GETFL); + if previous == -1 { + return Err(io::Error::last_os_error()); + } + + let new = if nonblocking { + previous | libc::O_NONBLOCK + } else { + previous & !libc::O_NONBLOCK + }; + + let r = libc::fcntl(fd, libc::F_SETFL, new); + if r == -1 { + return Err(io::Error::last_os_error()); + } + } + + Ok(()) +} + +pub(super) fn stdio<T>(io: T) -> io::Result<PollEvented<Pipe>> +where + T: IntoRawFd, +{ + // Set the fd to nonblocking before we pass it to the event loop + let mut pipe = Pipe::from(io); + set_nonblocking(&mut pipe, true)?; + + PollEvented::new(pipe) +} diff --git a/third_party/rust/tokio/src/process/unix/orphan.rs b/third_party/rust/tokio/src/process/unix/orphan.rs new file mode 100644 index 0000000000..0e52530c37 --- /dev/null +++ b/third_party/rust/tokio/src/process/unix/orphan.rs @@ -0,0 +1,321 @@ +use crate::loom::sync::{Mutex, MutexGuard}; +use crate::signal::unix::driver::Handle as SignalHandle; +use crate::signal::unix::{signal_with_handle, SignalKind}; +use crate::sync::watch; +use std::io; +use std::process::ExitStatus; + +/// An interface for waiting on a process to exit. +pub(crate) trait Wait { + /// Get the identifier for this process or diagnostics. + fn id(&self) -> u32; + /// Try waiting for a process to exit in a non-blocking manner. + fn try_wait(&mut self) -> io::Result<Option<ExitStatus>>; +} + +impl<T: Wait> Wait for &mut T { + fn id(&self) -> u32 { + (**self).id() + } + + fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> { + (**self).try_wait() + } +} + +/// An interface for queueing up an orphaned process so that it can be reaped. +pub(crate) trait OrphanQueue<T> { + /// Adds an orphan to the queue. + fn push_orphan(&self, orphan: T); +} + +impl<T, O: OrphanQueue<T>> OrphanQueue<T> for &O { + fn push_orphan(&self, orphan: T) { + (**self).push_orphan(orphan); + } +} + +/// An implementation of `OrphanQueue`. +#[derive(Debug)] +pub(crate) struct OrphanQueueImpl<T> { + sigchild: Mutex<Option<watch::Receiver<()>>>, + queue: Mutex<Vec<T>>, +} + +impl<T> OrphanQueueImpl<T> { + pub(crate) fn new() -> Self { + Self { + sigchild: Mutex::new(None), + queue: Mutex::new(Vec::new()), + } + } + + #[cfg(test)] + fn len(&self) -> usize { + self.queue.lock().len() + } + + pub(crate) fn push_orphan(&self, orphan: T) + where + T: Wait, + { + self.queue.lock().push(orphan) + } + + /// Attempts to reap every process in the queue, ignoring any errors and + /// enqueueing any orphans which have not yet exited. + pub(crate) fn reap_orphans(&self, handle: &SignalHandle) + where + T: Wait, + { + // If someone else is holding the lock, they will be responsible for draining + // the queue as necessary, so we can safely bail if that happens + if let Some(mut sigchild_guard) = self.sigchild.try_lock() { + match &mut *sigchild_guard { + Some(sigchild) => { + if sigchild.try_has_changed().and_then(Result::ok).is_some() { + drain_orphan_queue(self.queue.lock()); + } + } + None => { + let queue = self.queue.lock(); + + // Be lazy and only initialize the SIGCHLD listener if there + // are any orphaned processes in the queue. + if !queue.is_empty() { + // An errors shouldn't really happen here, but if it does it + // means that the signal driver isn't running, in + // which case there isn't anything we can + // register/initialize here, so we can try again later + if let Ok(sigchild) = signal_with_handle(SignalKind::child(), handle) { + *sigchild_guard = Some(sigchild); + drain_orphan_queue(queue); + } + } + } + } + } + } +} + +fn drain_orphan_queue<T>(mut queue: MutexGuard<'_, Vec<T>>) +where + T: Wait, +{ + for i in (0..queue.len()).rev() { + match queue[i].try_wait() { + Ok(None) => {} + Ok(Some(_)) | Err(_) => { + // The stdlib handles interruption errors (EINTR) when polling a child process. + // All other errors represent invalid inputs or pids that have already been + // reaped, so we can drop the orphan in case an error is raised. + queue.swap_remove(i); + } + } + } + + drop(queue); +} + +#[cfg(all(test, not(loom)))] +pub(crate) mod test { + use super::*; + use crate::io::driver::Driver as IoDriver; + use crate::signal::unix::driver::{Driver as SignalDriver, Handle as SignalHandle}; + use crate::sync::watch; + use std::cell::{Cell, RefCell}; + use std::io; + use std::os::unix::process::ExitStatusExt; + use std::process::ExitStatus; + use std::rc::Rc; + + pub(crate) struct MockQueue<W> { + pub(crate) all_enqueued: RefCell<Vec<W>>, + } + + impl<W> MockQueue<W> { + pub(crate) fn new() -> Self { + Self { + all_enqueued: RefCell::new(Vec::new()), + } + } + } + + impl<W> OrphanQueue<W> for MockQueue<W> { + fn push_orphan(&self, orphan: W) { + self.all_enqueued.borrow_mut().push(orphan); + } + } + + struct MockWait { + total_waits: Rc<Cell<usize>>, + num_wait_until_status: usize, + return_err: bool, + } + + impl MockWait { + fn new(num_wait_until_status: usize) -> Self { + Self { + total_waits: Rc::new(Cell::new(0)), + num_wait_until_status, + return_err: false, + } + } + + fn with_err() -> Self { + Self { + total_waits: Rc::new(Cell::new(0)), + num_wait_until_status: 0, + return_err: true, + } + } + } + + impl Wait for MockWait { + fn id(&self) -> u32 { + 42 + } + + fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> { + let waits = self.total_waits.get(); + + let ret = if self.num_wait_until_status == waits { + if self.return_err { + Ok(Some(ExitStatus::from_raw(0))) + } else { + Err(io::Error::new(io::ErrorKind::Other, "mock err")) + } + } else { + Ok(None) + }; + + self.total_waits.set(waits + 1); + ret + } + } + + #[test] + fn drain_attempts_a_single_reap_of_all_queued_orphans() { + let first_orphan = MockWait::new(0); + let second_orphan = MockWait::new(1); + let third_orphan = MockWait::new(2); + let fourth_orphan = MockWait::with_err(); + + let first_waits = first_orphan.total_waits.clone(); + let second_waits = second_orphan.total_waits.clone(); + let third_waits = third_orphan.total_waits.clone(); + let fourth_waits = fourth_orphan.total_waits.clone(); + + let orphanage = OrphanQueueImpl::new(); + orphanage.push_orphan(first_orphan); + orphanage.push_orphan(third_orphan); + orphanage.push_orphan(second_orphan); + orphanage.push_orphan(fourth_orphan); + + assert_eq!(orphanage.len(), 4); + + drain_orphan_queue(orphanage.queue.lock()); + assert_eq!(orphanage.len(), 2); + assert_eq!(first_waits.get(), 1); + assert_eq!(second_waits.get(), 1); + assert_eq!(third_waits.get(), 1); + assert_eq!(fourth_waits.get(), 1); + + drain_orphan_queue(orphanage.queue.lock()); + assert_eq!(orphanage.len(), 1); + assert_eq!(first_waits.get(), 1); + assert_eq!(second_waits.get(), 2); + assert_eq!(third_waits.get(), 2); + assert_eq!(fourth_waits.get(), 1); + + drain_orphan_queue(orphanage.queue.lock()); + assert_eq!(orphanage.len(), 0); + assert_eq!(first_waits.get(), 1); + assert_eq!(second_waits.get(), 2); + assert_eq!(third_waits.get(), 3); + assert_eq!(fourth_waits.get(), 1); + + // Safe to reap when empty + drain_orphan_queue(orphanage.queue.lock()); + } + + #[test] + fn no_reap_if_no_signal_received() { + let (tx, rx) = watch::channel(()); + + let handle = SignalHandle::default(); + + let orphanage = OrphanQueueImpl::new(); + *orphanage.sigchild.lock() = Some(rx); + + let orphan = MockWait::new(2); + let waits = orphan.total_waits.clone(); + orphanage.push_orphan(orphan); + + orphanage.reap_orphans(&handle); + assert_eq!(waits.get(), 0); + + orphanage.reap_orphans(&handle); + assert_eq!(waits.get(), 0); + + tx.send(()).unwrap(); + orphanage.reap_orphans(&handle); + assert_eq!(waits.get(), 1); + } + + #[test] + fn no_reap_if_signal_lock_held() { + let handle = SignalHandle::default(); + + let orphanage = OrphanQueueImpl::new(); + let signal_guard = orphanage.sigchild.lock(); + + let orphan = MockWait::new(2); + let waits = orphan.total_waits.clone(); + orphanage.push_orphan(orphan); + + orphanage.reap_orphans(&handle); + assert_eq!(waits.get(), 0); + + drop(signal_guard); + } + + #[cfg_attr(miri, ignore)] // Miri does not support epoll. + #[test] + fn does_not_register_signal_if_queue_empty() { + let signal_driver = IoDriver::new().and_then(SignalDriver::new).unwrap(); + let handle = signal_driver.handle(); + + let orphanage = OrphanQueueImpl::new(); + assert!(orphanage.sigchild.lock().is_none()); // Sanity + + // No register when queue empty + orphanage.reap_orphans(&handle); + assert!(orphanage.sigchild.lock().is_none()); + + let orphan = MockWait::new(2); + let waits = orphan.total_waits.clone(); + orphanage.push_orphan(orphan); + + orphanage.reap_orphans(&handle); + assert!(orphanage.sigchild.lock().is_some()); + assert_eq!(waits.get(), 1); // Eager reap when registering listener + } + + #[test] + fn does_nothing_if_signal_could_not_be_registered() { + let handle = SignalHandle::default(); + + let orphanage = OrphanQueueImpl::new(); + assert!(orphanage.sigchild.lock().is_none()); + + let orphan = MockWait::new(2); + let waits = orphan.total_waits.clone(); + orphanage.push_orphan(orphan); + + // Signal handler has "gone away", nothing to register or reap + orphanage.reap_orphans(&handle); + assert!(orphanage.sigchild.lock().is_none()); + assert_eq!(waits.get(), 0); + } +} diff --git a/third_party/rust/tokio/src/process/unix/reap.rs b/third_party/rust/tokio/src/process/unix/reap.rs new file mode 100644 index 0000000000..f7f4d3cc70 --- /dev/null +++ b/third_party/rust/tokio/src/process/unix/reap.rs @@ -0,0 +1,298 @@ +use crate::process::imp::orphan::{OrphanQueue, Wait}; +use crate::process::kill::Kill; +use crate::signal::unix::InternalStream; + +use std::future::Future; +use std::io; +use std::ops::Deref; +use std::pin::Pin; +use std::process::ExitStatus; +use std::task::Context; +use std::task::Poll; + +/// Orchestrates between registering interest for receiving signals when a +/// child process has exited, and attempting to poll for process completion. +#[derive(Debug)] +pub(crate) struct Reaper<W, Q, S> +where + W: Wait, + Q: OrphanQueue<W>, +{ + inner: Option<W>, + orphan_queue: Q, + signal: S, +} + +impl<W, Q, S> Deref for Reaper<W, Q, S> +where + W: Wait, + Q: OrphanQueue<W>, +{ + type Target = W; + + fn deref(&self) -> &Self::Target { + self.inner() + } +} + +impl<W, Q, S> Reaper<W, Q, S> +where + W: Wait, + Q: OrphanQueue<W>, +{ + pub(crate) fn new(inner: W, orphan_queue: Q, signal: S) -> Self { + Self { + inner: Some(inner), + orphan_queue, + signal, + } + } + + fn inner(&self) -> &W { + self.inner.as_ref().expect("inner has gone away") + } + + pub(crate) fn inner_mut(&mut self) -> &mut W { + self.inner.as_mut().expect("inner has gone away") + } +} + +impl<W, Q, S> Future for Reaper<W, Q, S> +where + W: Wait + Unpin, + Q: OrphanQueue<W> + Unpin, + S: InternalStream + Unpin, +{ + type Output = io::Result<ExitStatus>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + loop { + // If the child hasn't exited yet, then it's our responsibility to + // ensure the current task gets notified when it might be able to + // make progress. We can use the delivery of a SIGCHLD signal as a + // sign that we can potentially make progress. + // + // However, we will register for a notification on the next signal + // BEFORE we poll the child. Otherwise it is possible that the child + // can exit and the signal can arrive after we last polled the child, + // but before we've registered for a notification on the next signal + // (this can cause a deadlock if there are no more spawned children + // which can generate a different signal for us). A side effect of + // pre-registering for signal notifications is that when the child + // exits, we will have already registered for an additional + // notification we don't need to consume. If another signal arrives, + // this future's task will be notified/woken up again. Since the + // futures model allows for spurious wake ups this extra wakeup + // should not cause significant issues with parent futures. + let registered_interest = self.signal.poll_recv(cx).is_pending(); + + if let Some(status) = self.inner_mut().try_wait()? { + return Poll::Ready(Ok(status)); + } + + // If our attempt to poll for the next signal was not ready, then + // we've arranged for our task to get notified and we can bail out. + if registered_interest { + return Poll::Pending; + } else { + // Otherwise, if the signal stream delivered a signal to us, we + // won't get notified at the next signal, so we'll loop and try + // again. + continue; + } + } + } +} + +impl<W, Q, S> Kill for Reaper<W, Q, S> +where + W: Kill + Wait, + Q: OrphanQueue<W>, +{ + fn kill(&mut self) -> io::Result<()> { + self.inner_mut().kill() + } +} + +impl<W, Q, S> Drop for Reaper<W, Q, S> +where + W: Wait, + Q: OrphanQueue<W>, +{ + fn drop(&mut self) { + if let Ok(Some(_)) = self.inner_mut().try_wait() { + return; + } + + let orphan = self.inner.take().unwrap(); + self.orphan_queue.push_orphan(orphan); + } +} + +#[cfg(all(test, not(loom)))] +mod test { + use super::*; + + use crate::process::unix::orphan::test::MockQueue; + use futures::future::FutureExt; + use std::os::unix::process::ExitStatusExt; + use std::process::ExitStatus; + use std::task::Context; + use std::task::Poll; + + #[derive(Debug)] + struct MockWait { + total_kills: usize, + total_waits: usize, + num_wait_until_status: usize, + status: ExitStatus, + } + + impl MockWait { + fn new(status: ExitStatus, num_wait_until_status: usize) -> Self { + Self { + total_kills: 0, + total_waits: 0, + num_wait_until_status, + status, + } + } + } + + impl Wait for MockWait { + fn id(&self) -> u32 { + 0 + } + + fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> { + let ret = if self.num_wait_until_status == self.total_waits { + Some(self.status) + } else { + None + }; + + self.total_waits += 1; + Ok(ret) + } + } + + impl Kill for MockWait { + fn kill(&mut self) -> io::Result<()> { + self.total_kills += 1; + Ok(()) + } + } + + struct MockStream { + total_polls: usize, + values: Vec<Option<()>>, + } + + impl MockStream { + fn new(values: Vec<Option<()>>) -> Self { + Self { + total_polls: 0, + values, + } + } + } + + impl InternalStream for MockStream { + fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>> { + self.total_polls += 1; + match self.values.remove(0) { + Some(()) => Poll::Ready(Some(())), + None => Poll::Pending, + } + } + } + + #[test] + fn reaper() { + let exit = ExitStatus::from_raw(0); + let mock = MockWait::new(exit, 3); + let mut grim = Reaper::new( + mock, + MockQueue::new(), + MockStream::new(vec![None, Some(()), None, None, None]), + ); + + let waker = futures::task::noop_waker(); + let mut context = Context::from_waker(&waker); + + // Not yet exited, interest registered + assert!(grim.poll_unpin(&mut context).is_pending()); + assert_eq!(1, grim.signal.total_polls); + assert_eq!(1, grim.total_waits); + assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); + + // Not yet exited, couldn't register interest the first time + // but managed to register interest the second time around + assert!(grim.poll_unpin(&mut context).is_pending()); + assert_eq!(3, grim.signal.total_polls); + assert_eq!(3, grim.total_waits); + assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); + + // Exited + if let Poll::Ready(r) = grim.poll_unpin(&mut context) { + assert!(r.is_ok()); + let exit_code = r.unwrap(); + assert_eq!(exit_code, exit); + } else { + unreachable!(); + } + assert_eq!(4, grim.signal.total_polls); + assert_eq!(4, grim.total_waits); + assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); + } + + #[test] + fn kill() { + let exit = ExitStatus::from_raw(0); + let mut grim = Reaper::new( + MockWait::new(exit, 0), + MockQueue::new(), + MockStream::new(vec![None]), + ); + + grim.kill().unwrap(); + assert_eq!(1, grim.total_kills); + assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); + } + + #[test] + fn drop_reaps_if_possible() { + let exit = ExitStatus::from_raw(0); + let mut mock = MockWait::new(exit, 0); + + { + let queue = MockQueue::new(); + + let grim = Reaper::new(&mut mock, &queue, MockStream::new(vec![])); + + drop(grim); + + assert!(queue.all_enqueued.borrow().is_empty()); + } + + assert_eq!(1, mock.total_waits); + assert_eq!(0, mock.total_kills); + } + + #[test] + fn drop_enqueues_orphan_if_wait_fails() { + let exit = ExitStatus::from_raw(0); + let mut mock = MockWait::new(exit, 2); + + { + let queue = MockQueue::<&mut MockWait>::new(); + let grim = Reaper::new(&mut mock, &queue, MockStream::new(vec![])); + drop(grim); + + assert_eq!(1, queue.all_enqueued.borrow().len()); + } + + assert_eq!(1, mock.total_waits); + assert_eq!(0, mock.total_kills); + } +} |