summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio/src/process
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio/src/process')
-rw-r--r--third_party/rust/tokio/src/process/kill.rs13
-rw-r--r--third_party/rust/tokio/src/process/mod.rs1534
-rw-r--r--third_party/rust/tokio/src/process/unix/driver.rs58
-rw-r--r--third_party/rust/tokio/src/process/unix/mod.rs250
-rw-r--r--third_party/rust/tokio/src/process/unix/orphan.rs321
-rw-r--r--third_party/rust/tokio/src/process/unix/reap.rs298
-rw-r--r--third_party/rust/tokio/src/process/windows.rs205
7 files changed, 2679 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/process/kill.rs b/third_party/rust/tokio/src/process/kill.rs
new file mode 100644
index 0000000000..a1f1652281
--- /dev/null
+++ b/third_party/rust/tokio/src/process/kill.rs
@@ -0,0 +1,13 @@
+use std::io;
+
+/// An interface for killing a running process.
+pub(crate) trait Kill {
+ /// Forcefully kills the process.
+ fn kill(&mut self) -> io::Result<()>;
+}
+
+impl<T: Kill> Kill for &mut T {
+ fn kill(&mut self) -> io::Result<()> {
+ (**self).kill()
+ }
+}
diff --git a/third_party/rust/tokio/src/process/mod.rs b/third_party/rust/tokio/src/process/mod.rs
new file mode 100644
index 0000000000..4e1a21dd44
--- /dev/null
+++ b/third_party/rust/tokio/src/process/mod.rs
@@ -0,0 +1,1534 @@
+//! An implementation of asynchronous process management for Tokio.
+//!
+//! This module provides a [`Command`] struct that imitates the interface of the
+//! [`std::process::Command`] type in the standard library, but provides asynchronous versions of
+//! functions that create processes. These functions (`spawn`, `status`, `output` and their
+//! variants) return "future aware" types that interoperate with Tokio. The asynchronous process
+//! support is provided through signal handling on Unix and system APIs on Windows.
+//!
+//! [`std::process::Command`]: std::process::Command
+//!
+//! # Examples
+//!
+//! Here's an example program which will spawn `echo hello world` and then wait
+//! for it complete.
+//!
+//! ```no_run
+//! use tokio::process::Command;
+//!
+//! #[tokio::main]
+//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! // The usage is similar as with the standard library's `Command` type
+//! let mut child = Command::new("echo")
+//! .arg("hello")
+//! .arg("world")
+//! .spawn()
+//! .expect("failed to spawn");
+//!
+//! // Await until the command completes
+//! let status = child.wait().await?;
+//! println!("the command exited with: {}", status);
+//! Ok(())
+//! }
+//! ```
+//!
+//! Next, let's take a look at an example where we not only spawn `echo hello
+//! world` but we also capture its output.
+//!
+//! ```no_run
+//! use tokio::process::Command;
+//!
+//! #[tokio::main]
+//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! // Like above, but use `output` which returns a future instead of
+//! // immediately returning the `Child`.
+//! let output = Command::new("echo").arg("hello").arg("world")
+//! .output();
+//!
+//! let output = output.await?;
+//!
+//! assert!(output.status.success());
+//! assert_eq!(output.stdout, b"hello world\n");
+//! Ok(())
+//! }
+//! ```
+//!
+//! We can also read input line by line.
+//!
+//! ```no_run
+//! use tokio::io::{BufReader, AsyncBufReadExt};
+//! use tokio::process::Command;
+//!
+//! use std::process::Stdio;
+//!
+//! #[tokio::main]
+//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! let mut cmd = Command::new("cat");
+//!
+//! // Specify that we want the command's standard output piped back to us.
+//! // By default, standard input/output/error will be inherited from the
+//! // current process (for example, this means that standard input will
+//! // come from the keyboard and standard output/error will go directly to
+//! // the terminal if this process is invoked from the command line).
+//! cmd.stdout(Stdio::piped());
+//!
+//! let mut child = cmd.spawn()
+//! .expect("failed to spawn command");
+//!
+//! let stdout = child.stdout.take()
+//! .expect("child did not have a handle to stdout");
+//!
+//! let mut reader = BufReader::new(stdout).lines();
+//!
+//! // Ensure the child process is spawned in the runtime so it can
+//! // make progress on its own while we await for any output.
+//! tokio::spawn(async move {
+//! let status = child.wait().await
+//! .expect("child process encountered an error");
+//!
+//! println!("child status was: {}", status);
+//! });
+//!
+//! while let Some(line) = reader.next_line().await? {
+//! println!("Line: {}", line);
+//! }
+//!
+//! Ok(())
+//! }
+//! ```
+//!
+//! Here is another example using `sort` writing into the child process
+//! standard input, capturing the output of the sorted text.
+//!
+//! ```no_run
+//! use tokio::io::AsyncWriteExt;
+//! use tokio::process::Command;
+//!
+//! use std::process::Stdio;
+//!
+//! #[tokio::main]
+//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! let mut cmd = Command::new("sort");
+//!
+//! // Specifying that we want pipe both the output and the input.
+//! // Similarily to capturing the output, by configuring the pipe
+//! // to stdin it can now be used as an asynchronous writer.
+//! cmd.stdout(Stdio::piped());
+//! cmd.stdin(Stdio::piped());
+//!
+//! let mut child = cmd.spawn().expect("failed to spawn command");
+//!
+//! // These are the animals we want to sort
+//! let animals: &[&str] = &["dog", "bird", "frog", "cat", "fish"];
+//!
+//! let mut stdin = child
+//! .stdin
+//! .take()
+//! .expect("child did not have a handle to stdin");
+//!
+//! // Write our animals to the child process
+//! // Note that the behavior of `sort` is to buffer _all input_ before writing any output.
+//! // In the general sense, it is recommended to write to the child in a separate task as
+//! // awaiting its exit (or output) to avoid deadlocks (for example, the child tries to write
+//! // some output but gets stuck waiting on the parent to read from it, meanwhile the parent
+//! // is stuck waiting to write its input completely before reading the output).
+//! stdin
+//! .write(animals.join("\n").as_bytes())
+//! .await
+//! .expect("could not write to stdin");
+//!
+//! // We drop the handle here which signals EOF to the child process.
+//! // This tells the child process that it there is no more data on the pipe.
+//! drop(stdin);
+//!
+//! let op = child.wait_with_output().await?;
+//!
+//! // Results should come back in sorted order
+//! assert_eq!(op.stdout, "bird\ncat\ndog\nfish\nfrog\n".as_bytes());
+//!
+//! Ok(())
+//! }
+//! ```
+//!
+//! With some coordination, we can also pipe the output of one command into
+//! another.
+//!
+//! ```no_run
+//! use tokio::join;
+//! use tokio::process::Command;
+//! use std::convert::TryInto;
+//! use std::process::Stdio;
+//!
+//! #[tokio::main]
+//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! let mut echo = Command::new("echo")
+//! .arg("hello world!")
+//! .stdout(Stdio::piped())
+//! .spawn()
+//! .expect("failed to spawn echo");
+//!
+//! let tr_stdin: Stdio = echo
+//! .stdout
+//! .take()
+//! .unwrap()
+//! .try_into()
+//! .expect("failed to convert to Stdio");
+//!
+//! let tr = Command::new("tr")
+//! .arg("a-z")
+//! .arg("A-Z")
+//! .stdin(tr_stdin)
+//! .stdout(Stdio::piped())
+//! .spawn()
+//! .expect("failed to spawn tr");
+//!
+//! let (echo_result, tr_output) = join!(echo.wait(), tr.wait_with_output());
+//!
+//! assert!(echo_result.unwrap().success());
+//!
+//! let tr_output = tr_output.expect("failed to await tr");
+//! assert!(tr_output.status.success());
+//!
+//! assert_eq!(tr_output.stdout, b"HELLO WORLD!\n");
+//!
+//! Ok(())
+//! }
+//! ```
+//!
+//! # Caveats
+//!
+//! ## Dropping/Cancellation
+//!
+//! Similar to the behavior to the standard library, and unlike the futures
+//! paradigm of dropping-implies-cancellation, a spawned process will, by
+//! default, continue to execute even after the `Child` handle has been dropped.
+//!
+//! The [`Command::kill_on_drop`] method can be used to modify this behavior
+//! and kill the child process if the `Child` wrapper is dropped before it
+//! has exited.
+//!
+//! ## Unix Processes
+//!
+//! On Unix platforms processes must be "reaped" by their parent process after
+//! they have exited in order to release all OS resources. A child process which
+//! has exited, but has not yet been reaped by its parent is considered a "zombie"
+//! process. Such processes continue to count against limits imposed by the system,
+//! and having too many zombie processes present can prevent additional processes
+//! from being spawned.
+//!
+//! The tokio runtime will, on a best-effort basis, attempt to reap and clean up
+//! any process which it has spawned. No additional guarantees are made with regards
+//! how quickly or how often this procedure will take place.
+//!
+//! It is recommended to avoid dropping a [`Child`] process handle before it has been
+//! fully `await`ed if stricter cleanup guarantees are required.
+//!
+//! [`Command`]: crate::process::Command
+//! [`Command::kill_on_drop`]: crate::process::Command::kill_on_drop
+//! [`Child`]: crate::process::Child
+
+#[path = "unix/mod.rs"]
+#[cfg(unix)]
+mod imp;
+
+#[cfg(unix)]
+pub(crate) mod unix {
+ pub(crate) use super::imp::*;
+}
+
+#[path = "windows.rs"]
+#[cfg(windows)]
+mod imp;
+
+mod kill;
+
+use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
+use crate::process::kill::Kill;
+
+use std::convert::TryInto;
+use std::ffi::OsStr;
+use std::future::Future;
+use std::io;
+#[cfg(unix)]
+use std::os::unix::process::CommandExt;
+#[cfg(windows)]
+use std::os::windows::io::{AsRawHandle, RawHandle};
+#[cfg(windows)]
+use std::os::windows::process::CommandExt;
+use std::path::Path;
+use std::pin::Pin;
+use std::process::{Command as StdCommand, ExitStatus, Output, Stdio};
+use std::task::Context;
+use std::task::Poll;
+
+/// This structure mimics the API of [`std::process::Command`] found in the standard library, but
+/// replaces functions that create a process with an asynchronous variant. The main provided
+/// asynchronous functions are [spawn](Command::spawn), [status](Command::status), and
+/// [output](Command::output).
+///
+/// `Command` uses asynchronous versions of some `std` types (for example [`Child`]).
+///
+/// [`std::process::Command`]: std::process::Command
+/// [`Child`]: struct@Child
+#[derive(Debug)]
+pub struct Command {
+ std: StdCommand,
+ kill_on_drop: bool,
+}
+
+pub(crate) struct SpawnedChild {
+ child: imp::Child,
+ stdin: Option<imp::ChildStdio>,
+ stdout: Option<imp::ChildStdio>,
+ stderr: Option<imp::ChildStdio>,
+}
+
+impl Command {
+ /// Constructs a new `Command` for launching the program at
+ /// path `program`, with the following default configuration:
+ ///
+ /// * No arguments to the program
+ /// * Inherit the current process's environment
+ /// * Inherit the current process's working directory
+ /// * Inherit stdin/stdout/stderr for `spawn` or `status`, but create pipes for `output`
+ ///
+ /// Builder methods are provided to change these defaults and
+ /// otherwise configure the process.
+ ///
+ /// If `program` is not an absolute path, the `PATH` will be searched in
+ /// an OS-defined way.
+ ///
+ /// The search path to be used may be controlled by setting the
+ /// `PATH` environment variable on the Command,
+ /// but this has some implementation limitations on Windows
+ /// (see issue [rust-lang/rust#37519]).
+ ///
+ /// # Examples
+ ///
+ /// Basic usage:
+ ///
+ /// ```no_run
+ /// use tokio::process::Command;
+ /// let command = Command::new("sh");
+ /// ```
+ ///
+ /// [rust-lang/rust#37519]: https://github.com/rust-lang/rust/issues/37519
+ pub fn new<S: AsRef<OsStr>>(program: S) -> Command {
+ Self::from(StdCommand::new(program))
+ }
+
+ /// Cheaply convert to a `&std::process::Command` for places where the type from the standard
+ /// library is expected.
+ pub fn as_std(&self) -> &StdCommand {
+ &self.std
+ }
+
+ /// Adds an argument to pass to the program.
+ ///
+ /// Only one argument can be passed per use. So instead of:
+ ///
+ /// ```no_run
+ /// tokio::process::Command::new("sh")
+ /// .arg("-C /path/to/repo");
+ /// ```
+ ///
+ /// usage would be:
+ ///
+ /// ```no_run
+ /// tokio::process::Command::new("sh")
+ /// .arg("-C")
+ /// .arg("/path/to/repo");
+ /// ```
+ ///
+ /// To pass multiple arguments see [`args`].
+ ///
+ /// [`args`]: method@Self::args
+ ///
+ /// # Examples
+ ///
+ /// Basic usage:
+ ///
+ /// ```no_run
+ /// use tokio::process::Command;
+ ///
+ /// let command = Command::new("ls")
+ /// .arg("-l")
+ /// .arg("-a");
+ /// ```
+ pub fn arg<S: AsRef<OsStr>>(&mut self, arg: S) -> &mut Command {
+ self.std.arg(arg);
+ self
+ }
+
+ /// Adds multiple arguments to pass to the program.
+ ///
+ /// To pass a single argument see [`arg`].
+ ///
+ /// [`arg`]: method@Self::arg
+ ///
+ /// # Examples
+ ///
+ /// Basic usage:
+ ///
+ /// ```no_run
+ /// use tokio::process::Command;
+ ///
+ /// let command = Command::new("ls")
+ /// .args(&["-l", "-a"]);
+ /// ```
+ pub fn args<I, S>(&mut self, args: I) -> &mut Command
+ where
+ I: IntoIterator<Item = S>,
+ S: AsRef<OsStr>,
+ {
+ self.std.args(args);
+ self
+ }
+
+ /// Inserts or updates an environment variable mapping.
+ ///
+ /// Note that environment variable names are case-insensitive (but case-preserving) on Windows,
+ /// and case-sensitive on all other platforms.
+ ///
+ /// # Examples
+ ///
+ /// Basic usage:
+ ///
+ /// ```no_run
+ /// use tokio::process::Command;
+ ///
+ /// let command = Command::new("ls")
+ /// .env("PATH", "/bin");
+ /// ```
+ pub fn env<K, V>(&mut self, key: K, val: V) -> &mut Command
+ where
+ K: AsRef<OsStr>,
+ V: AsRef<OsStr>,
+ {
+ self.std.env(key, val);
+ self
+ }
+
+ /// Adds or updates multiple environment variable mappings.
+ ///
+ /// # Examples
+ ///
+ /// Basic usage:
+ ///
+ /// ```no_run
+ /// use tokio::process::Command;
+ /// use std::process::{Stdio};
+ /// use std::env;
+ /// use std::collections::HashMap;
+ ///
+ /// let filtered_env : HashMap<String, String> =
+ /// env::vars().filter(|&(ref k, _)|
+ /// k == "TERM" || k == "TZ" || k == "LANG" || k == "PATH"
+ /// ).collect();
+ ///
+ /// let command = Command::new("printenv")
+ /// .stdin(Stdio::null())
+ /// .stdout(Stdio::inherit())
+ /// .env_clear()
+ /// .envs(&filtered_env);
+ /// ```
+ pub fn envs<I, K, V>(&mut self, vars: I) -> &mut Command
+ where
+ I: IntoIterator<Item = (K, V)>,
+ K: AsRef<OsStr>,
+ V: AsRef<OsStr>,
+ {
+ self.std.envs(vars);
+ self
+ }
+
+ /// Removes an environment variable mapping.
+ ///
+ /// # Examples
+ ///
+ /// Basic usage:
+ ///
+ /// ```no_run
+ /// use tokio::process::Command;
+ ///
+ /// let command = Command::new("ls")
+ /// .env_remove("PATH");
+ /// ```
+ pub fn env_remove<K: AsRef<OsStr>>(&mut self, key: K) -> &mut Command {
+ self.std.env_remove(key);
+ self
+ }
+
+ /// Clears the entire environment map for the child process.
+ ///
+ /// # Examples
+ ///
+ /// Basic usage:
+ ///
+ /// ```no_run
+ /// use tokio::process::Command;
+ ///
+ /// let command = Command::new("ls")
+ /// .env_clear();
+ /// ```
+ pub fn env_clear(&mut self) -> &mut Command {
+ self.std.env_clear();
+ self
+ }
+
+ /// Sets the working directory for the child process.
+ ///
+ /// # Platform-specific behavior
+ ///
+ /// If the program path is relative (e.g., `"./script.sh"`), it's ambiguous
+ /// whether it should be interpreted relative to the parent's working
+ /// directory or relative to `current_dir`. The behavior in this case is
+ /// platform specific and unstable, and it's recommended to use
+ /// [`canonicalize`] to get an absolute program path instead.
+ ///
+ /// [`canonicalize`]: crate::fs::canonicalize()
+ ///
+ /// # Examples
+ ///
+ /// Basic usage:
+ ///
+ /// ```no_run
+ /// use tokio::process::Command;
+ ///
+ /// let command = Command::new("ls")
+ /// .current_dir("/bin");
+ /// ```
+ pub fn current_dir<P: AsRef<Path>>(&mut self, dir: P) -> &mut Command {
+ self.std.current_dir(dir);
+ self
+ }
+
+ /// Sets configuration for the child process's standard input (stdin) handle.
+ ///
+ /// Defaults to [`inherit`] when used with `spawn` or `status`, and
+ /// defaults to [`piped`] when used with `output`.
+ ///
+ /// [`inherit`]: std::process::Stdio::inherit
+ /// [`piped`]: std::process::Stdio::piped
+ ///
+ /// # Examples
+ ///
+ /// Basic usage:
+ ///
+ /// ```no_run
+ /// use std::process::{Stdio};
+ /// use tokio::process::Command;
+ ///
+ /// let command = Command::new("ls")
+ /// .stdin(Stdio::null());
+ /// ```
+ pub fn stdin<T: Into<Stdio>>(&mut self, cfg: T) -> &mut Command {
+ self.std.stdin(cfg);
+ self
+ }
+
+ /// Sets configuration for the child process's standard output (stdout) handle.
+ ///
+ /// Defaults to [`inherit`] when used with `spawn` or `status`, and
+ /// defaults to [`piped`] when used with `output`.
+ ///
+ /// [`inherit`]: std::process::Stdio::inherit
+ /// [`piped`]: std::process::Stdio::piped
+ ///
+ /// # Examples
+ ///
+ /// Basic usage:
+ ///
+ /// ```no_run
+ /// use tokio::process::Command;
+ /// use std::process::Stdio;
+ ///
+ /// let command = Command::new("ls")
+ /// .stdout(Stdio::null());
+ /// ```
+ pub fn stdout<T: Into<Stdio>>(&mut self, cfg: T) -> &mut Command {
+ self.std.stdout(cfg);
+ self
+ }
+
+ /// Sets configuration for the child process's standard error (stderr) handle.
+ ///
+ /// Defaults to [`inherit`] when used with `spawn` or `status`, and
+ /// defaults to [`piped`] when used with `output`.
+ ///
+ /// [`inherit`]: std::process::Stdio::inherit
+ /// [`piped`]: std::process::Stdio::piped
+ ///
+ /// # Examples
+ ///
+ /// Basic usage:
+ ///
+ /// ```no_run
+ /// use tokio::process::Command;
+ /// use std::process::{Stdio};
+ ///
+ /// let command = Command::new("ls")
+ /// .stderr(Stdio::null());
+ /// ```
+ pub fn stderr<T: Into<Stdio>>(&mut self, cfg: T) -> &mut Command {
+ self.std.stderr(cfg);
+ self
+ }
+
+ /// Controls whether a `kill` operation should be invoked on a spawned child
+ /// process when its corresponding `Child` handle is dropped.
+ ///
+ /// By default, this value is assumed to be `false`, meaning the next spawned
+ /// process will not be killed on drop, similar to the behavior of the standard
+ /// library.
+ ///
+ /// # Caveats
+ ///
+ /// On Unix platforms processes must be "reaped" by their parent process after
+ /// they have exited in order to release all OS resources. A child process which
+ /// has exited, but has not yet been reaped by its parent is considered a "zombie"
+ /// process. Such processes continue to count against limits imposed by the system,
+ /// and having too many zombie processes present can prevent additional processes
+ /// from being spawned.
+ ///
+ /// Although issuing a `kill` signal to the child process is a synchronous
+ /// operation, the resulting zombie process cannot be `.await`ed inside of the
+ /// destructor to avoid blocking other tasks. The tokio runtime will, on a
+ /// best-effort basis, attempt to reap and clean up such processes in the
+ /// background, but makes no additional guarantees are made with regards
+ /// how quickly or how often this procedure will take place.
+ ///
+ /// If stronger guarantees are required, it is recommended to avoid dropping
+ /// a [`Child`] handle where possible, and instead utilize `child.wait().await`
+ /// or `child.kill().await` where possible.
+ pub fn kill_on_drop(&mut self, kill_on_drop: bool) -> &mut Command {
+ self.kill_on_drop = kill_on_drop;
+ self
+ }
+
+ /// Sets the [process creation flags][1] to be passed to `CreateProcess`.
+ ///
+ /// These will always be ORed with `CREATE_UNICODE_ENVIRONMENT`.
+ ///
+ /// [1]: https://msdn.microsoft.com/en-us/library/windows/desktop/ms684863(v=vs.85).aspx
+ #[cfg(windows)]
+ #[cfg_attr(docsrs, doc(cfg(windows)))]
+ pub fn creation_flags(&mut self, flags: u32) -> &mut Command {
+ self.std.creation_flags(flags);
+ self
+ }
+
+ /// Sets the child process's user ID. This translates to a
+ /// `setuid` call in the child process. Failure in the `setuid`
+ /// call will cause the spawn to fail.
+ #[cfg(unix)]
+ #[cfg_attr(docsrs, doc(cfg(unix)))]
+ pub fn uid(&mut self, id: u32) -> &mut Command {
+ self.std.uid(id);
+ self
+ }
+
+ /// Similar to `uid` but sets the group ID of the child process. This has
+ /// the same semantics as the `uid` field.
+ #[cfg(unix)]
+ #[cfg_attr(docsrs, doc(cfg(unix)))]
+ pub fn gid(&mut self, id: u32) -> &mut Command {
+ self.std.gid(id);
+ self
+ }
+
+ /// Sets executable argument.
+ ///
+ /// Set the first process argument, `argv[0]`, to something other than the
+ /// default executable path.
+ #[cfg(unix)]
+ #[cfg_attr(docsrs, doc(cfg(unix)))]
+ pub fn arg0<S>(&mut self, arg: S) -> &mut Command
+ where
+ S: AsRef<OsStr>,
+ {
+ self.std.arg0(arg);
+ self
+ }
+
+ /// Schedules a closure to be run just before the `exec` function is
+ /// invoked.
+ ///
+ /// The closure is allowed to return an I/O error whose OS error code will
+ /// be communicated back to the parent and returned as an error from when
+ /// the spawn was requested.
+ ///
+ /// Multiple closures can be registered and they will be called in order of
+ /// their registration. If a closure returns `Err` then no further closures
+ /// will be called and the spawn operation will immediately return with a
+ /// failure.
+ ///
+ /// # Safety
+ ///
+ /// This closure will be run in the context of the child process after a
+ /// `fork`. This primarily means that any modifications made to memory on
+ /// behalf of this closure will **not** be visible to the parent process.
+ /// This is often a very constrained environment where normal operations
+ /// like `malloc` or acquiring a mutex are not guaranteed to work (due to
+ /// other threads perhaps still running when the `fork` was run).
+ ///
+ /// This also means that all resources such as file descriptors and
+ /// memory-mapped regions got duplicated. It is your responsibility to make
+ /// sure that the closure does not violate library invariants by making
+ /// invalid use of these duplicates.
+ ///
+ /// When this closure is run, aspects such as the stdio file descriptors and
+ /// working directory have successfully been changed, so output to these
+ /// locations may not appear where intended.
+ #[cfg(unix)]
+ #[cfg_attr(docsrs, doc(cfg(unix)))]
+ pub unsafe fn pre_exec<F>(&mut self, f: F) -> &mut Command
+ where
+ F: FnMut() -> io::Result<()> + Send + Sync + 'static,
+ {
+ self.std.pre_exec(f);
+ self
+ }
+
+ /// Executes the command as a child process, returning a handle to it.
+ ///
+ /// By default, stdin, stdout and stderr are inherited from the parent.
+ ///
+ /// This method will spawn the child process synchronously and return a
+ /// handle to a future-aware child process. The `Child` returned implements
+ /// `Future` itself to acquire the `ExitStatus` of the child, and otherwise
+ /// the `Child` has methods to acquire handles to the stdin, stdout, and
+ /// stderr streams.
+ ///
+ /// All I/O this child does will be associated with the current default
+ /// event loop.
+ ///
+ /// # Examples
+ ///
+ /// Basic usage:
+ ///
+ /// ```no_run
+ /// use tokio::process::Command;
+ ///
+ /// async fn run_ls() -> std::process::ExitStatus {
+ /// Command::new("ls")
+ /// .spawn()
+ /// .expect("ls command failed to start")
+ /// .wait()
+ /// .await
+ /// .expect("ls command failed to run")
+ /// }
+ /// ```
+ ///
+ /// # Caveats
+ ///
+ /// ## Dropping/Cancellation
+ ///
+ /// Similar to the behavior to the standard library, and unlike the futures
+ /// paradigm of dropping-implies-cancellation, a spawned process will, by
+ /// default, continue to execute even after the `Child` handle has been dropped.
+ ///
+ /// The [`Command::kill_on_drop`] method can be used to modify this behavior
+ /// and kill the child process if the `Child` wrapper is dropped before it
+ /// has exited.
+ ///
+ /// ## Unix Processes
+ ///
+ /// On Unix platforms processes must be "reaped" by their parent process after
+ /// they have exited in order to release all OS resources. A child process which
+ /// has exited, but has not yet been reaped by its parent is considered a "zombie"
+ /// process. Such processes continue to count against limits imposed by the system,
+ /// and having too many zombie processes present can prevent additional processes
+ /// from being spawned.
+ ///
+ /// The tokio runtime will, on a best-effort basis, attempt to reap and clean up
+ /// any process which it has spawned. No additional guarantees are made with regards
+ /// how quickly or how often this procedure will take place.
+ ///
+ /// It is recommended to avoid dropping a [`Child`] process handle before it has been
+ /// fully `await`ed if stricter cleanup guarantees are required.
+ ///
+ /// [`Command`]: crate::process::Command
+ /// [`Command::kill_on_drop`]: crate::process::Command::kill_on_drop
+ /// [`Child`]: crate::process::Child
+ ///
+ /// # Errors
+ ///
+ /// On Unix platforms this method will fail with `std::io::ErrorKind::WouldBlock`
+ /// if the system process limit is reached (which includes other applications
+ /// running on the system).
+ pub fn spawn(&mut self) -> io::Result<Child> {
+ imp::spawn_child(&mut self.std).map(|spawned_child| Child {
+ child: FusedChild::Child(ChildDropGuard {
+ inner: spawned_child.child,
+ kill_on_drop: self.kill_on_drop,
+ }),
+ stdin: spawned_child.stdin.map(|inner| ChildStdin { inner }),
+ stdout: spawned_child.stdout.map(|inner| ChildStdout { inner }),
+ stderr: spawned_child.stderr.map(|inner| ChildStderr { inner }),
+ })
+ }
+
+ /// Executes the command as a child process, waiting for it to finish and
+ /// collecting its exit status.
+ ///
+ /// By default, stdin, stdout and stderr are inherited from the parent.
+ /// If any input/output handles are set to a pipe then they will be immediately
+ /// closed after the child is spawned.
+ ///
+ /// All I/O this child does will be associated with the current default
+ /// event loop.
+ ///
+ /// The destructor of the future returned by this function will kill
+ /// the child if [`kill_on_drop`] is set to true.
+ ///
+ /// [`kill_on_drop`]: fn@Self::kill_on_drop
+ ///
+ /// # Errors
+ ///
+ /// This future will return an error if the child process cannot be spawned
+ /// or if there is an error while awaiting its status.
+ ///
+ /// On Unix platforms this method will fail with `std::io::ErrorKind::WouldBlock`
+ /// if the system process limit is reached (which includes other applications
+ /// running on the system).
+ ///
+ /// # Examples
+ ///
+ /// Basic usage:
+ ///
+ /// ```no_run
+ /// use tokio::process::Command;
+ ///
+ /// async fn run_ls() -> std::process::ExitStatus {
+ /// Command::new("ls")
+ /// .status()
+ /// .await
+ /// .expect("ls command failed to run")
+ /// }
+ /// ```
+ pub fn status(&mut self) -> impl Future<Output = io::Result<ExitStatus>> {
+ let child = self.spawn();
+
+ async {
+ let mut child = child?;
+
+ // Ensure we close any stdio handles so we can't deadlock
+ // waiting on the child which may be waiting to read/write
+ // to a pipe we're holding.
+ child.stdin.take();
+ child.stdout.take();
+ child.stderr.take();
+
+ child.wait().await
+ }
+ }
+
+ /// Executes the command as a child process, waiting for it to finish and
+ /// collecting all of its output.
+ ///
+ /// > **Note**: this method, unlike the standard library, will
+ /// > unconditionally configure the stdout/stderr handles to be pipes, even
+ /// > if they have been previously configured. If this is not desired then
+ /// > the `spawn` method should be used in combination with the
+ /// > `wait_with_output` method on child.
+ ///
+ /// This method will return a future representing the collection of the
+ /// child process's stdout/stderr. It will resolve to
+ /// the `Output` type in the standard library, containing `stdout` and
+ /// `stderr` as `Vec<u8>` along with an `ExitStatus` representing how the
+ /// process exited.
+ ///
+ /// All I/O this child does will be associated with the current default
+ /// event loop.
+ ///
+ /// The destructor of the future returned by this function will kill
+ /// the child if [`kill_on_drop`] is set to true.
+ ///
+ /// [`kill_on_drop`]: fn@Self::kill_on_drop
+ ///
+ /// # Errors
+ ///
+ /// This future will return an error if the child process cannot be spawned
+ /// or if there is an error while awaiting its status.
+ ///
+ /// On Unix platforms this method will fail with `std::io::ErrorKind::WouldBlock`
+ /// if the system process limit is reached (which includes other applications
+ /// running on the system).
+ /// # Examples
+ ///
+ /// Basic usage:
+ ///
+ /// ```no_run
+ /// use tokio::process::Command;
+ ///
+ /// async fn run_ls() {
+ /// let output: std::process::Output = Command::new("ls")
+ /// .output()
+ /// .await
+ /// .expect("ls command failed to run");
+ /// println!("stderr of ls: {:?}", output.stderr);
+ /// }
+ /// ```
+ pub fn output(&mut self) -> impl Future<Output = io::Result<Output>> {
+ self.std.stdout(Stdio::piped());
+ self.std.stderr(Stdio::piped());
+
+ let child = self.spawn();
+
+ async { child?.wait_with_output().await }
+ }
+}
+
+impl From<StdCommand> for Command {
+ fn from(std: StdCommand) -> Command {
+ Command {
+ std,
+ kill_on_drop: false,
+ }
+ }
+}
+
+/// A drop guard which can ensure the child process is killed on drop if specified.
+#[derive(Debug)]
+struct ChildDropGuard<T: Kill> {
+ inner: T,
+ kill_on_drop: bool,
+}
+
+impl<T: Kill> Kill for ChildDropGuard<T> {
+ fn kill(&mut self) -> io::Result<()> {
+ let ret = self.inner.kill();
+
+ if ret.is_ok() {
+ self.kill_on_drop = false;
+ }
+
+ ret
+ }
+}
+
+impl<T: Kill> Drop for ChildDropGuard<T> {
+ fn drop(&mut self) {
+ if self.kill_on_drop {
+ drop(self.kill());
+ }
+ }
+}
+
+impl<T, E, F> Future for ChildDropGuard<F>
+where
+ F: Future<Output = Result<T, E>> + Kill + Unpin,
+{
+ type Output = Result<T, E>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ // Keep track of task budget
+ let coop = ready!(crate::coop::poll_proceed(cx));
+
+ let ret = Pin::new(&mut self.inner).poll(cx);
+
+ if let Poll::Ready(Ok(_)) = ret {
+ // Avoid the overhead of trying to kill a reaped process
+ self.kill_on_drop = false;
+ }
+
+ if ret.is_ready() {
+ coop.made_progress();
+ }
+
+ ret
+ }
+}
+
+/// Keeps track of the exit status of a child process without worrying about
+/// polling the underlying futures even after they have completed.
+#[derive(Debug)]
+enum FusedChild {
+ Child(ChildDropGuard<imp::Child>),
+ Done(ExitStatus),
+}
+
+/// Representation of a child process spawned onto an event loop.
+///
+/// # Caveats
+/// Similar to the behavior to the standard library, and unlike the futures
+/// paradigm of dropping-implies-cancellation, a spawned process will, by
+/// default, continue to execute even after the `Child` handle has been dropped.
+///
+/// The `Command::kill_on_drop` method can be used to modify this behavior
+/// and kill the child process if the `Child` wrapper is dropped before it
+/// has exited.
+#[derive(Debug)]
+pub struct Child {
+ child: FusedChild,
+
+ /// The handle for writing to the child's standard input (stdin), if it has
+ /// been captured. To avoid partially moving the `child` and thus blocking
+ /// yourself from calling functions on `child` while using `stdin`, you might
+ /// find it helpful to do:
+ ///
+ /// ```no_run
+ /// # let mut child = tokio::process::Command::new("echo").spawn().unwrap();
+ /// let stdin = child.stdin.take().unwrap();
+ /// ```
+ pub stdin: Option<ChildStdin>,
+
+ /// The handle for reading from the child's standard output (stdout), if it
+ /// has been captured. You might find it helpful to do
+ ///
+ /// ```no_run
+ /// # let mut child = tokio::process::Command::new("echo").spawn().unwrap();
+ /// let stdout = child.stdout.take().unwrap();
+ /// ```
+ ///
+ /// to avoid partially moving the `child` and thus blocking yourself from calling
+ /// functions on `child` while using `stdout`.
+ pub stdout: Option<ChildStdout>,
+
+ /// The handle for reading from the child's standard error (stderr), if it
+ /// has been captured. You might find it helpful to do
+ ///
+ /// ```no_run
+ /// # let mut child = tokio::process::Command::new("echo").spawn().unwrap();
+ /// let stderr = child.stderr.take().unwrap();
+ /// ```
+ ///
+ /// to avoid partially moving the `child` and thus blocking yourself from calling
+ /// functions on `child` while using `stderr`.
+ pub stderr: Option<ChildStderr>,
+}
+
+impl Child {
+ /// Returns the OS-assigned process identifier associated with this child
+ /// while it is still running.
+ ///
+ /// Once the child has been polled to completion this will return `None`.
+ /// This is done to avoid confusion on platforms like Unix where the OS
+ /// identifier could be reused once the process has completed.
+ pub fn id(&self) -> Option<u32> {
+ match &self.child {
+ FusedChild::Child(child) => Some(child.inner.id()),
+ FusedChild::Done(_) => None,
+ }
+ }
+
+ /// Extracts the raw handle of the process associated with this child while
+ /// it is still running. Returns `None` if the child has exited.
+ #[cfg(windows)]
+ pub fn raw_handle(&self) -> Option<RawHandle> {
+ match &self.child {
+ FusedChild::Child(c) => Some(c.inner.as_raw_handle()),
+ FusedChild::Done(_) => None,
+ }
+ }
+
+ /// Attempts to force the child to exit, but does not wait for the request
+ /// to take effect.
+ ///
+ /// On Unix platforms, this is the equivalent to sending a SIGKILL. Note
+ /// that on Unix platforms it is possible for a zombie process to remain
+ /// after a kill is sent; to avoid this, the caller should ensure that either
+ /// `child.wait().await` or `child.try_wait()` is invoked successfully.
+ pub fn start_kill(&mut self) -> io::Result<()> {
+ match &mut self.child {
+ FusedChild::Child(child) => child.kill(),
+ FusedChild::Done(_) => Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "invalid argument: can't kill an exited process",
+ )),
+ }
+ }
+
+ /// Forces the child to exit.
+ ///
+ /// This is equivalent to sending a SIGKILL on unix platforms.
+ ///
+ /// If the child has to be killed remotely, it is possible to do it using
+ /// a combination of the select! macro and a oneshot channel. In the following
+ /// example, the child will run until completion unless a message is sent on
+ /// the oneshot channel. If that happens, the child is killed immediately
+ /// using the `.kill()` method.
+ ///
+ /// ```no_run
+ /// use tokio::process::Command;
+ /// use tokio::sync::oneshot::channel;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let (send, recv) = channel::<()>();
+ /// let mut child = Command::new("sleep").arg("1").spawn().unwrap();
+ /// tokio::spawn(async move { send.send(()) });
+ /// tokio::select! {
+ /// _ = child.wait() => {}
+ /// _ = recv => child.kill().await.expect("kill failed"),
+ /// }
+ /// }
+ /// ```
+ pub async fn kill(&mut self) -> io::Result<()> {
+ self.start_kill()?;
+ self.wait().await?;
+ Ok(())
+ }
+
+ /// Waits for the child to exit completely, returning the status that it
+ /// exited with. This function will continue to have the same return value
+ /// after it has been called at least once.
+ ///
+ /// The stdin handle to the child process, if any, will be closed
+ /// before waiting. This helps avoid deadlock: it ensures that the
+ /// child does not block waiting for input from the parent, while
+ /// the parent waits for the child to exit.
+ ///
+ /// If the caller wishes to explicitly control when the child's stdin
+ /// handle is closed, they may `.take()` it before calling `.wait()`:
+ ///
+ /// ```
+ /// # #[cfg(not(unix))]fn main(){}
+ /// # #[cfg(unix)]
+ /// use tokio::io::AsyncWriteExt;
+ /// # #[cfg(unix)]
+ /// use tokio::process::Command;
+ /// # #[cfg(unix)]
+ /// use std::process::Stdio;
+ ///
+ /// # #[cfg(unix)]
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let mut child = Command::new("cat")
+ /// .stdin(Stdio::piped())
+ /// .spawn()
+ /// .unwrap();
+ ///
+ /// let mut stdin = child.stdin.take().unwrap();
+ /// tokio::spawn(async move {
+ /// // do something with stdin here...
+ /// stdin.write_all(b"hello world\n").await.unwrap();
+ ///
+ /// // then drop when finished
+ /// drop(stdin);
+ /// });
+ ///
+ /// // wait for the process to complete
+ /// let _ = child.wait().await;
+ /// }
+ /// ```
+ pub async fn wait(&mut self) -> io::Result<ExitStatus> {
+ // Ensure stdin is closed so the child isn't stuck waiting on
+ // input while the parent is waiting for it to exit.
+ drop(self.stdin.take());
+
+ match &mut self.child {
+ FusedChild::Done(exit) => Ok(*exit),
+ FusedChild::Child(child) => {
+ let ret = child.await;
+
+ if let Ok(exit) = ret {
+ self.child = FusedChild::Done(exit);
+ }
+
+ ret
+ }
+ }
+ }
+
+ /// Attempts to collect the exit status of the child if it has already
+ /// exited.
+ ///
+ /// This function will not block the calling thread and will only
+ /// check to see if the child process has exited or not. If the child has
+ /// exited then on Unix the process ID is reaped. This function is
+ /// guaranteed to repeatedly return a successful exit status so long as the
+ /// child has already exited.
+ ///
+ /// If the child has exited, then `Ok(Some(status))` is returned. If the
+ /// exit status is not available at this time then `Ok(None)` is returned.
+ /// If an error occurs, then that error is returned.
+ ///
+ /// Note that unlike `wait`, this function will not attempt to drop stdin,
+ /// nor will it wake the current task if the child exits.
+ pub fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
+ match &mut self.child {
+ FusedChild::Done(exit) => Ok(Some(*exit)),
+ FusedChild::Child(guard) => {
+ let ret = guard.inner.try_wait();
+
+ if let Ok(Some(exit)) = ret {
+ // Avoid the overhead of trying to kill a reaped process
+ guard.kill_on_drop = false;
+ self.child = FusedChild::Done(exit);
+ }
+
+ ret
+ }
+ }
+ }
+
+ /// Returns a future that will resolve to an `Output`, containing the exit
+ /// status, stdout, and stderr of the child process.
+ ///
+ /// The returned future will simultaneously waits for the child to exit and
+ /// collect all remaining output on the stdout/stderr handles, returning an
+ /// `Output` instance.
+ ///
+ /// The stdin handle to the child process, if any, will be closed before
+ /// waiting. This helps avoid deadlock: it ensures that the child does not
+ /// block waiting for input from the parent, while the parent waits for the
+ /// child to exit.
+ ///
+ /// By default, stdin, stdout and stderr are inherited from the parent. In
+ /// order to capture the output into this `Output` it is necessary to create
+ /// new pipes between parent and child. Use `stdout(Stdio::piped())` or
+ /// `stderr(Stdio::piped())`, respectively, when creating a `Command`.
+ pub async fn wait_with_output(mut self) -> io::Result<Output> {
+ use crate::future::try_join3;
+
+ async fn read_to_end<A: AsyncRead + Unpin>(io: &mut Option<A>) -> io::Result<Vec<u8>> {
+ let mut vec = Vec::new();
+ if let Some(io) = io.as_mut() {
+ crate::io::util::read_to_end(io, &mut vec).await?;
+ }
+ Ok(vec)
+ }
+
+ let mut stdout_pipe = self.stdout.take();
+ let mut stderr_pipe = self.stderr.take();
+
+ let stdout_fut = read_to_end(&mut stdout_pipe);
+ let stderr_fut = read_to_end(&mut stderr_pipe);
+
+ let (status, stdout, stderr) = try_join3(self.wait(), stdout_fut, stderr_fut).await?;
+
+ // Drop happens after `try_join` due to <https://github.com/tokio-rs/tokio/issues/4309>
+ drop(stdout_pipe);
+ drop(stderr_pipe);
+
+ Ok(Output {
+ status,
+ stdout,
+ stderr,
+ })
+ }
+}
+
+/// The standard input stream for spawned children.
+///
+/// This type implements the `AsyncWrite` trait to pass data to the stdin handle of
+/// handle of a child process asynchronously.
+#[derive(Debug)]
+pub struct ChildStdin {
+ inner: imp::ChildStdio,
+}
+
+/// The standard output stream for spawned children.
+///
+/// This type implements the `AsyncRead` trait to read data from the stdout
+/// handle of a child process asynchronously.
+#[derive(Debug)]
+pub struct ChildStdout {
+ inner: imp::ChildStdio,
+}
+
+/// The standard error stream for spawned children.
+///
+/// This type implements the `AsyncRead` trait to read data from the stderr
+/// handle of a child process asynchronously.
+#[derive(Debug)]
+pub struct ChildStderr {
+ inner: imp::ChildStdio,
+}
+
+impl ChildStdin {
+ /// Creates an asynchronous `ChildStdin` from a synchronous one.
+ ///
+ /// # Errors
+ ///
+ /// This method may fail if an error is encountered when setting the pipe to
+ /// non-blocking mode, or when registering the pipe with the runtime's IO
+ /// driver.
+ pub fn from_std(inner: std::process::ChildStdin) -> io::Result<Self> {
+ Ok(Self {
+ inner: imp::stdio(inner)?,
+ })
+ }
+}
+
+impl ChildStdout {
+ /// Creates an asynchronous `ChildStderr` from a synchronous one.
+ ///
+ /// # Errors
+ ///
+ /// This method may fail if an error is encountered when setting the pipe to
+ /// non-blocking mode, or when registering the pipe with the runtime's IO
+ /// driver.
+ pub fn from_std(inner: std::process::ChildStdout) -> io::Result<Self> {
+ Ok(Self {
+ inner: imp::stdio(inner)?,
+ })
+ }
+}
+
+impl ChildStderr {
+ /// Creates an asynchronous `ChildStderr` from a synchronous one.
+ ///
+ /// # Errors
+ ///
+ /// This method may fail if an error is encountered when setting the pipe to
+ /// non-blocking mode, or when registering the pipe with the runtime's IO
+ /// driver.
+ pub fn from_std(inner: std::process::ChildStderr) -> io::Result<Self> {
+ Ok(Self {
+ inner: imp::stdio(inner)?,
+ })
+ }
+}
+
+impl AsyncWrite for ChildStdin {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ self.inner.poll_write(cx, buf)
+ }
+
+ fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ Poll::Ready(Ok(()))
+ }
+}
+
+impl AsyncRead for ChildStdout {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ // Safety: pipes support reading into uninitialized memory
+ unsafe { self.inner.poll_read(cx, buf) }
+ }
+}
+
+impl AsyncRead for ChildStderr {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll<io::Result<()>> {
+ // Safety: pipes support reading into uninitialized memory
+ unsafe { self.inner.poll_read(cx, buf) }
+ }
+}
+
+impl TryInto<Stdio> for ChildStdin {
+ type Error = io::Error;
+
+ fn try_into(self) -> Result<Stdio, Self::Error> {
+ imp::convert_to_stdio(self.inner)
+ }
+}
+
+impl TryInto<Stdio> for ChildStdout {
+ type Error = io::Error;
+
+ fn try_into(self) -> Result<Stdio, Self::Error> {
+ imp::convert_to_stdio(self.inner)
+ }
+}
+
+impl TryInto<Stdio> for ChildStderr {
+ type Error = io::Error;
+
+ fn try_into(self) -> Result<Stdio, Self::Error> {
+ imp::convert_to_stdio(self.inner)
+ }
+}
+
+#[cfg(unix)]
+mod sys {
+ use std::os::unix::io::{AsRawFd, RawFd};
+
+ use super::{ChildStderr, ChildStdin, ChildStdout};
+
+ impl AsRawFd for ChildStdin {
+ fn as_raw_fd(&self) -> RawFd {
+ self.inner.as_raw_fd()
+ }
+ }
+
+ impl AsRawFd for ChildStdout {
+ fn as_raw_fd(&self) -> RawFd {
+ self.inner.as_raw_fd()
+ }
+ }
+
+ impl AsRawFd for ChildStderr {
+ fn as_raw_fd(&self) -> RawFd {
+ self.inner.as_raw_fd()
+ }
+ }
+}
+
+#[cfg(windows)]
+mod sys {
+ use std::os::windows::io::{AsRawHandle, RawHandle};
+
+ use super::{ChildStderr, ChildStdin, ChildStdout};
+
+ impl AsRawHandle for ChildStdin {
+ fn as_raw_handle(&self) -> RawHandle {
+ self.inner.as_raw_handle()
+ }
+ }
+
+ impl AsRawHandle for ChildStdout {
+ fn as_raw_handle(&self) -> RawHandle {
+ self.inner.as_raw_handle()
+ }
+ }
+
+ impl AsRawHandle for ChildStderr {
+ fn as_raw_handle(&self) -> RawHandle {
+ self.inner.as_raw_handle()
+ }
+ }
+}
+
+#[cfg(all(test, not(loom)))]
+mod test {
+ use super::kill::Kill;
+ use super::ChildDropGuard;
+
+ use futures::future::FutureExt;
+ use std::future::Future;
+ use std::io;
+ use std::pin::Pin;
+ use std::task::{Context, Poll};
+
+ struct Mock {
+ num_kills: usize,
+ num_polls: usize,
+ poll_result: Poll<Result<(), ()>>,
+ }
+
+ impl Mock {
+ fn new() -> Self {
+ Self::with_result(Poll::Pending)
+ }
+
+ fn with_result(result: Poll<Result<(), ()>>) -> Self {
+ Self {
+ num_kills: 0,
+ num_polls: 0,
+ poll_result: result,
+ }
+ }
+ }
+
+ impl Kill for Mock {
+ fn kill(&mut self) -> io::Result<()> {
+ self.num_kills += 1;
+ Ok(())
+ }
+ }
+
+ impl Future for Mock {
+ type Output = Result<(), ()>;
+
+ fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let inner = Pin::get_mut(self);
+ inner.num_polls += 1;
+ inner.poll_result
+ }
+ }
+
+ #[test]
+ fn kills_on_drop_if_specified() {
+ let mut mock = Mock::new();
+
+ {
+ let guard = ChildDropGuard {
+ inner: &mut mock,
+ kill_on_drop: true,
+ };
+ drop(guard);
+ }
+
+ assert_eq!(1, mock.num_kills);
+ assert_eq!(0, mock.num_polls);
+ }
+
+ #[test]
+ fn no_kill_on_drop_by_default() {
+ let mut mock = Mock::new();
+
+ {
+ let guard = ChildDropGuard {
+ inner: &mut mock,
+ kill_on_drop: false,
+ };
+ drop(guard);
+ }
+
+ assert_eq!(0, mock.num_kills);
+ assert_eq!(0, mock.num_polls);
+ }
+
+ #[test]
+ fn no_kill_if_already_killed() {
+ let mut mock = Mock::new();
+
+ {
+ let mut guard = ChildDropGuard {
+ inner: &mut mock,
+ kill_on_drop: true,
+ };
+ let _ = guard.kill();
+ drop(guard);
+ }
+
+ assert_eq!(1, mock.num_kills);
+ assert_eq!(0, mock.num_polls);
+ }
+
+ #[test]
+ fn no_kill_if_reaped() {
+ let mut mock_pending = Mock::with_result(Poll::Pending);
+ let mut mock_reaped = Mock::with_result(Poll::Ready(Ok(())));
+ let mut mock_err = Mock::with_result(Poll::Ready(Err(())));
+
+ let waker = futures::task::noop_waker();
+ let mut context = Context::from_waker(&waker);
+ {
+ let mut guard = ChildDropGuard {
+ inner: &mut mock_pending,
+ kill_on_drop: true,
+ };
+ let _ = guard.poll_unpin(&mut context);
+
+ let mut guard = ChildDropGuard {
+ inner: &mut mock_reaped,
+ kill_on_drop: true,
+ };
+ let _ = guard.poll_unpin(&mut context);
+
+ let mut guard = ChildDropGuard {
+ inner: &mut mock_err,
+ kill_on_drop: true,
+ };
+ let _ = guard.poll_unpin(&mut context);
+ }
+
+ assert_eq!(1, mock_pending.num_kills);
+ assert_eq!(1, mock_pending.num_polls);
+
+ assert_eq!(0, mock_reaped.num_kills);
+ assert_eq!(1, mock_reaped.num_polls);
+
+ assert_eq!(1, mock_err.num_kills);
+ assert_eq!(1, mock_err.num_polls);
+ }
+}
diff --git a/third_party/rust/tokio/src/process/unix/driver.rs b/third_party/rust/tokio/src/process/unix/driver.rs
new file mode 100644
index 0000000000..84dc8fbd02
--- /dev/null
+++ b/third_party/rust/tokio/src/process/unix/driver.rs
@@ -0,0 +1,58 @@
+#![cfg_attr(not(feature = "rt"), allow(dead_code))]
+
+//! Process driver.
+
+use crate::park::Park;
+use crate::process::unix::GlobalOrphanQueue;
+use crate::signal::unix::driver::{Driver as SignalDriver, Handle as SignalHandle};
+
+use std::io;
+use std::time::Duration;
+
+/// Responsible for cleaning up orphaned child processes on Unix platforms.
+#[derive(Debug)]
+pub(crate) struct Driver {
+ park: SignalDriver,
+ signal_handle: SignalHandle,
+}
+
+// ===== impl Driver =====
+
+impl Driver {
+ /// Creates a new signal `Driver` instance that delegates wakeups to `park`.
+ pub(crate) fn new(park: SignalDriver) -> Self {
+ let signal_handle = park.handle();
+
+ Self {
+ park,
+ signal_handle,
+ }
+ }
+}
+
+// ===== impl Park for Driver =====
+
+impl Park for Driver {
+ type Unpark = <SignalDriver as Park>::Unpark;
+ type Error = io::Error;
+
+ fn unpark(&self) -> Self::Unpark {
+ self.park.unpark()
+ }
+
+ fn park(&mut self) -> Result<(), Self::Error> {
+ self.park.park()?;
+ GlobalOrphanQueue::reap_orphans(&self.signal_handle);
+ Ok(())
+ }
+
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+ self.park.park_timeout(duration)?;
+ GlobalOrphanQueue::reap_orphans(&self.signal_handle);
+ Ok(())
+ }
+
+ fn shutdown(&mut self) {
+ self.park.shutdown()
+ }
+}
diff --git a/third_party/rust/tokio/src/process/unix/mod.rs b/third_party/rust/tokio/src/process/unix/mod.rs
new file mode 100644
index 0000000000..576fe6cb47
--- /dev/null
+++ b/third_party/rust/tokio/src/process/unix/mod.rs
@@ -0,0 +1,250 @@
+//! Unix handling of child processes.
+//!
+//! Right now the only "fancy" thing about this is how we implement the
+//! `Future` implementation on `Child` to get the exit status. Unix offers
+//! no way to register a child with epoll, and the only real way to get a
+//! notification when a process exits is the SIGCHLD signal.
+//!
+//! Signal handling in general is *super* hairy and complicated, and it's even
+//! more complicated here with the fact that signals are coalesced, so we may
+//! not get a SIGCHLD-per-child.
+//!
+//! Our best approximation here is to check *all spawned processes* for all
+//! SIGCHLD signals received. To do that we create a `Signal`, implemented in
+//! the `tokio-net` crate, which is a stream over signals being received.
+//!
+//! Later when we poll the process's exit status we simply check to see if a
+//! SIGCHLD has happened since we last checked, and while that returns "yes" we
+//! keep trying.
+//!
+//! Note that this means that this isn't really scalable, but then again
+//! processes in general aren't scalable (e.g. millions) so it shouldn't be that
+//! bad in theory...
+
+pub(crate) mod driver;
+
+pub(crate) mod orphan;
+use orphan::{OrphanQueue, OrphanQueueImpl, Wait};
+
+mod reap;
+use reap::Reaper;
+
+use crate::io::PollEvented;
+use crate::process::kill::Kill;
+use crate::process::SpawnedChild;
+use crate::signal::unix::driver::Handle as SignalHandle;
+use crate::signal::unix::{signal, Signal, SignalKind};
+
+use mio::event::Source;
+use mio::unix::SourceFd;
+use once_cell::sync::Lazy;
+use std::fmt;
+use std::fs::File;
+use std::future::Future;
+use std::io;
+use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
+use std::pin::Pin;
+use std::process::{Child as StdChild, ExitStatus, Stdio};
+use std::task::Context;
+use std::task::Poll;
+
+impl Wait for StdChild {
+ fn id(&self) -> u32 {
+ self.id()
+ }
+
+ fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
+ self.try_wait()
+ }
+}
+
+impl Kill for StdChild {
+ fn kill(&mut self) -> io::Result<()> {
+ self.kill()
+ }
+}
+
+static ORPHAN_QUEUE: Lazy<OrphanQueueImpl<StdChild>> = Lazy::new(OrphanQueueImpl::new);
+
+pub(crate) struct GlobalOrphanQueue;
+
+impl fmt::Debug for GlobalOrphanQueue {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ ORPHAN_QUEUE.fmt(fmt)
+ }
+}
+
+impl GlobalOrphanQueue {
+ fn reap_orphans(handle: &SignalHandle) {
+ ORPHAN_QUEUE.reap_orphans(handle)
+ }
+}
+
+impl OrphanQueue<StdChild> for GlobalOrphanQueue {
+ fn push_orphan(&self, orphan: StdChild) {
+ ORPHAN_QUEUE.push_orphan(orphan)
+ }
+}
+
+#[must_use = "futures do nothing unless polled"]
+pub(crate) struct Child {
+ inner: Reaper<StdChild, GlobalOrphanQueue, Signal>,
+}
+
+impl fmt::Debug for Child {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Child")
+ .field("pid", &self.inner.id())
+ .finish()
+ }
+}
+
+pub(crate) fn spawn_child(cmd: &mut std::process::Command) -> io::Result<SpawnedChild> {
+ let mut child = cmd.spawn()?;
+ let stdin = child.stdin.take().map(stdio).transpose()?;
+ let stdout = child.stdout.take().map(stdio).transpose()?;
+ let stderr = child.stderr.take().map(stdio).transpose()?;
+
+ let signal = signal(SignalKind::child())?;
+
+ Ok(SpawnedChild {
+ child: Child {
+ inner: Reaper::new(child, GlobalOrphanQueue, signal),
+ },
+ stdin,
+ stdout,
+ stderr,
+ })
+}
+
+impl Child {
+ pub(crate) fn id(&self) -> u32 {
+ self.inner.id()
+ }
+
+ pub(crate) fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
+ self.inner.inner_mut().try_wait()
+ }
+}
+
+impl Kill for Child {
+ fn kill(&mut self) -> io::Result<()> {
+ self.inner.kill()
+ }
+}
+
+impl Future for Child {
+ type Output = io::Result<ExitStatus>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ Pin::new(&mut self.inner).poll(cx)
+ }
+}
+
+#[derive(Debug)]
+pub(crate) struct Pipe {
+ // Actually a pipe and not a File. However, we are reusing `File` to get
+ // close on drop. This is a similar trick as `mio`.
+ fd: File,
+}
+
+impl<T: IntoRawFd> From<T> for Pipe {
+ fn from(fd: T) -> Self {
+ let fd = unsafe { File::from_raw_fd(fd.into_raw_fd()) };
+ Self { fd }
+ }
+}
+
+impl<'a> io::Read for &'a Pipe {
+ fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> {
+ (&self.fd).read(bytes)
+ }
+}
+
+impl<'a> io::Write for &'a Pipe {
+ fn write(&mut self, bytes: &[u8]) -> io::Result<usize> {
+ (&self.fd).write(bytes)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ (&self.fd).flush()
+ }
+}
+
+impl AsRawFd for Pipe {
+ fn as_raw_fd(&self) -> RawFd {
+ self.fd.as_raw_fd()
+ }
+}
+
+pub(crate) fn convert_to_stdio(io: PollEvented<Pipe>) -> io::Result<Stdio> {
+ let mut fd = io.into_inner()?.fd;
+
+ // Ensure that the fd to be inherited is set to *blocking* mode, as this
+ // is the default that virtually all programs expect to have. Those
+ // programs that know how to work with nonblocking stdio will know how to
+ // change it to nonblocking mode.
+ set_nonblocking(&mut fd, false)?;
+
+ Ok(Stdio::from(fd))
+}
+
+impl Source for Pipe {
+ fn register(
+ &mut self,
+ registry: &mio::Registry,
+ token: mio::Token,
+ interest: mio::Interest,
+ ) -> io::Result<()> {
+ SourceFd(&self.as_raw_fd()).register(registry, token, interest)
+ }
+
+ fn reregister(
+ &mut self,
+ registry: &mio::Registry,
+ token: mio::Token,
+ interest: mio::Interest,
+ ) -> io::Result<()> {
+ SourceFd(&self.as_raw_fd()).reregister(registry, token, interest)
+ }
+
+ fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> {
+ SourceFd(&self.as_raw_fd()).deregister(registry)
+ }
+}
+
+pub(crate) type ChildStdio = PollEvented<Pipe>;
+
+fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()> {
+ unsafe {
+ let fd = fd.as_raw_fd();
+ let previous = libc::fcntl(fd, libc::F_GETFL);
+ if previous == -1 {
+ return Err(io::Error::last_os_error());
+ }
+
+ let new = if nonblocking {
+ previous | libc::O_NONBLOCK
+ } else {
+ previous & !libc::O_NONBLOCK
+ };
+
+ let r = libc::fcntl(fd, libc::F_SETFL, new);
+ if r == -1 {
+ return Err(io::Error::last_os_error());
+ }
+ }
+
+ Ok(())
+}
+
+pub(super) fn stdio<T>(io: T) -> io::Result<PollEvented<Pipe>>
+where
+ T: IntoRawFd,
+{
+ // Set the fd to nonblocking before we pass it to the event loop
+ let mut pipe = Pipe::from(io);
+ set_nonblocking(&mut pipe, true)?;
+
+ PollEvented::new(pipe)
+}
diff --git a/third_party/rust/tokio/src/process/unix/orphan.rs b/third_party/rust/tokio/src/process/unix/orphan.rs
new file mode 100644
index 0000000000..0e52530c37
--- /dev/null
+++ b/third_party/rust/tokio/src/process/unix/orphan.rs
@@ -0,0 +1,321 @@
+use crate::loom::sync::{Mutex, MutexGuard};
+use crate::signal::unix::driver::Handle as SignalHandle;
+use crate::signal::unix::{signal_with_handle, SignalKind};
+use crate::sync::watch;
+use std::io;
+use std::process::ExitStatus;
+
+/// An interface for waiting on a process to exit.
+pub(crate) trait Wait {
+ /// Get the identifier for this process or diagnostics.
+ fn id(&self) -> u32;
+ /// Try waiting for a process to exit in a non-blocking manner.
+ fn try_wait(&mut self) -> io::Result<Option<ExitStatus>>;
+}
+
+impl<T: Wait> Wait for &mut T {
+ fn id(&self) -> u32 {
+ (**self).id()
+ }
+
+ fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
+ (**self).try_wait()
+ }
+}
+
+/// An interface for queueing up an orphaned process so that it can be reaped.
+pub(crate) trait OrphanQueue<T> {
+ /// Adds an orphan to the queue.
+ fn push_orphan(&self, orphan: T);
+}
+
+impl<T, O: OrphanQueue<T>> OrphanQueue<T> for &O {
+ fn push_orphan(&self, orphan: T) {
+ (**self).push_orphan(orphan);
+ }
+}
+
+/// An implementation of `OrphanQueue`.
+#[derive(Debug)]
+pub(crate) struct OrphanQueueImpl<T> {
+ sigchild: Mutex<Option<watch::Receiver<()>>>,
+ queue: Mutex<Vec<T>>,
+}
+
+impl<T> OrphanQueueImpl<T> {
+ pub(crate) fn new() -> Self {
+ Self {
+ sigchild: Mutex::new(None),
+ queue: Mutex::new(Vec::new()),
+ }
+ }
+
+ #[cfg(test)]
+ fn len(&self) -> usize {
+ self.queue.lock().len()
+ }
+
+ pub(crate) fn push_orphan(&self, orphan: T)
+ where
+ T: Wait,
+ {
+ self.queue.lock().push(orphan)
+ }
+
+ /// Attempts to reap every process in the queue, ignoring any errors and
+ /// enqueueing any orphans which have not yet exited.
+ pub(crate) fn reap_orphans(&self, handle: &SignalHandle)
+ where
+ T: Wait,
+ {
+ // If someone else is holding the lock, they will be responsible for draining
+ // the queue as necessary, so we can safely bail if that happens
+ if let Some(mut sigchild_guard) = self.sigchild.try_lock() {
+ match &mut *sigchild_guard {
+ Some(sigchild) => {
+ if sigchild.try_has_changed().and_then(Result::ok).is_some() {
+ drain_orphan_queue(self.queue.lock());
+ }
+ }
+ None => {
+ let queue = self.queue.lock();
+
+ // Be lazy and only initialize the SIGCHLD listener if there
+ // are any orphaned processes in the queue.
+ if !queue.is_empty() {
+ // An errors shouldn't really happen here, but if it does it
+ // means that the signal driver isn't running, in
+ // which case there isn't anything we can
+ // register/initialize here, so we can try again later
+ if let Ok(sigchild) = signal_with_handle(SignalKind::child(), handle) {
+ *sigchild_guard = Some(sigchild);
+ drain_orphan_queue(queue);
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+fn drain_orphan_queue<T>(mut queue: MutexGuard<'_, Vec<T>>)
+where
+ T: Wait,
+{
+ for i in (0..queue.len()).rev() {
+ match queue[i].try_wait() {
+ Ok(None) => {}
+ Ok(Some(_)) | Err(_) => {
+ // The stdlib handles interruption errors (EINTR) when polling a child process.
+ // All other errors represent invalid inputs or pids that have already been
+ // reaped, so we can drop the orphan in case an error is raised.
+ queue.swap_remove(i);
+ }
+ }
+ }
+
+ drop(queue);
+}
+
+#[cfg(all(test, not(loom)))]
+pub(crate) mod test {
+ use super::*;
+ use crate::io::driver::Driver as IoDriver;
+ use crate::signal::unix::driver::{Driver as SignalDriver, Handle as SignalHandle};
+ use crate::sync::watch;
+ use std::cell::{Cell, RefCell};
+ use std::io;
+ use std::os::unix::process::ExitStatusExt;
+ use std::process::ExitStatus;
+ use std::rc::Rc;
+
+ pub(crate) struct MockQueue<W> {
+ pub(crate) all_enqueued: RefCell<Vec<W>>,
+ }
+
+ impl<W> MockQueue<W> {
+ pub(crate) fn new() -> Self {
+ Self {
+ all_enqueued: RefCell::new(Vec::new()),
+ }
+ }
+ }
+
+ impl<W> OrphanQueue<W> for MockQueue<W> {
+ fn push_orphan(&self, orphan: W) {
+ self.all_enqueued.borrow_mut().push(orphan);
+ }
+ }
+
+ struct MockWait {
+ total_waits: Rc<Cell<usize>>,
+ num_wait_until_status: usize,
+ return_err: bool,
+ }
+
+ impl MockWait {
+ fn new(num_wait_until_status: usize) -> Self {
+ Self {
+ total_waits: Rc::new(Cell::new(0)),
+ num_wait_until_status,
+ return_err: false,
+ }
+ }
+
+ fn with_err() -> Self {
+ Self {
+ total_waits: Rc::new(Cell::new(0)),
+ num_wait_until_status: 0,
+ return_err: true,
+ }
+ }
+ }
+
+ impl Wait for MockWait {
+ fn id(&self) -> u32 {
+ 42
+ }
+
+ fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
+ let waits = self.total_waits.get();
+
+ let ret = if self.num_wait_until_status == waits {
+ if self.return_err {
+ Ok(Some(ExitStatus::from_raw(0)))
+ } else {
+ Err(io::Error::new(io::ErrorKind::Other, "mock err"))
+ }
+ } else {
+ Ok(None)
+ };
+
+ self.total_waits.set(waits + 1);
+ ret
+ }
+ }
+
+ #[test]
+ fn drain_attempts_a_single_reap_of_all_queued_orphans() {
+ let first_orphan = MockWait::new(0);
+ let second_orphan = MockWait::new(1);
+ let third_orphan = MockWait::new(2);
+ let fourth_orphan = MockWait::with_err();
+
+ let first_waits = first_orphan.total_waits.clone();
+ let second_waits = second_orphan.total_waits.clone();
+ let third_waits = third_orphan.total_waits.clone();
+ let fourth_waits = fourth_orphan.total_waits.clone();
+
+ let orphanage = OrphanQueueImpl::new();
+ orphanage.push_orphan(first_orphan);
+ orphanage.push_orphan(third_orphan);
+ orphanage.push_orphan(second_orphan);
+ orphanage.push_orphan(fourth_orphan);
+
+ assert_eq!(orphanage.len(), 4);
+
+ drain_orphan_queue(orphanage.queue.lock());
+ assert_eq!(orphanage.len(), 2);
+ assert_eq!(first_waits.get(), 1);
+ assert_eq!(second_waits.get(), 1);
+ assert_eq!(third_waits.get(), 1);
+ assert_eq!(fourth_waits.get(), 1);
+
+ drain_orphan_queue(orphanage.queue.lock());
+ assert_eq!(orphanage.len(), 1);
+ assert_eq!(first_waits.get(), 1);
+ assert_eq!(second_waits.get(), 2);
+ assert_eq!(third_waits.get(), 2);
+ assert_eq!(fourth_waits.get(), 1);
+
+ drain_orphan_queue(orphanage.queue.lock());
+ assert_eq!(orphanage.len(), 0);
+ assert_eq!(first_waits.get(), 1);
+ assert_eq!(second_waits.get(), 2);
+ assert_eq!(third_waits.get(), 3);
+ assert_eq!(fourth_waits.get(), 1);
+
+ // Safe to reap when empty
+ drain_orphan_queue(orphanage.queue.lock());
+ }
+
+ #[test]
+ fn no_reap_if_no_signal_received() {
+ let (tx, rx) = watch::channel(());
+
+ let handle = SignalHandle::default();
+
+ let orphanage = OrphanQueueImpl::new();
+ *orphanage.sigchild.lock() = Some(rx);
+
+ let orphan = MockWait::new(2);
+ let waits = orphan.total_waits.clone();
+ orphanage.push_orphan(orphan);
+
+ orphanage.reap_orphans(&handle);
+ assert_eq!(waits.get(), 0);
+
+ orphanage.reap_orphans(&handle);
+ assert_eq!(waits.get(), 0);
+
+ tx.send(()).unwrap();
+ orphanage.reap_orphans(&handle);
+ assert_eq!(waits.get(), 1);
+ }
+
+ #[test]
+ fn no_reap_if_signal_lock_held() {
+ let handle = SignalHandle::default();
+
+ let orphanage = OrphanQueueImpl::new();
+ let signal_guard = orphanage.sigchild.lock();
+
+ let orphan = MockWait::new(2);
+ let waits = orphan.total_waits.clone();
+ orphanage.push_orphan(orphan);
+
+ orphanage.reap_orphans(&handle);
+ assert_eq!(waits.get(), 0);
+
+ drop(signal_guard);
+ }
+
+ #[cfg_attr(miri, ignore)] // Miri does not support epoll.
+ #[test]
+ fn does_not_register_signal_if_queue_empty() {
+ let signal_driver = IoDriver::new().and_then(SignalDriver::new).unwrap();
+ let handle = signal_driver.handle();
+
+ let orphanage = OrphanQueueImpl::new();
+ assert!(orphanage.sigchild.lock().is_none()); // Sanity
+
+ // No register when queue empty
+ orphanage.reap_orphans(&handle);
+ assert!(orphanage.sigchild.lock().is_none());
+
+ let orphan = MockWait::new(2);
+ let waits = orphan.total_waits.clone();
+ orphanage.push_orphan(orphan);
+
+ orphanage.reap_orphans(&handle);
+ assert!(orphanage.sigchild.lock().is_some());
+ assert_eq!(waits.get(), 1); // Eager reap when registering listener
+ }
+
+ #[test]
+ fn does_nothing_if_signal_could_not_be_registered() {
+ let handle = SignalHandle::default();
+
+ let orphanage = OrphanQueueImpl::new();
+ assert!(orphanage.sigchild.lock().is_none());
+
+ let orphan = MockWait::new(2);
+ let waits = orphan.total_waits.clone();
+ orphanage.push_orphan(orphan);
+
+ // Signal handler has "gone away", nothing to register or reap
+ orphanage.reap_orphans(&handle);
+ assert!(orphanage.sigchild.lock().is_none());
+ assert_eq!(waits.get(), 0);
+ }
+}
diff --git a/third_party/rust/tokio/src/process/unix/reap.rs b/third_party/rust/tokio/src/process/unix/reap.rs
new file mode 100644
index 0000000000..f7f4d3cc70
--- /dev/null
+++ b/third_party/rust/tokio/src/process/unix/reap.rs
@@ -0,0 +1,298 @@
+use crate::process::imp::orphan::{OrphanQueue, Wait};
+use crate::process::kill::Kill;
+use crate::signal::unix::InternalStream;
+
+use std::future::Future;
+use std::io;
+use std::ops::Deref;
+use std::pin::Pin;
+use std::process::ExitStatus;
+use std::task::Context;
+use std::task::Poll;
+
+/// Orchestrates between registering interest for receiving signals when a
+/// child process has exited, and attempting to poll for process completion.
+#[derive(Debug)]
+pub(crate) struct Reaper<W, Q, S>
+where
+ W: Wait,
+ Q: OrphanQueue<W>,
+{
+ inner: Option<W>,
+ orphan_queue: Q,
+ signal: S,
+}
+
+impl<W, Q, S> Deref for Reaper<W, Q, S>
+where
+ W: Wait,
+ Q: OrphanQueue<W>,
+{
+ type Target = W;
+
+ fn deref(&self) -> &Self::Target {
+ self.inner()
+ }
+}
+
+impl<W, Q, S> Reaper<W, Q, S>
+where
+ W: Wait,
+ Q: OrphanQueue<W>,
+{
+ pub(crate) fn new(inner: W, orphan_queue: Q, signal: S) -> Self {
+ Self {
+ inner: Some(inner),
+ orphan_queue,
+ signal,
+ }
+ }
+
+ fn inner(&self) -> &W {
+ self.inner.as_ref().expect("inner has gone away")
+ }
+
+ pub(crate) fn inner_mut(&mut self) -> &mut W {
+ self.inner.as_mut().expect("inner has gone away")
+ }
+}
+
+impl<W, Q, S> Future for Reaper<W, Q, S>
+where
+ W: Wait + Unpin,
+ Q: OrphanQueue<W> + Unpin,
+ S: InternalStream + Unpin,
+{
+ type Output = io::Result<ExitStatus>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ loop {
+ // If the child hasn't exited yet, then it's our responsibility to
+ // ensure the current task gets notified when it might be able to
+ // make progress. We can use the delivery of a SIGCHLD signal as a
+ // sign that we can potentially make progress.
+ //
+ // However, we will register for a notification on the next signal
+ // BEFORE we poll the child. Otherwise it is possible that the child
+ // can exit and the signal can arrive after we last polled the child,
+ // but before we've registered for a notification on the next signal
+ // (this can cause a deadlock if there are no more spawned children
+ // which can generate a different signal for us). A side effect of
+ // pre-registering for signal notifications is that when the child
+ // exits, we will have already registered for an additional
+ // notification we don't need to consume. If another signal arrives,
+ // this future's task will be notified/woken up again. Since the
+ // futures model allows for spurious wake ups this extra wakeup
+ // should not cause significant issues with parent futures.
+ let registered_interest = self.signal.poll_recv(cx).is_pending();
+
+ if let Some(status) = self.inner_mut().try_wait()? {
+ return Poll::Ready(Ok(status));
+ }
+
+ // If our attempt to poll for the next signal was not ready, then
+ // we've arranged for our task to get notified and we can bail out.
+ if registered_interest {
+ return Poll::Pending;
+ } else {
+ // Otherwise, if the signal stream delivered a signal to us, we
+ // won't get notified at the next signal, so we'll loop and try
+ // again.
+ continue;
+ }
+ }
+ }
+}
+
+impl<W, Q, S> Kill for Reaper<W, Q, S>
+where
+ W: Kill + Wait,
+ Q: OrphanQueue<W>,
+{
+ fn kill(&mut self) -> io::Result<()> {
+ self.inner_mut().kill()
+ }
+}
+
+impl<W, Q, S> Drop for Reaper<W, Q, S>
+where
+ W: Wait,
+ Q: OrphanQueue<W>,
+{
+ fn drop(&mut self) {
+ if let Ok(Some(_)) = self.inner_mut().try_wait() {
+ return;
+ }
+
+ let orphan = self.inner.take().unwrap();
+ self.orphan_queue.push_orphan(orphan);
+ }
+}
+
+#[cfg(all(test, not(loom)))]
+mod test {
+ use super::*;
+
+ use crate::process::unix::orphan::test::MockQueue;
+ use futures::future::FutureExt;
+ use std::os::unix::process::ExitStatusExt;
+ use std::process::ExitStatus;
+ use std::task::Context;
+ use std::task::Poll;
+
+ #[derive(Debug)]
+ struct MockWait {
+ total_kills: usize,
+ total_waits: usize,
+ num_wait_until_status: usize,
+ status: ExitStatus,
+ }
+
+ impl MockWait {
+ fn new(status: ExitStatus, num_wait_until_status: usize) -> Self {
+ Self {
+ total_kills: 0,
+ total_waits: 0,
+ num_wait_until_status,
+ status,
+ }
+ }
+ }
+
+ impl Wait for MockWait {
+ fn id(&self) -> u32 {
+ 0
+ }
+
+ fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
+ let ret = if self.num_wait_until_status == self.total_waits {
+ Some(self.status)
+ } else {
+ None
+ };
+
+ self.total_waits += 1;
+ Ok(ret)
+ }
+ }
+
+ impl Kill for MockWait {
+ fn kill(&mut self) -> io::Result<()> {
+ self.total_kills += 1;
+ Ok(())
+ }
+ }
+
+ struct MockStream {
+ total_polls: usize,
+ values: Vec<Option<()>>,
+ }
+
+ impl MockStream {
+ fn new(values: Vec<Option<()>>) -> Self {
+ Self {
+ total_polls: 0,
+ values,
+ }
+ }
+ }
+
+ impl InternalStream for MockStream {
+ fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>> {
+ self.total_polls += 1;
+ match self.values.remove(0) {
+ Some(()) => Poll::Ready(Some(())),
+ None => Poll::Pending,
+ }
+ }
+ }
+
+ #[test]
+ fn reaper() {
+ let exit = ExitStatus::from_raw(0);
+ let mock = MockWait::new(exit, 3);
+ let mut grim = Reaper::new(
+ mock,
+ MockQueue::new(),
+ MockStream::new(vec![None, Some(()), None, None, None]),
+ );
+
+ let waker = futures::task::noop_waker();
+ let mut context = Context::from_waker(&waker);
+
+ // Not yet exited, interest registered
+ assert!(grim.poll_unpin(&mut context).is_pending());
+ assert_eq!(1, grim.signal.total_polls);
+ assert_eq!(1, grim.total_waits);
+ assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
+
+ // Not yet exited, couldn't register interest the first time
+ // but managed to register interest the second time around
+ assert!(grim.poll_unpin(&mut context).is_pending());
+ assert_eq!(3, grim.signal.total_polls);
+ assert_eq!(3, grim.total_waits);
+ assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
+
+ // Exited
+ if let Poll::Ready(r) = grim.poll_unpin(&mut context) {
+ assert!(r.is_ok());
+ let exit_code = r.unwrap();
+ assert_eq!(exit_code, exit);
+ } else {
+ unreachable!();
+ }
+ assert_eq!(4, grim.signal.total_polls);
+ assert_eq!(4, grim.total_waits);
+ assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
+ }
+
+ #[test]
+ fn kill() {
+ let exit = ExitStatus::from_raw(0);
+ let mut grim = Reaper::new(
+ MockWait::new(exit, 0),
+ MockQueue::new(),
+ MockStream::new(vec![None]),
+ );
+
+ grim.kill().unwrap();
+ assert_eq!(1, grim.total_kills);
+ assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
+ }
+
+ #[test]
+ fn drop_reaps_if_possible() {
+ let exit = ExitStatus::from_raw(0);
+ let mut mock = MockWait::new(exit, 0);
+
+ {
+ let queue = MockQueue::new();
+
+ let grim = Reaper::new(&mut mock, &queue, MockStream::new(vec![]));
+
+ drop(grim);
+
+ assert!(queue.all_enqueued.borrow().is_empty());
+ }
+
+ assert_eq!(1, mock.total_waits);
+ assert_eq!(0, mock.total_kills);
+ }
+
+ #[test]
+ fn drop_enqueues_orphan_if_wait_fails() {
+ let exit = ExitStatus::from_raw(0);
+ let mut mock = MockWait::new(exit, 2);
+
+ {
+ let queue = MockQueue::<&mut MockWait>::new();
+ let grim = Reaper::new(&mut mock, &queue, MockStream::new(vec![]));
+ drop(grim);
+
+ assert_eq!(1, queue.all_enqueued.borrow().len());
+ }
+
+ assert_eq!(1, mock.total_waits);
+ assert_eq!(0, mock.total_kills);
+ }
+}
diff --git a/third_party/rust/tokio/src/process/windows.rs b/third_party/rust/tokio/src/process/windows.rs
new file mode 100644
index 0000000000..136d5b0cab
--- /dev/null
+++ b/third_party/rust/tokio/src/process/windows.rs
@@ -0,0 +1,205 @@
+//! Windows asynchronous process handling.
+//!
+//! Like with Unix we don't actually have a way of registering a process with an
+//! IOCP object. As a result we similarly need another mechanism for getting a
+//! signal when a process has exited. For now this is implemented with the
+//! `RegisterWaitForSingleObject` function in the kernel32.dll.
+//!
+//! This strategy is the same that libuv takes and essentially just queues up a
+//! wait for the process in a kernel32-specific thread pool. Once the object is
+//! notified (e.g. the process exits) then we have a callback that basically
+//! just completes a `Oneshot`.
+//!
+//! The `poll_exit` implementation will attempt to wait for the process in a
+//! nonblocking fashion, but failing that it'll fire off a
+//! `RegisterWaitForSingleObject` and then wait on the other end of the oneshot
+//! from then on out.
+
+use crate::io::PollEvented;
+use crate::process::kill::Kill;
+use crate::process::SpawnedChild;
+use crate::sync::oneshot;
+
+use mio::windows::NamedPipe;
+use std::fmt;
+use std::future::Future;
+use std::io;
+use std::os::windows::prelude::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
+use std::pin::Pin;
+use std::process::Stdio;
+use std::process::{Child as StdChild, Command as StdCommand, ExitStatus};
+use std::ptr;
+use std::task::Context;
+use std::task::Poll;
+use winapi::shared::minwindef::{DWORD, FALSE};
+use winapi::um::handleapi::{DuplicateHandle, INVALID_HANDLE_VALUE};
+use winapi::um::processthreadsapi::GetCurrentProcess;
+use winapi::um::threadpoollegacyapiset::UnregisterWaitEx;
+use winapi::um::winbase::{RegisterWaitForSingleObject, INFINITE};
+use winapi::um::winnt::{
+ BOOLEAN, DUPLICATE_SAME_ACCESS, HANDLE, PVOID, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE,
+};
+
+#[must_use = "futures do nothing unless polled"]
+pub(crate) struct Child {
+ child: StdChild,
+ waiting: Option<Waiting>,
+}
+
+impl fmt::Debug for Child {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Child")
+ .field("pid", &self.id())
+ .field("child", &self.child)
+ .field("waiting", &"..")
+ .finish()
+ }
+}
+
+struct Waiting {
+ rx: oneshot::Receiver<()>,
+ wait_object: HANDLE,
+ tx: *mut Option<oneshot::Sender<()>>,
+}
+
+unsafe impl Sync for Waiting {}
+unsafe impl Send for Waiting {}
+
+pub(crate) fn spawn_child(cmd: &mut StdCommand) -> io::Result<SpawnedChild> {
+ let mut child = cmd.spawn()?;
+ let stdin = child.stdin.take().map(stdio).transpose()?;
+ let stdout = child.stdout.take().map(stdio).transpose()?;
+ let stderr = child.stderr.take().map(stdio).transpose()?;
+
+ Ok(SpawnedChild {
+ child: Child {
+ child,
+ waiting: None,
+ },
+ stdin,
+ stdout,
+ stderr,
+ })
+}
+
+impl Child {
+ pub(crate) fn id(&self) -> u32 {
+ self.child.id()
+ }
+
+ pub(crate) fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
+ self.child.try_wait()
+ }
+}
+
+impl Kill for Child {
+ fn kill(&mut self) -> io::Result<()> {
+ self.child.kill()
+ }
+}
+
+impl Future for Child {
+ type Output = io::Result<ExitStatus>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let inner = Pin::get_mut(self);
+ loop {
+ if let Some(ref mut w) = inner.waiting {
+ match Pin::new(&mut w.rx).poll(cx) {
+ Poll::Ready(Ok(())) => {}
+ Poll::Ready(Err(_)) => panic!("should not be canceled"),
+ Poll::Pending => return Poll::Pending,
+ }
+ let status = inner.try_wait()?.expect("not ready yet");
+ return Poll::Ready(Ok(status));
+ }
+
+ if let Some(e) = inner.try_wait()? {
+ return Poll::Ready(Ok(e));
+ }
+ let (tx, rx) = oneshot::channel();
+ let ptr = Box::into_raw(Box::new(Some(tx)));
+ let mut wait_object = ptr::null_mut();
+ let rc = unsafe {
+ RegisterWaitForSingleObject(
+ &mut wait_object,
+ inner.child.as_raw_handle(),
+ Some(callback),
+ ptr as *mut _,
+ INFINITE,
+ WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE,
+ )
+ };
+ if rc == 0 {
+ let err = io::Error::last_os_error();
+ drop(unsafe { Box::from_raw(ptr) });
+ return Poll::Ready(Err(err));
+ }
+ inner.waiting = Some(Waiting {
+ rx,
+ wait_object,
+ tx: ptr,
+ });
+ }
+ }
+}
+
+impl AsRawHandle for Child {
+ fn as_raw_handle(&self) -> RawHandle {
+ self.child.as_raw_handle()
+ }
+}
+
+impl Drop for Waiting {
+ fn drop(&mut self) {
+ unsafe {
+ let rc = UnregisterWaitEx(self.wait_object, INVALID_HANDLE_VALUE);
+ if rc == 0 {
+ panic!("failed to unregister: {}", io::Error::last_os_error());
+ }
+ drop(Box::from_raw(self.tx));
+ }
+ }
+}
+
+unsafe extern "system" fn callback(ptr: PVOID, _timer_fired: BOOLEAN) {
+ let complete = &mut *(ptr as *mut Option<oneshot::Sender<()>>);
+ let _ = complete.take().unwrap().send(());
+}
+
+pub(crate) type ChildStdio = PollEvented<NamedPipe>;
+
+pub(super) fn stdio<T>(io: T) -> io::Result<PollEvented<NamedPipe>>
+where
+ T: IntoRawHandle,
+{
+ let pipe = unsafe { NamedPipe::from_raw_handle(io.into_raw_handle()) };
+ PollEvented::new(pipe)
+}
+
+pub(crate) fn convert_to_stdio(io: PollEvented<NamedPipe>) -> io::Result<Stdio> {
+ let named_pipe = io.into_inner()?;
+
+ // Mio does not implement `IntoRawHandle` for `NamedPipe`, so we'll manually
+ // duplicate the handle here...
+ unsafe {
+ let mut dup_handle = INVALID_HANDLE_VALUE;
+ let cur_proc = GetCurrentProcess();
+
+ let status = DuplicateHandle(
+ cur_proc,
+ named_pipe.as_raw_handle(),
+ cur_proc,
+ &mut dup_handle,
+ 0 as DWORD,
+ FALSE,
+ DUPLICATE_SAME_ACCESS,
+ );
+
+ if status == 0 {
+ return Err(io::Error::last_os_error());
+ }
+
+ Ok(Stdio::from_raw_handle(dup_handle))
+ }
+}