diff options
Diffstat (limited to 'third_party/rust/tokio/src/process')
-rw-r--r-- | third_party/rust/tokio/src/process/kill.rs | 13 | ||||
-rw-r--r-- | third_party/rust/tokio/src/process/mod.rs | 1534 | ||||
-rw-r--r-- | third_party/rust/tokio/src/process/unix/driver.rs | 58 | ||||
-rw-r--r-- | third_party/rust/tokio/src/process/unix/mod.rs | 250 | ||||
-rw-r--r-- | third_party/rust/tokio/src/process/unix/orphan.rs | 321 | ||||
-rw-r--r-- | third_party/rust/tokio/src/process/unix/reap.rs | 298 | ||||
-rw-r--r-- | third_party/rust/tokio/src/process/windows.rs | 205 |
7 files changed, 2679 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/process/kill.rs b/third_party/rust/tokio/src/process/kill.rs new file mode 100644 index 0000000000..a1f1652281 --- /dev/null +++ b/third_party/rust/tokio/src/process/kill.rs @@ -0,0 +1,13 @@ +use std::io; + +/// An interface for killing a running process. +pub(crate) trait Kill { + /// Forcefully kills the process. + fn kill(&mut self) -> io::Result<()>; +} + +impl<T: Kill> Kill for &mut T { + fn kill(&mut self) -> io::Result<()> { + (**self).kill() + } +} diff --git a/third_party/rust/tokio/src/process/mod.rs b/third_party/rust/tokio/src/process/mod.rs new file mode 100644 index 0000000000..4e1a21dd44 --- /dev/null +++ b/third_party/rust/tokio/src/process/mod.rs @@ -0,0 +1,1534 @@ +//! An implementation of asynchronous process management for Tokio. +//! +//! This module provides a [`Command`] struct that imitates the interface of the +//! [`std::process::Command`] type in the standard library, but provides asynchronous versions of +//! functions that create processes. These functions (`spawn`, `status`, `output` and their +//! variants) return "future aware" types that interoperate with Tokio. The asynchronous process +//! support is provided through signal handling on Unix and system APIs on Windows. +//! +//! [`std::process::Command`]: std::process::Command +//! +//! # Examples +//! +//! Here's an example program which will spawn `echo hello world` and then wait +//! for it complete. +//! +//! ```no_run +//! use tokio::process::Command; +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box<dyn std::error::Error>> { +//! // The usage is similar as with the standard library's `Command` type +//! let mut child = Command::new("echo") +//! .arg("hello") +//! .arg("world") +//! .spawn() +//! .expect("failed to spawn"); +//! +//! // Await until the command completes +//! let status = child.wait().await?; +//! println!("the command exited with: {}", status); +//! Ok(()) +//! } +//! ``` +//! +//! Next, let's take a look at an example where we not only spawn `echo hello +//! world` but we also capture its output. +//! +//! ```no_run +//! use tokio::process::Command; +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box<dyn std::error::Error>> { +//! // Like above, but use `output` which returns a future instead of +//! // immediately returning the `Child`. +//! let output = Command::new("echo").arg("hello").arg("world") +//! .output(); +//! +//! let output = output.await?; +//! +//! assert!(output.status.success()); +//! assert_eq!(output.stdout, b"hello world\n"); +//! Ok(()) +//! } +//! ``` +//! +//! We can also read input line by line. +//! +//! ```no_run +//! use tokio::io::{BufReader, AsyncBufReadExt}; +//! use tokio::process::Command; +//! +//! use std::process::Stdio; +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box<dyn std::error::Error>> { +//! let mut cmd = Command::new("cat"); +//! +//! // Specify that we want the command's standard output piped back to us. +//! // By default, standard input/output/error will be inherited from the +//! // current process (for example, this means that standard input will +//! // come from the keyboard and standard output/error will go directly to +//! // the terminal if this process is invoked from the command line). +//! cmd.stdout(Stdio::piped()); +//! +//! let mut child = cmd.spawn() +//! .expect("failed to spawn command"); +//! +//! let stdout = child.stdout.take() +//! .expect("child did not have a handle to stdout"); +//! +//! let mut reader = BufReader::new(stdout).lines(); +//! +//! // Ensure the child process is spawned in the runtime so it can +//! // make progress on its own while we await for any output. +//! tokio::spawn(async move { +//! let status = child.wait().await +//! .expect("child process encountered an error"); +//! +//! println!("child status was: {}", status); +//! }); +//! +//! while let Some(line) = reader.next_line().await? { +//! println!("Line: {}", line); +//! } +//! +//! Ok(()) +//! } +//! ``` +//! +//! Here is another example using `sort` writing into the child process +//! standard input, capturing the output of the sorted text. +//! +//! ```no_run +//! use tokio::io::AsyncWriteExt; +//! use tokio::process::Command; +//! +//! use std::process::Stdio; +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box<dyn std::error::Error>> { +//! let mut cmd = Command::new("sort"); +//! +//! // Specifying that we want pipe both the output and the input. +//! // Similarily to capturing the output, by configuring the pipe +//! // to stdin it can now be used as an asynchronous writer. +//! cmd.stdout(Stdio::piped()); +//! cmd.stdin(Stdio::piped()); +//! +//! let mut child = cmd.spawn().expect("failed to spawn command"); +//! +//! // These are the animals we want to sort +//! let animals: &[&str] = &["dog", "bird", "frog", "cat", "fish"]; +//! +//! let mut stdin = child +//! .stdin +//! .take() +//! .expect("child did not have a handle to stdin"); +//! +//! // Write our animals to the child process +//! // Note that the behavior of `sort` is to buffer _all input_ before writing any output. +//! // In the general sense, it is recommended to write to the child in a separate task as +//! // awaiting its exit (or output) to avoid deadlocks (for example, the child tries to write +//! // some output but gets stuck waiting on the parent to read from it, meanwhile the parent +//! // is stuck waiting to write its input completely before reading the output). +//! stdin +//! .write(animals.join("\n").as_bytes()) +//! .await +//! .expect("could not write to stdin"); +//! +//! // We drop the handle here which signals EOF to the child process. +//! // This tells the child process that it there is no more data on the pipe. +//! drop(stdin); +//! +//! let op = child.wait_with_output().await?; +//! +//! // Results should come back in sorted order +//! assert_eq!(op.stdout, "bird\ncat\ndog\nfish\nfrog\n".as_bytes()); +//! +//! Ok(()) +//! } +//! ``` +//! +//! With some coordination, we can also pipe the output of one command into +//! another. +//! +//! ```no_run +//! use tokio::join; +//! use tokio::process::Command; +//! use std::convert::TryInto; +//! use std::process::Stdio; +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box<dyn std::error::Error>> { +//! let mut echo = Command::new("echo") +//! .arg("hello world!") +//! .stdout(Stdio::piped()) +//! .spawn() +//! .expect("failed to spawn echo"); +//! +//! let tr_stdin: Stdio = echo +//! .stdout +//! .take() +//! .unwrap() +//! .try_into() +//! .expect("failed to convert to Stdio"); +//! +//! let tr = Command::new("tr") +//! .arg("a-z") +//! .arg("A-Z") +//! .stdin(tr_stdin) +//! .stdout(Stdio::piped()) +//! .spawn() +//! .expect("failed to spawn tr"); +//! +//! let (echo_result, tr_output) = join!(echo.wait(), tr.wait_with_output()); +//! +//! assert!(echo_result.unwrap().success()); +//! +//! let tr_output = tr_output.expect("failed to await tr"); +//! assert!(tr_output.status.success()); +//! +//! assert_eq!(tr_output.stdout, b"HELLO WORLD!\n"); +//! +//! Ok(()) +//! } +//! ``` +//! +//! # Caveats +//! +//! ## Dropping/Cancellation +//! +//! Similar to the behavior to the standard library, and unlike the futures +//! paradigm of dropping-implies-cancellation, a spawned process will, by +//! default, continue to execute even after the `Child` handle has been dropped. +//! +//! The [`Command::kill_on_drop`] method can be used to modify this behavior +//! and kill the child process if the `Child` wrapper is dropped before it +//! has exited. +//! +//! ## Unix Processes +//! +//! On Unix platforms processes must be "reaped" by their parent process after +//! they have exited in order to release all OS resources. A child process which +//! has exited, but has not yet been reaped by its parent is considered a "zombie" +//! process. Such processes continue to count against limits imposed by the system, +//! and having too many zombie processes present can prevent additional processes +//! from being spawned. +//! +//! The tokio runtime will, on a best-effort basis, attempt to reap and clean up +//! any process which it has spawned. No additional guarantees are made with regards +//! how quickly or how often this procedure will take place. +//! +//! It is recommended to avoid dropping a [`Child`] process handle before it has been +//! fully `await`ed if stricter cleanup guarantees are required. +//! +//! [`Command`]: crate::process::Command +//! [`Command::kill_on_drop`]: crate::process::Command::kill_on_drop +//! [`Child`]: crate::process::Child + +#[path = "unix/mod.rs"] +#[cfg(unix)] +mod imp; + +#[cfg(unix)] +pub(crate) mod unix { + pub(crate) use super::imp::*; +} + +#[path = "windows.rs"] +#[cfg(windows)] +mod imp; + +mod kill; + +use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; +use crate::process::kill::Kill; + +use std::convert::TryInto; +use std::ffi::OsStr; +use std::future::Future; +use std::io; +#[cfg(unix)] +use std::os::unix::process::CommandExt; +#[cfg(windows)] +use std::os::windows::io::{AsRawHandle, RawHandle}; +#[cfg(windows)] +use std::os::windows::process::CommandExt; +use std::path::Path; +use std::pin::Pin; +use std::process::{Command as StdCommand, ExitStatus, Output, Stdio}; +use std::task::Context; +use std::task::Poll; + +/// This structure mimics the API of [`std::process::Command`] found in the standard library, but +/// replaces functions that create a process with an asynchronous variant. The main provided +/// asynchronous functions are [spawn](Command::spawn), [status](Command::status), and +/// [output](Command::output). +/// +/// `Command` uses asynchronous versions of some `std` types (for example [`Child`]). +/// +/// [`std::process::Command`]: std::process::Command +/// [`Child`]: struct@Child +#[derive(Debug)] +pub struct Command { + std: StdCommand, + kill_on_drop: bool, +} + +pub(crate) struct SpawnedChild { + child: imp::Child, + stdin: Option<imp::ChildStdio>, + stdout: Option<imp::ChildStdio>, + stderr: Option<imp::ChildStdio>, +} + +impl Command { + /// Constructs a new `Command` for launching the program at + /// path `program`, with the following default configuration: + /// + /// * No arguments to the program + /// * Inherit the current process's environment + /// * Inherit the current process's working directory + /// * Inherit stdin/stdout/stderr for `spawn` or `status`, but create pipes for `output` + /// + /// Builder methods are provided to change these defaults and + /// otherwise configure the process. + /// + /// If `program` is not an absolute path, the `PATH` will be searched in + /// an OS-defined way. + /// + /// The search path to be used may be controlled by setting the + /// `PATH` environment variable on the Command, + /// but this has some implementation limitations on Windows + /// (see issue [rust-lang/rust#37519]). + /// + /// # Examples + /// + /// Basic usage: + /// + /// ```no_run + /// use tokio::process::Command; + /// let command = Command::new("sh"); + /// ``` + /// + /// [rust-lang/rust#37519]: https://github.com/rust-lang/rust/issues/37519 + pub fn new<S: AsRef<OsStr>>(program: S) -> Command { + Self::from(StdCommand::new(program)) + } + + /// Cheaply convert to a `&std::process::Command` for places where the type from the standard + /// library is expected. + pub fn as_std(&self) -> &StdCommand { + &self.std + } + + /// Adds an argument to pass to the program. + /// + /// Only one argument can be passed per use. So instead of: + /// + /// ```no_run + /// tokio::process::Command::new("sh") + /// .arg("-C /path/to/repo"); + /// ``` + /// + /// usage would be: + /// + /// ```no_run + /// tokio::process::Command::new("sh") + /// .arg("-C") + /// .arg("/path/to/repo"); + /// ``` + /// + /// To pass multiple arguments see [`args`]. + /// + /// [`args`]: method@Self::args + /// + /// # Examples + /// + /// Basic usage: + /// + /// ```no_run + /// use tokio::process::Command; + /// + /// let command = Command::new("ls") + /// .arg("-l") + /// .arg("-a"); + /// ``` + pub fn arg<S: AsRef<OsStr>>(&mut self, arg: S) -> &mut Command { + self.std.arg(arg); + self + } + + /// Adds multiple arguments to pass to the program. + /// + /// To pass a single argument see [`arg`]. + /// + /// [`arg`]: method@Self::arg + /// + /// # Examples + /// + /// Basic usage: + /// + /// ```no_run + /// use tokio::process::Command; + /// + /// let command = Command::new("ls") + /// .args(&["-l", "-a"]); + /// ``` + pub fn args<I, S>(&mut self, args: I) -> &mut Command + where + I: IntoIterator<Item = S>, + S: AsRef<OsStr>, + { + self.std.args(args); + self + } + + /// Inserts or updates an environment variable mapping. + /// + /// Note that environment variable names are case-insensitive (but case-preserving) on Windows, + /// and case-sensitive on all other platforms. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ```no_run + /// use tokio::process::Command; + /// + /// let command = Command::new("ls") + /// .env("PATH", "/bin"); + /// ``` + pub fn env<K, V>(&mut self, key: K, val: V) -> &mut Command + where + K: AsRef<OsStr>, + V: AsRef<OsStr>, + { + self.std.env(key, val); + self + } + + /// Adds or updates multiple environment variable mappings. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ```no_run + /// use tokio::process::Command; + /// use std::process::{Stdio}; + /// use std::env; + /// use std::collections::HashMap; + /// + /// let filtered_env : HashMap<String, String> = + /// env::vars().filter(|&(ref k, _)| + /// k == "TERM" || k == "TZ" || k == "LANG" || k == "PATH" + /// ).collect(); + /// + /// let command = Command::new("printenv") + /// .stdin(Stdio::null()) + /// .stdout(Stdio::inherit()) + /// .env_clear() + /// .envs(&filtered_env); + /// ``` + pub fn envs<I, K, V>(&mut self, vars: I) -> &mut Command + where + I: IntoIterator<Item = (K, V)>, + K: AsRef<OsStr>, + V: AsRef<OsStr>, + { + self.std.envs(vars); + self + } + + /// Removes an environment variable mapping. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ```no_run + /// use tokio::process::Command; + /// + /// let command = Command::new("ls") + /// .env_remove("PATH"); + /// ``` + pub fn env_remove<K: AsRef<OsStr>>(&mut self, key: K) -> &mut Command { + self.std.env_remove(key); + self + } + + /// Clears the entire environment map for the child process. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ```no_run + /// use tokio::process::Command; + /// + /// let command = Command::new("ls") + /// .env_clear(); + /// ``` + pub fn env_clear(&mut self) -> &mut Command { + self.std.env_clear(); + self + } + + /// Sets the working directory for the child process. + /// + /// # Platform-specific behavior + /// + /// If the program path is relative (e.g., `"./script.sh"`), it's ambiguous + /// whether it should be interpreted relative to the parent's working + /// directory or relative to `current_dir`. The behavior in this case is + /// platform specific and unstable, and it's recommended to use + /// [`canonicalize`] to get an absolute program path instead. + /// + /// [`canonicalize`]: crate::fs::canonicalize() + /// + /// # Examples + /// + /// Basic usage: + /// + /// ```no_run + /// use tokio::process::Command; + /// + /// let command = Command::new("ls") + /// .current_dir("/bin"); + /// ``` + pub fn current_dir<P: AsRef<Path>>(&mut self, dir: P) -> &mut Command { + self.std.current_dir(dir); + self + } + + /// Sets configuration for the child process's standard input (stdin) handle. + /// + /// Defaults to [`inherit`] when used with `spawn` or `status`, and + /// defaults to [`piped`] when used with `output`. + /// + /// [`inherit`]: std::process::Stdio::inherit + /// [`piped`]: std::process::Stdio::piped + /// + /// # Examples + /// + /// Basic usage: + /// + /// ```no_run + /// use std::process::{Stdio}; + /// use tokio::process::Command; + /// + /// let command = Command::new("ls") + /// .stdin(Stdio::null()); + /// ``` + pub fn stdin<T: Into<Stdio>>(&mut self, cfg: T) -> &mut Command { + self.std.stdin(cfg); + self + } + + /// Sets configuration for the child process's standard output (stdout) handle. + /// + /// Defaults to [`inherit`] when used with `spawn` or `status`, and + /// defaults to [`piped`] when used with `output`. + /// + /// [`inherit`]: std::process::Stdio::inherit + /// [`piped`]: std::process::Stdio::piped + /// + /// # Examples + /// + /// Basic usage: + /// + /// ```no_run + /// use tokio::process::Command; + /// use std::process::Stdio; + /// + /// let command = Command::new("ls") + /// .stdout(Stdio::null()); + /// ``` + pub fn stdout<T: Into<Stdio>>(&mut self, cfg: T) -> &mut Command { + self.std.stdout(cfg); + self + } + + /// Sets configuration for the child process's standard error (stderr) handle. + /// + /// Defaults to [`inherit`] when used with `spawn` or `status`, and + /// defaults to [`piped`] when used with `output`. + /// + /// [`inherit`]: std::process::Stdio::inherit + /// [`piped`]: std::process::Stdio::piped + /// + /// # Examples + /// + /// Basic usage: + /// + /// ```no_run + /// use tokio::process::Command; + /// use std::process::{Stdio}; + /// + /// let command = Command::new("ls") + /// .stderr(Stdio::null()); + /// ``` + pub fn stderr<T: Into<Stdio>>(&mut self, cfg: T) -> &mut Command { + self.std.stderr(cfg); + self + } + + /// Controls whether a `kill` operation should be invoked on a spawned child + /// process when its corresponding `Child` handle is dropped. + /// + /// By default, this value is assumed to be `false`, meaning the next spawned + /// process will not be killed on drop, similar to the behavior of the standard + /// library. + /// + /// # Caveats + /// + /// On Unix platforms processes must be "reaped" by their parent process after + /// they have exited in order to release all OS resources. A child process which + /// has exited, but has not yet been reaped by its parent is considered a "zombie" + /// process. Such processes continue to count against limits imposed by the system, + /// and having too many zombie processes present can prevent additional processes + /// from being spawned. + /// + /// Although issuing a `kill` signal to the child process is a synchronous + /// operation, the resulting zombie process cannot be `.await`ed inside of the + /// destructor to avoid blocking other tasks. The tokio runtime will, on a + /// best-effort basis, attempt to reap and clean up such processes in the + /// background, but makes no additional guarantees are made with regards + /// how quickly or how often this procedure will take place. + /// + /// If stronger guarantees are required, it is recommended to avoid dropping + /// a [`Child`] handle where possible, and instead utilize `child.wait().await` + /// or `child.kill().await` where possible. + pub fn kill_on_drop(&mut self, kill_on_drop: bool) -> &mut Command { + self.kill_on_drop = kill_on_drop; + self + } + + /// Sets the [process creation flags][1] to be passed to `CreateProcess`. + /// + /// These will always be ORed with `CREATE_UNICODE_ENVIRONMENT`. + /// + /// [1]: https://msdn.microsoft.com/en-us/library/windows/desktop/ms684863(v=vs.85).aspx + #[cfg(windows)] + #[cfg_attr(docsrs, doc(cfg(windows)))] + pub fn creation_flags(&mut self, flags: u32) -> &mut Command { + self.std.creation_flags(flags); + self + } + + /// Sets the child process's user ID. This translates to a + /// `setuid` call in the child process. Failure in the `setuid` + /// call will cause the spawn to fail. + #[cfg(unix)] + #[cfg_attr(docsrs, doc(cfg(unix)))] + pub fn uid(&mut self, id: u32) -> &mut Command { + self.std.uid(id); + self + } + + /// Similar to `uid` but sets the group ID of the child process. This has + /// the same semantics as the `uid` field. + #[cfg(unix)] + #[cfg_attr(docsrs, doc(cfg(unix)))] + pub fn gid(&mut self, id: u32) -> &mut Command { + self.std.gid(id); + self + } + + /// Sets executable argument. + /// + /// Set the first process argument, `argv[0]`, to something other than the + /// default executable path. + #[cfg(unix)] + #[cfg_attr(docsrs, doc(cfg(unix)))] + pub fn arg0<S>(&mut self, arg: S) -> &mut Command + where + S: AsRef<OsStr>, + { + self.std.arg0(arg); + self + } + + /// Schedules a closure to be run just before the `exec` function is + /// invoked. + /// + /// The closure is allowed to return an I/O error whose OS error code will + /// be communicated back to the parent and returned as an error from when + /// the spawn was requested. + /// + /// Multiple closures can be registered and they will be called in order of + /// their registration. If a closure returns `Err` then no further closures + /// will be called and the spawn operation will immediately return with a + /// failure. + /// + /// # Safety + /// + /// This closure will be run in the context of the child process after a + /// `fork`. This primarily means that any modifications made to memory on + /// behalf of this closure will **not** be visible to the parent process. + /// This is often a very constrained environment where normal operations + /// like `malloc` or acquiring a mutex are not guaranteed to work (due to + /// other threads perhaps still running when the `fork` was run). + /// + /// This also means that all resources such as file descriptors and + /// memory-mapped regions got duplicated. It is your responsibility to make + /// sure that the closure does not violate library invariants by making + /// invalid use of these duplicates. + /// + /// When this closure is run, aspects such as the stdio file descriptors and + /// working directory have successfully been changed, so output to these + /// locations may not appear where intended. + #[cfg(unix)] + #[cfg_attr(docsrs, doc(cfg(unix)))] + pub unsafe fn pre_exec<F>(&mut self, f: F) -> &mut Command + where + F: FnMut() -> io::Result<()> + Send + Sync + 'static, + { + self.std.pre_exec(f); + self + } + + /// Executes the command as a child process, returning a handle to it. + /// + /// By default, stdin, stdout and stderr are inherited from the parent. + /// + /// This method will spawn the child process synchronously and return a + /// handle to a future-aware child process. The `Child` returned implements + /// `Future` itself to acquire the `ExitStatus` of the child, and otherwise + /// the `Child` has methods to acquire handles to the stdin, stdout, and + /// stderr streams. + /// + /// All I/O this child does will be associated with the current default + /// event loop. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ```no_run + /// use tokio::process::Command; + /// + /// async fn run_ls() -> std::process::ExitStatus { + /// Command::new("ls") + /// .spawn() + /// .expect("ls command failed to start") + /// .wait() + /// .await + /// .expect("ls command failed to run") + /// } + /// ``` + /// + /// # Caveats + /// + /// ## Dropping/Cancellation + /// + /// Similar to the behavior to the standard library, and unlike the futures + /// paradigm of dropping-implies-cancellation, a spawned process will, by + /// default, continue to execute even after the `Child` handle has been dropped. + /// + /// The [`Command::kill_on_drop`] method can be used to modify this behavior + /// and kill the child process if the `Child` wrapper is dropped before it + /// has exited. + /// + /// ## Unix Processes + /// + /// On Unix platforms processes must be "reaped" by their parent process after + /// they have exited in order to release all OS resources. A child process which + /// has exited, but has not yet been reaped by its parent is considered a "zombie" + /// process. Such processes continue to count against limits imposed by the system, + /// and having too many zombie processes present can prevent additional processes + /// from being spawned. + /// + /// The tokio runtime will, on a best-effort basis, attempt to reap and clean up + /// any process which it has spawned. No additional guarantees are made with regards + /// how quickly or how often this procedure will take place. + /// + /// It is recommended to avoid dropping a [`Child`] process handle before it has been + /// fully `await`ed if stricter cleanup guarantees are required. + /// + /// [`Command`]: crate::process::Command + /// [`Command::kill_on_drop`]: crate::process::Command::kill_on_drop + /// [`Child`]: crate::process::Child + /// + /// # Errors + /// + /// On Unix platforms this method will fail with `std::io::ErrorKind::WouldBlock` + /// if the system process limit is reached (which includes other applications + /// running on the system). + pub fn spawn(&mut self) -> io::Result<Child> { + imp::spawn_child(&mut self.std).map(|spawned_child| Child { + child: FusedChild::Child(ChildDropGuard { + inner: spawned_child.child, + kill_on_drop: self.kill_on_drop, + }), + stdin: spawned_child.stdin.map(|inner| ChildStdin { inner }), + stdout: spawned_child.stdout.map(|inner| ChildStdout { inner }), + stderr: spawned_child.stderr.map(|inner| ChildStderr { inner }), + }) + } + + /// Executes the command as a child process, waiting for it to finish and + /// collecting its exit status. + /// + /// By default, stdin, stdout and stderr are inherited from the parent. + /// If any input/output handles are set to a pipe then they will be immediately + /// closed after the child is spawned. + /// + /// All I/O this child does will be associated with the current default + /// event loop. + /// + /// The destructor of the future returned by this function will kill + /// the child if [`kill_on_drop`] is set to true. + /// + /// [`kill_on_drop`]: fn@Self::kill_on_drop + /// + /// # Errors + /// + /// This future will return an error if the child process cannot be spawned + /// or if there is an error while awaiting its status. + /// + /// On Unix platforms this method will fail with `std::io::ErrorKind::WouldBlock` + /// if the system process limit is reached (which includes other applications + /// running on the system). + /// + /// # Examples + /// + /// Basic usage: + /// + /// ```no_run + /// use tokio::process::Command; + /// + /// async fn run_ls() -> std::process::ExitStatus { + /// Command::new("ls") + /// .status() + /// .await + /// .expect("ls command failed to run") + /// } + /// ``` + pub fn status(&mut self) -> impl Future<Output = io::Result<ExitStatus>> { + let child = self.spawn(); + + async { + let mut child = child?; + + // Ensure we close any stdio handles so we can't deadlock + // waiting on the child which may be waiting to read/write + // to a pipe we're holding. + child.stdin.take(); + child.stdout.take(); + child.stderr.take(); + + child.wait().await + } + } + + /// Executes the command as a child process, waiting for it to finish and + /// collecting all of its output. + /// + /// > **Note**: this method, unlike the standard library, will + /// > unconditionally configure the stdout/stderr handles to be pipes, even + /// > if they have been previously configured. If this is not desired then + /// > the `spawn` method should be used in combination with the + /// > `wait_with_output` method on child. + /// + /// This method will return a future representing the collection of the + /// child process's stdout/stderr. It will resolve to + /// the `Output` type in the standard library, containing `stdout` and + /// `stderr` as `Vec<u8>` along with an `ExitStatus` representing how the + /// process exited. + /// + /// All I/O this child does will be associated with the current default + /// event loop. + /// + /// The destructor of the future returned by this function will kill + /// the child if [`kill_on_drop`] is set to true. + /// + /// [`kill_on_drop`]: fn@Self::kill_on_drop + /// + /// # Errors + /// + /// This future will return an error if the child process cannot be spawned + /// or if there is an error while awaiting its status. + /// + /// On Unix platforms this method will fail with `std::io::ErrorKind::WouldBlock` + /// if the system process limit is reached (which includes other applications + /// running on the system). + /// # Examples + /// + /// Basic usage: + /// + /// ```no_run + /// use tokio::process::Command; + /// + /// async fn run_ls() { + /// let output: std::process::Output = Command::new("ls") + /// .output() + /// .await + /// .expect("ls command failed to run"); + /// println!("stderr of ls: {:?}", output.stderr); + /// } + /// ``` + pub fn output(&mut self) -> impl Future<Output = io::Result<Output>> { + self.std.stdout(Stdio::piped()); + self.std.stderr(Stdio::piped()); + + let child = self.spawn(); + + async { child?.wait_with_output().await } + } +} + +impl From<StdCommand> for Command { + fn from(std: StdCommand) -> Command { + Command { + std, + kill_on_drop: false, + } + } +} + +/// A drop guard which can ensure the child process is killed on drop if specified. +#[derive(Debug)] +struct ChildDropGuard<T: Kill> { + inner: T, + kill_on_drop: bool, +} + +impl<T: Kill> Kill for ChildDropGuard<T> { + fn kill(&mut self) -> io::Result<()> { + let ret = self.inner.kill(); + + if ret.is_ok() { + self.kill_on_drop = false; + } + + ret + } +} + +impl<T: Kill> Drop for ChildDropGuard<T> { + fn drop(&mut self) { + if self.kill_on_drop { + drop(self.kill()); + } + } +} + +impl<T, E, F> Future for ChildDropGuard<F> +where + F: Future<Output = Result<T, E>> + Kill + Unpin, +{ + type Output = Result<T, E>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + // Keep track of task budget + let coop = ready!(crate::coop::poll_proceed(cx)); + + let ret = Pin::new(&mut self.inner).poll(cx); + + if let Poll::Ready(Ok(_)) = ret { + // Avoid the overhead of trying to kill a reaped process + self.kill_on_drop = false; + } + + if ret.is_ready() { + coop.made_progress(); + } + + ret + } +} + +/// Keeps track of the exit status of a child process without worrying about +/// polling the underlying futures even after they have completed. +#[derive(Debug)] +enum FusedChild { + Child(ChildDropGuard<imp::Child>), + Done(ExitStatus), +} + +/// Representation of a child process spawned onto an event loop. +/// +/// # Caveats +/// Similar to the behavior to the standard library, and unlike the futures +/// paradigm of dropping-implies-cancellation, a spawned process will, by +/// default, continue to execute even after the `Child` handle has been dropped. +/// +/// The `Command::kill_on_drop` method can be used to modify this behavior +/// and kill the child process if the `Child` wrapper is dropped before it +/// has exited. +#[derive(Debug)] +pub struct Child { + child: FusedChild, + + /// The handle for writing to the child's standard input (stdin), if it has + /// been captured. To avoid partially moving the `child` and thus blocking + /// yourself from calling functions on `child` while using `stdin`, you might + /// find it helpful to do: + /// + /// ```no_run + /// # let mut child = tokio::process::Command::new("echo").spawn().unwrap(); + /// let stdin = child.stdin.take().unwrap(); + /// ``` + pub stdin: Option<ChildStdin>, + + /// The handle for reading from the child's standard output (stdout), if it + /// has been captured. You might find it helpful to do + /// + /// ```no_run + /// # let mut child = tokio::process::Command::new("echo").spawn().unwrap(); + /// let stdout = child.stdout.take().unwrap(); + /// ``` + /// + /// to avoid partially moving the `child` and thus blocking yourself from calling + /// functions on `child` while using `stdout`. + pub stdout: Option<ChildStdout>, + + /// The handle for reading from the child's standard error (stderr), if it + /// has been captured. You might find it helpful to do + /// + /// ```no_run + /// # let mut child = tokio::process::Command::new("echo").spawn().unwrap(); + /// let stderr = child.stderr.take().unwrap(); + /// ``` + /// + /// to avoid partially moving the `child` and thus blocking yourself from calling + /// functions on `child` while using `stderr`. + pub stderr: Option<ChildStderr>, +} + +impl Child { + /// Returns the OS-assigned process identifier associated with this child + /// while it is still running. + /// + /// Once the child has been polled to completion this will return `None`. + /// This is done to avoid confusion on platforms like Unix where the OS + /// identifier could be reused once the process has completed. + pub fn id(&self) -> Option<u32> { + match &self.child { + FusedChild::Child(child) => Some(child.inner.id()), + FusedChild::Done(_) => None, + } + } + + /// Extracts the raw handle of the process associated with this child while + /// it is still running. Returns `None` if the child has exited. + #[cfg(windows)] + pub fn raw_handle(&self) -> Option<RawHandle> { + match &self.child { + FusedChild::Child(c) => Some(c.inner.as_raw_handle()), + FusedChild::Done(_) => None, + } + } + + /// Attempts to force the child to exit, but does not wait for the request + /// to take effect. + /// + /// On Unix platforms, this is the equivalent to sending a SIGKILL. Note + /// that on Unix platforms it is possible for a zombie process to remain + /// after a kill is sent; to avoid this, the caller should ensure that either + /// `child.wait().await` or `child.try_wait()` is invoked successfully. + pub fn start_kill(&mut self) -> io::Result<()> { + match &mut self.child { + FusedChild::Child(child) => child.kill(), + FusedChild::Done(_) => Err(io::Error::new( + io::ErrorKind::InvalidInput, + "invalid argument: can't kill an exited process", + )), + } + } + + /// Forces the child to exit. + /// + /// This is equivalent to sending a SIGKILL on unix platforms. + /// + /// If the child has to be killed remotely, it is possible to do it using + /// a combination of the select! macro and a oneshot channel. In the following + /// example, the child will run until completion unless a message is sent on + /// the oneshot channel. If that happens, the child is killed immediately + /// using the `.kill()` method. + /// + /// ```no_run + /// use tokio::process::Command; + /// use tokio::sync::oneshot::channel; + /// + /// #[tokio::main] + /// async fn main() { + /// let (send, recv) = channel::<()>(); + /// let mut child = Command::new("sleep").arg("1").spawn().unwrap(); + /// tokio::spawn(async move { send.send(()) }); + /// tokio::select! { + /// _ = child.wait() => {} + /// _ = recv => child.kill().await.expect("kill failed"), + /// } + /// } + /// ``` + pub async fn kill(&mut self) -> io::Result<()> { + self.start_kill()?; + self.wait().await?; + Ok(()) + } + + /// Waits for the child to exit completely, returning the status that it + /// exited with. This function will continue to have the same return value + /// after it has been called at least once. + /// + /// The stdin handle to the child process, if any, will be closed + /// before waiting. This helps avoid deadlock: it ensures that the + /// child does not block waiting for input from the parent, while + /// the parent waits for the child to exit. + /// + /// If the caller wishes to explicitly control when the child's stdin + /// handle is closed, they may `.take()` it before calling `.wait()`: + /// + /// ``` + /// # #[cfg(not(unix))]fn main(){} + /// # #[cfg(unix)] + /// use tokio::io::AsyncWriteExt; + /// # #[cfg(unix)] + /// use tokio::process::Command; + /// # #[cfg(unix)] + /// use std::process::Stdio; + /// + /// # #[cfg(unix)] + /// #[tokio::main] + /// async fn main() { + /// let mut child = Command::new("cat") + /// .stdin(Stdio::piped()) + /// .spawn() + /// .unwrap(); + /// + /// let mut stdin = child.stdin.take().unwrap(); + /// tokio::spawn(async move { + /// // do something with stdin here... + /// stdin.write_all(b"hello world\n").await.unwrap(); + /// + /// // then drop when finished + /// drop(stdin); + /// }); + /// + /// // wait for the process to complete + /// let _ = child.wait().await; + /// } + /// ``` + pub async fn wait(&mut self) -> io::Result<ExitStatus> { + // Ensure stdin is closed so the child isn't stuck waiting on + // input while the parent is waiting for it to exit. + drop(self.stdin.take()); + + match &mut self.child { + FusedChild::Done(exit) => Ok(*exit), + FusedChild::Child(child) => { + let ret = child.await; + + if let Ok(exit) = ret { + self.child = FusedChild::Done(exit); + } + + ret + } + } + } + + /// Attempts to collect the exit status of the child if it has already + /// exited. + /// + /// This function will not block the calling thread and will only + /// check to see if the child process has exited or not. If the child has + /// exited then on Unix the process ID is reaped. This function is + /// guaranteed to repeatedly return a successful exit status so long as the + /// child has already exited. + /// + /// If the child has exited, then `Ok(Some(status))` is returned. If the + /// exit status is not available at this time then `Ok(None)` is returned. + /// If an error occurs, then that error is returned. + /// + /// Note that unlike `wait`, this function will not attempt to drop stdin, + /// nor will it wake the current task if the child exits. + pub fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> { + match &mut self.child { + FusedChild::Done(exit) => Ok(Some(*exit)), + FusedChild::Child(guard) => { + let ret = guard.inner.try_wait(); + + if let Ok(Some(exit)) = ret { + // Avoid the overhead of trying to kill a reaped process + guard.kill_on_drop = false; + self.child = FusedChild::Done(exit); + } + + ret + } + } + } + + /// Returns a future that will resolve to an `Output`, containing the exit + /// status, stdout, and stderr of the child process. + /// + /// The returned future will simultaneously waits for the child to exit and + /// collect all remaining output on the stdout/stderr handles, returning an + /// `Output` instance. + /// + /// The stdin handle to the child process, if any, will be closed before + /// waiting. This helps avoid deadlock: it ensures that the child does not + /// block waiting for input from the parent, while the parent waits for the + /// child to exit. + /// + /// By default, stdin, stdout and stderr are inherited from the parent. In + /// order to capture the output into this `Output` it is necessary to create + /// new pipes between parent and child. Use `stdout(Stdio::piped())` or + /// `stderr(Stdio::piped())`, respectively, when creating a `Command`. + pub async fn wait_with_output(mut self) -> io::Result<Output> { + use crate::future::try_join3; + + async fn read_to_end<A: AsyncRead + Unpin>(io: &mut Option<A>) -> io::Result<Vec<u8>> { + let mut vec = Vec::new(); + if let Some(io) = io.as_mut() { + crate::io::util::read_to_end(io, &mut vec).await?; + } + Ok(vec) + } + + let mut stdout_pipe = self.stdout.take(); + let mut stderr_pipe = self.stderr.take(); + + let stdout_fut = read_to_end(&mut stdout_pipe); + let stderr_fut = read_to_end(&mut stderr_pipe); + + let (status, stdout, stderr) = try_join3(self.wait(), stdout_fut, stderr_fut).await?; + + // Drop happens after `try_join` due to <https://github.com/tokio-rs/tokio/issues/4309> + drop(stdout_pipe); + drop(stderr_pipe); + + Ok(Output { + status, + stdout, + stderr, + }) + } +} + +/// The standard input stream for spawned children. +/// +/// This type implements the `AsyncWrite` trait to pass data to the stdin handle of +/// handle of a child process asynchronously. +#[derive(Debug)] +pub struct ChildStdin { + inner: imp::ChildStdio, +} + +/// The standard output stream for spawned children. +/// +/// This type implements the `AsyncRead` trait to read data from the stdout +/// handle of a child process asynchronously. +#[derive(Debug)] +pub struct ChildStdout { + inner: imp::ChildStdio, +} + +/// The standard error stream for spawned children. +/// +/// This type implements the `AsyncRead` trait to read data from the stderr +/// handle of a child process asynchronously. +#[derive(Debug)] +pub struct ChildStderr { + inner: imp::ChildStdio, +} + +impl ChildStdin { + /// Creates an asynchronous `ChildStdin` from a synchronous one. + /// + /// # Errors + /// + /// This method may fail if an error is encountered when setting the pipe to + /// non-blocking mode, or when registering the pipe with the runtime's IO + /// driver. + pub fn from_std(inner: std::process::ChildStdin) -> io::Result<Self> { + Ok(Self { + inner: imp::stdio(inner)?, + }) + } +} + +impl ChildStdout { + /// Creates an asynchronous `ChildStderr` from a synchronous one. + /// + /// # Errors + /// + /// This method may fail if an error is encountered when setting the pipe to + /// non-blocking mode, or when registering the pipe with the runtime's IO + /// driver. + pub fn from_std(inner: std::process::ChildStdout) -> io::Result<Self> { + Ok(Self { + inner: imp::stdio(inner)?, + }) + } +} + +impl ChildStderr { + /// Creates an asynchronous `ChildStderr` from a synchronous one. + /// + /// # Errors + /// + /// This method may fail if an error is encountered when setting the pipe to + /// non-blocking mode, or when registering the pipe with the runtime's IO + /// driver. + pub fn from_std(inner: std::process::ChildStderr) -> io::Result<Self> { + Ok(Self { + inner: imp::stdio(inner)?, + }) + } +} + +impl AsyncWrite for ChildStdin { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + self.inner.poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { + Poll::Ready(Ok(())) + } +} + +impl AsyncRead for ChildStdout { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + // Safety: pipes support reading into uninitialized memory + unsafe { self.inner.poll_read(cx, buf) } + } +} + +impl AsyncRead for ChildStderr { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<io::Result<()>> { + // Safety: pipes support reading into uninitialized memory + unsafe { self.inner.poll_read(cx, buf) } + } +} + +impl TryInto<Stdio> for ChildStdin { + type Error = io::Error; + + fn try_into(self) -> Result<Stdio, Self::Error> { + imp::convert_to_stdio(self.inner) + } +} + +impl TryInto<Stdio> for ChildStdout { + type Error = io::Error; + + fn try_into(self) -> Result<Stdio, Self::Error> { + imp::convert_to_stdio(self.inner) + } +} + +impl TryInto<Stdio> for ChildStderr { + type Error = io::Error; + + fn try_into(self) -> Result<Stdio, Self::Error> { + imp::convert_to_stdio(self.inner) + } +} + +#[cfg(unix)] +mod sys { + use std::os::unix::io::{AsRawFd, RawFd}; + + use super::{ChildStderr, ChildStdin, ChildStdout}; + + impl AsRawFd for ChildStdin { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } + } + + impl AsRawFd for ChildStdout { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } + } + + impl AsRawFd for ChildStderr { + fn as_raw_fd(&self) -> RawFd { + self.inner.as_raw_fd() + } + } +} + +#[cfg(windows)] +mod sys { + use std::os::windows::io::{AsRawHandle, RawHandle}; + + use super::{ChildStderr, ChildStdin, ChildStdout}; + + impl AsRawHandle for ChildStdin { + fn as_raw_handle(&self) -> RawHandle { + self.inner.as_raw_handle() + } + } + + impl AsRawHandle for ChildStdout { + fn as_raw_handle(&self) -> RawHandle { + self.inner.as_raw_handle() + } + } + + impl AsRawHandle for ChildStderr { + fn as_raw_handle(&self) -> RawHandle { + self.inner.as_raw_handle() + } + } +} + +#[cfg(all(test, not(loom)))] +mod test { + use super::kill::Kill; + use super::ChildDropGuard; + + use futures::future::FutureExt; + use std::future::Future; + use std::io; + use std::pin::Pin; + use std::task::{Context, Poll}; + + struct Mock { + num_kills: usize, + num_polls: usize, + poll_result: Poll<Result<(), ()>>, + } + + impl Mock { + fn new() -> Self { + Self::with_result(Poll::Pending) + } + + fn with_result(result: Poll<Result<(), ()>>) -> Self { + Self { + num_kills: 0, + num_polls: 0, + poll_result: result, + } + } + } + + impl Kill for Mock { + fn kill(&mut self) -> io::Result<()> { + self.num_kills += 1; + Ok(()) + } + } + + impl Future for Mock { + type Output = Result<(), ()>; + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { + let inner = Pin::get_mut(self); + inner.num_polls += 1; + inner.poll_result + } + } + + #[test] + fn kills_on_drop_if_specified() { + let mut mock = Mock::new(); + + { + let guard = ChildDropGuard { + inner: &mut mock, + kill_on_drop: true, + }; + drop(guard); + } + + assert_eq!(1, mock.num_kills); + assert_eq!(0, mock.num_polls); + } + + #[test] + fn no_kill_on_drop_by_default() { + let mut mock = Mock::new(); + + { + let guard = ChildDropGuard { + inner: &mut mock, + kill_on_drop: false, + }; + drop(guard); + } + + assert_eq!(0, mock.num_kills); + assert_eq!(0, mock.num_polls); + } + + #[test] + fn no_kill_if_already_killed() { + let mut mock = Mock::new(); + + { + let mut guard = ChildDropGuard { + inner: &mut mock, + kill_on_drop: true, + }; + let _ = guard.kill(); + drop(guard); + } + + assert_eq!(1, mock.num_kills); + assert_eq!(0, mock.num_polls); + } + + #[test] + fn no_kill_if_reaped() { + let mut mock_pending = Mock::with_result(Poll::Pending); + let mut mock_reaped = Mock::with_result(Poll::Ready(Ok(()))); + let mut mock_err = Mock::with_result(Poll::Ready(Err(()))); + + let waker = futures::task::noop_waker(); + let mut context = Context::from_waker(&waker); + { + let mut guard = ChildDropGuard { + inner: &mut mock_pending, + kill_on_drop: true, + }; + let _ = guard.poll_unpin(&mut context); + + let mut guard = ChildDropGuard { + inner: &mut mock_reaped, + kill_on_drop: true, + }; + let _ = guard.poll_unpin(&mut context); + + let mut guard = ChildDropGuard { + inner: &mut mock_err, + kill_on_drop: true, + }; + let _ = guard.poll_unpin(&mut context); + } + + assert_eq!(1, mock_pending.num_kills); + assert_eq!(1, mock_pending.num_polls); + + assert_eq!(0, mock_reaped.num_kills); + assert_eq!(1, mock_reaped.num_polls); + + assert_eq!(1, mock_err.num_kills); + assert_eq!(1, mock_err.num_polls); + } +} diff --git a/third_party/rust/tokio/src/process/unix/driver.rs b/third_party/rust/tokio/src/process/unix/driver.rs new file mode 100644 index 0000000000..84dc8fbd02 --- /dev/null +++ b/third_party/rust/tokio/src/process/unix/driver.rs @@ -0,0 +1,58 @@ +#![cfg_attr(not(feature = "rt"), allow(dead_code))] + +//! Process driver. + +use crate::park::Park; +use crate::process::unix::GlobalOrphanQueue; +use crate::signal::unix::driver::{Driver as SignalDriver, Handle as SignalHandle}; + +use std::io; +use std::time::Duration; + +/// Responsible for cleaning up orphaned child processes on Unix platforms. +#[derive(Debug)] +pub(crate) struct Driver { + park: SignalDriver, + signal_handle: SignalHandle, +} + +// ===== impl Driver ===== + +impl Driver { + /// Creates a new signal `Driver` instance that delegates wakeups to `park`. + pub(crate) fn new(park: SignalDriver) -> Self { + let signal_handle = park.handle(); + + Self { + park, + signal_handle, + } + } +} + +// ===== impl Park for Driver ===== + +impl Park for Driver { + type Unpark = <SignalDriver as Park>::Unpark; + type Error = io::Error; + + fn unpark(&self) -> Self::Unpark { + self.park.unpark() + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.park.park()?; + GlobalOrphanQueue::reap_orphans(&self.signal_handle); + Ok(()) + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.park.park_timeout(duration)?; + GlobalOrphanQueue::reap_orphans(&self.signal_handle); + Ok(()) + } + + fn shutdown(&mut self) { + self.park.shutdown() + } +} diff --git a/third_party/rust/tokio/src/process/unix/mod.rs b/third_party/rust/tokio/src/process/unix/mod.rs new file mode 100644 index 0000000000..576fe6cb47 --- /dev/null +++ b/third_party/rust/tokio/src/process/unix/mod.rs @@ -0,0 +1,250 @@ +//! Unix handling of child processes. +//! +//! Right now the only "fancy" thing about this is how we implement the +//! `Future` implementation on `Child` to get the exit status. Unix offers +//! no way to register a child with epoll, and the only real way to get a +//! notification when a process exits is the SIGCHLD signal. +//! +//! Signal handling in general is *super* hairy and complicated, and it's even +//! more complicated here with the fact that signals are coalesced, so we may +//! not get a SIGCHLD-per-child. +//! +//! Our best approximation here is to check *all spawned processes* for all +//! SIGCHLD signals received. To do that we create a `Signal`, implemented in +//! the `tokio-net` crate, which is a stream over signals being received. +//! +//! Later when we poll the process's exit status we simply check to see if a +//! SIGCHLD has happened since we last checked, and while that returns "yes" we +//! keep trying. +//! +//! Note that this means that this isn't really scalable, but then again +//! processes in general aren't scalable (e.g. millions) so it shouldn't be that +//! bad in theory... + +pub(crate) mod driver; + +pub(crate) mod orphan; +use orphan::{OrphanQueue, OrphanQueueImpl, Wait}; + +mod reap; +use reap::Reaper; + +use crate::io::PollEvented; +use crate::process::kill::Kill; +use crate::process::SpawnedChild; +use crate::signal::unix::driver::Handle as SignalHandle; +use crate::signal::unix::{signal, Signal, SignalKind}; + +use mio::event::Source; +use mio::unix::SourceFd; +use once_cell::sync::Lazy; +use std::fmt; +use std::fs::File; +use std::future::Future; +use std::io; +use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; +use std::pin::Pin; +use std::process::{Child as StdChild, ExitStatus, Stdio}; +use std::task::Context; +use std::task::Poll; + +impl Wait for StdChild { + fn id(&self) -> u32 { + self.id() + } + + fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> { + self.try_wait() + } +} + +impl Kill for StdChild { + fn kill(&mut self) -> io::Result<()> { + self.kill() + } +} + +static ORPHAN_QUEUE: Lazy<OrphanQueueImpl<StdChild>> = Lazy::new(OrphanQueueImpl::new); + +pub(crate) struct GlobalOrphanQueue; + +impl fmt::Debug for GlobalOrphanQueue { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + ORPHAN_QUEUE.fmt(fmt) + } +} + +impl GlobalOrphanQueue { + fn reap_orphans(handle: &SignalHandle) { + ORPHAN_QUEUE.reap_orphans(handle) + } +} + +impl OrphanQueue<StdChild> for GlobalOrphanQueue { + fn push_orphan(&self, orphan: StdChild) { + ORPHAN_QUEUE.push_orphan(orphan) + } +} + +#[must_use = "futures do nothing unless polled"] +pub(crate) struct Child { + inner: Reaper<StdChild, GlobalOrphanQueue, Signal>, +} + +impl fmt::Debug for Child { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Child") + .field("pid", &self.inner.id()) + .finish() + } +} + +pub(crate) fn spawn_child(cmd: &mut std::process::Command) -> io::Result<SpawnedChild> { + let mut child = cmd.spawn()?; + let stdin = child.stdin.take().map(stdio).transpose()?; + let stdout = child.stdout.take().map(stdio).transpose()?; + let stderr = child.stderr.take().map(stdio).transpose()?; + + let signal = signal(SignalKind::child())?; + + Ok(SpawnedChild { + child: Child { + inner: Reaper::new(child, GlobalOrphanQueue, signal), + }, + stdin, + stdout, + stderr, + }) +} + +impl Child { + pub(crate) fn id(&self) -> u32 { + self.inner.id() + } + + pub(crate) fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> { + self.inner.inner_mut().try_wait() + } +} + +impl Kill for Child { + fn kill(&mut self) -> io::Result<()> { + self.inner.kill() + } +} + +impl Future for Child { + type Output = io::Result<ExitStatus>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + Pin::new(&mut self.inner).poll(cx) + } +} + +#[derive(Debug)] +pub(crate) struct Pipe { + // Actually a pipe and not a File. However, we are reusing `File` to get + // close on drop. This is a similar trick as `mio`. + fd: File, +} + +impl<T: IntoRawFd> From<T> for Pipe { + fn from(fd: T) -> Self { + let fd = unsafe { File::from_raw_fd(fd.into_raw_fd()) }; + Self { fd } + } +} + +impl<'a> io::Read for &'a Pipe { + fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> { + (&self.fd).read(bytes) + } +} + +impl<'a> io::Write for &'a Pipe { + fn write(&mut self, bytes: &[u8]) -> io::Result<usize> { + (&self.fd).write(bytes) + } + + fn flush(&mut self) -> io::Result<()> { + (&self.fd).flush() + } +} + +impl AsRawFd for Pipe { + fn as_raw_fd(&self) -> RawFd { + self.fd.as_raw_fd() + } +} + +pub(crate) fn convert_to_stdio(io: PollEvented<Pipe>) -> io::Result<Stdio> { + let mut fd = io.into_inner()?.fd; + + // Ensure that the fd to be inherited is set to *blocking* mode, as this + // is the default that virtually all programs expect to have. Those + // programs that know how to work with nonblocking stdio will know how to + // change it to nonblocking mode. + set_nonblocking(&mut fd, false)?; + + Ok(Stdio::from(fd)) +} + +impl Source for Pipe { + fn register( + &mut self, + registry: &mio::Registry, + token: mio::Token, + interest: mio::Interest, + ) -> io::Result<()> { + SourceFd(&self.as_raw_fd()).register(registry, token, interest) + } + + fn reregister( + &mut self, + registry: &mio::Registry, + token: mio::Token, + interest: mio::Interest, + ) -> io::Result<()> { + SourceFd(&self.as_raw_fd()).reregister(registry, token, interest) + } + + fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> { + SourceFd(&self.as_raw_fd()).deregister(registry) + } +} + +pub(crate) type ChildStdio = PollEvented<Pipe>; + +fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()> { + unsafe { + let fd = fd.as_raw_fd(); + let previous = libc::fcntl(fd, libc::F_GETFL); + if previous == -1 { + return Err(io::Error::last_os_error()); + } + + let new = if nonblocking { + previous | libc::O_NONBLOCK + } else { + previous & !libc::O_NONBLOCK + }; + + let r = libc::fcntl(fd, libc::F_SETFL, new); + if r == -1 { + return Err(io::Error::last_os_error()); + } + } + + Ok(()) +} + +pub(super) fn stdio<T>(io: T) -> io::Result<PollEvented<Pipe>> +where + T: IntoRawFd, +{ + // Set the fd to nonblocking before we pass it to the event loop + let mut pipe = Pipe::from(io); + set_nonblocking(&mut pipe, true)?; + + PollEvented::new(pipe) +} diff --git a/third_party/rust/tokio/src/process/unix/orphan.rs b/third_party/rust/tokio/src/process/unix/orphan.rs new file mode 100644 index 0000000000..0e52530c37 --- /dev/null +++ b/third_party/rust/tokio/src/process/unix/orphan.rs @@ -0,0 +1,321 @@ +use crate::loom::sync::{Mutex, MutexGuard}; +use crate::signal::unix::driver::Handle as SignalHandle; +use crate::signal::unix::{signal_with_handle, SignalKind}; +use crate::sync::watch; +use std::io; +use std::process::ExitStatus; + +/// An interface for waiting on a process to exit. +pub(crate) trait Wait { + /// Get the identifier for this process or diagnostics. + fn id(&self) -> u32; + /// Try waiting for a process to exit in a non-blocking manner. + fn try_wait(&mut self) -> io::Result<Option<ExitStatus>>; +} + +impl<T: Wait> Wait for &mut T { + fn id(&self) -> u32 { + (**self).id() + } + + fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> { + (**self).try_wait() + } +} + +/// An interface for queueing up an orphaned process so that it can be reaped. +pub(crate) trait OrphanQueue<T> { + /// Adds an orphan to the queue. + fn push_orphan(&self, orphan: T); +} + +impl<T, O: OrphanQueue<T>> OrphanQueue<T> for &O { + fn push_orphan(&self, orphan: T) { + (**self).push_orphan(orphan); + } +} + +/// An implementation of `OrphanQueue`. +#[derive(Debug)] +pub(crate) struct OrphanQueueImpl<T> { + sigchild: Mutex<Option<watch::Receiver<()>>>, + queue: Mutex<Vec<T>>, +} + +impl<T> OrphanQueueImpl<T> { + pub(crate) fn new() -> Self { + Self { + sigchild: Mutex::new(None), + queue: Mutex::new(Vec::new()), + } + } + + #[cfg(test)] + fn len(&self) -> usize { + self.queue.lock().len() + } + + pub(crate) fn push_orphan(&self, orphan: T) + where + T: Wait, + { + self.queue.lock().push(orphan) + } + + /// Attempts to reap every process in the queue, ignoring any errors and + /// enqueueing any orphans which have not yet exited. + pub(crate) fn reap_orphans(&self, handle: &SignalHandle) + where + T: Wait, + { + // If someone else is holding the lock, they will be responsible for draining + // the queue as necessary, so we can safely bail if that happens + if let Some(mut sigchild_guard) = self.sigchild.try_lock() { + match &mut *sigchild_guard { + Some(sigchild) => { + if sigchild.try_has_changed().and_then(Result::ok).is_some() { + drain_orphan_queue(self.queue.lock()); + } + } + None => { + let queue = self.queue.lock(); + + // Be lazy and only initialize the SIGCHLD listener if there + // are any orphaned processes in the queue. + if !queue.is_empty() { + // An errors shouldn't really happen here, but if it does it + // means that the signal driver isn't running, in + // which case there isn't anything we can + // register/initialize here, so we can try again later + if let Ok(sigchild) = signal_with_handle(SignalKind::child(), handle) { + *sigchild_guard = Some(sigchild); + drain_orphan_queue(queue); + } + } + } + } + } + } +} + +fn drain_orphan_queue<T>(mut queue: MutexGuard<'_, Vec<T>>) +where + T: Wait, +{ + for i in (0..queue.len()).rev() { + match queue[i].try_wait() { + Ok(None) => {} + Ok(Some(_)) | Err(_) => { + // The stdlib handles interruption errors (EINTR) when polling a child process. + // All other errors represent invalid inputs or pids that have already been + // reaped, so we can drop the orphan in case an error is raised. + queue.swap_remove(i); + } + } + } + + drop(queue); +} + +#[cfg(all(test, not(loom)))] +pub(crate) mod test { + use super::*; + use crate::io::driver::Driver as IoDriver; + use crate::signal::unix::driver::{Driver as SignalDriver, Handle as SignalHandle}; + use crate::sync::watch; + use std::cell::{Cell, RefCell}; + use std::io; + use std::os::unix::process::ExitStatusExt; + use std::process::ExitStatus; + use std::rc::Rc; + + pub(crate) struct MockQueue<W> { + pub(crate) all_enqueued: RefCell<Vec<W>>, + } + + impl<W> MockQueue<W> { + pub(crate) fn new() -> Self { + Self { + all_enqueued: RefCell::new(Vec::new()), + } + } + } + + impl<W> OrphanQueue<W> for MockQueue<W> { + fn push_orphan(&self, orphan: W) { + self.all_enqueued.borrow_mut().push(orphan); + } + } + + struct MockWait { + total_waits: Rc<Cell<usize>>, + num_wait_until_status: usize, + return_err: bool, + } + + impl MockWait { + fn new(num_wait_until_status: usize) -> Self { + Self { + total_waits: Rc::new(Cell::new(0)), + num_wait_until_status, + return_err: false, + } + } + + fn with_err() -> Self { + Self { + total_waits: Rc::new(Cell::new(0)), + num_wait_until_status: 0, + return_err: true, + } + } + } + + impl Wait for MockWait { + fn id(&self) -> u32 { + 42 + } + + fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> { + let waits = self.total_waits.get(); + + let ret = if self.num_wait_until_status == waits { + if self.return_err { + Ok(Some(ExitStatus::from_raw(0))) + } else { + Err(io::Error::new(io::ErrorKind::Other, "mock err")) + } + } else { + Ok(None) + }; + + self.total_waits.set(waits + 1); + ret + } + } + + #[test] + fn drain_attempts_a_single_reap_of_all_queued_orphans() { + let first_orphan = MockWait::new(0); + let second_orphan = MockWait::new(1); + let third_orphan = MockWait::new(2); + let fourth_orphan = MockWait::with_err(); + + let first_waits = first_orphan.total_waits.clone(); + let second_waits = second_orphan.total_waits.clone(); + let third_waits = third_orphan.total_waits.clone(); + let fourth_waits = fourth_orphan.total_waits.clone(); + + let orphanage = OrphanQueueImpl::new(); + orphanage.push_orphan(first_orphan); + orphanage.push_orphan(third_orphan); + orphanage.push_orphan(second_orphan); + orphanage.push_orphan(fourth_orphan); + + assert_eq!(orphanage.len(), 4); + + drain_orphan_queue(orphanage.queue.lock()); + assert_eq!(orphanage.len(), 2); + assert_eq!(first_waits.get(), 1); + assert_eq!(second_waits.get(), 1); + assert_eq!(third_waits.get(), 1); + assert_eq!(fourth_waits.get(), 1); + + drain_orphan_queue(orphanage.queue.lock()); + assert_eq!(orphanage.len(), 1); + assert_eq!(first_waits.get(), 1); + assert_eq!(second_waits.get(), 2); + assert_eq!(third_waits.get(), 2); + assert_eq!(fourth_waits.get(), 1); + + drain_orphan_queue(orphanage.queue.lock()); + assert_eq!(orphanage.len(), 0); + assert_eq!(first_waits.get(), 1); + assert_eq!(second_waits.get(), 2); + assert_eq!(third_waits.get(), 3); + assert_eq!(fourth_waits.get(), 1); + + // Safe to reap when empty + drain_orphan_queue(orphanage.queue.lock()); + } + + #[test] + fn no_reap_if_no_signal_received() { + let (tx, rx) = watch::channel(()); + + let handle = SignalHandle::default(); + + let orphanage = OrphanQueueImpl::new(); + *orphanage.sigchild.lock() = Some(rx); + + let orphan = MockWait::new(2); + let waits = orphan.total_waits.clone(); + orphanage.push_orphan(orphan); + + orphanage.reap_orphans(&handle); + assert_eq!(waits.get(), 0); + + orphanage.reap_orphans(&handle); + assert_eq!(waits.get(), 0); + + tx.send(()).unwrap(); + orphanage.reap_orphans(&handle); + assert_eq!(waits.get(), 1); + } + + #[test] + fn no_reap_if_signal_lock_held() { + let handle = SignalHandle::default(); + + let orphanage = OrphanQueueImpl::new(); + let signal_guard = orphanage.sigchild.lock(); + + let orphan = MockWait::new(2); + let waits = orphan.total_waits.clone(); + orphanage.push_orphan(orphan); + + orphanage.reap_orphans(&handle); + assert_eq!(waits.get(), 0); + + drop(signal_guard); + } + + #[cfg_attr(miri, ignore)] // Miri does not support epoll. + #[test] + fn does_not_register_signal_if_queue_empty() { + let signal_driver = IoDriver::new().and_then(SignalDriver::new).unwrap(); + let handle = signal_driver.handle(); + + let orphanage = OrphanQueueImpl::new(); + assert!(orphanage.sigchild.lock().is_none()); // Sanity + + // No register when queue empty + orphanage.reap_orphans(&handle); + assert!(orphanage.sigchild.lock().is_none()); + + let orphan = MockWait::new(2); + let waits = orphan.total_waits.clone(); + orphanage.push_orphan(orphan); + + orphanage.reap_orphans(&handle); + assert!(orphanage.sigchild.lock().is_some()); + assert_eq!(waits.get(), 1); // Eager reap when registering listener + } + + #[test] + fn does_nothing_if_signal_could_not_be_registered() { + let handle = SignalHandle::default(); + + let orphanage = OrphanQueueImpl::new(); + assert!(orphanage.sigchild.lock().is_none()); + + let orphan = MockWait::new(2); + let waits = orphan.total_waits.clone(); + orphanage.push_orphan(orphan); + + // Signal handler has "gone away", nothing to register or reap + orphanage.reap_orphans(&handle); + assert!(orphanage.sigchild.lock().is_none()); + assert_eq!(waits.get(), 0); + } +} diff --git a/third_party/rust/tokio/src/process/unix/reap.rs b/third_party/rust/tokio/src/process/unix/reap.rs new file mode 100644 index 0000000000..f7f4d3cc70 --- /dev/null +++ b/third_party/rust/tokio/src/process/unix/reap.rs @@ -0,0 +1,298 @@ +use crate::process::imp::orphan::{OrphanQueue, Wait}; +use crate::process::kill::Kill; +use crate::signal::unix::InternalStream; + +use std::future::Future; +use std::io; +use std::ops::Deref; +use std::pin::Pin; +use std::process::ExitStatus; +use std::task::Context; +use std::task::Poll; + +/// Orchestrates between registering interest for receiving signals when a +/// child process has exited, and attempting to poll for process completion. +#[derive(Debug)] +pub(crate) struct Reaper<W, Q, S> +where + W: Wait, + Q: OrphanQueue<W>, +{ + inner: Option<W>, + orphan_queue: Q, + signal: S, +} + +impl<W, Q, S> Deref for Reaper<W, Q, S> +where + W: Wait, + Q: OrphanQueue<W>, +{ + type Target = W; + + fn deref(&self) -> &Self::Target { + self.inner() + } +} + +impl<W, Q, S> Reaper<W, Q, S> +where + W: Wait, + Q: OrphanQueue<W>, +{ + pub(crate) fn new(inner: W, orphan_queue: Q, signal: S) -> Self { + Self { + inner: Some(inner), + orphan_queue, + signal, + } + } + + fn inner(&self) -> &W { + self.inner.as_ref().expect("inner has gone away") + } + + pub(crate) fn inner_mut(&mut self) -> &mut W { + self.inner.as_mut().expect("inner has gone away") + } +} + +impl<W, Q, S> Future for Reaper<W, Q, S> +where + W: Wait + Unpin, + Q: OrphanQueue<W> + Unpin, + S: InternalStream + Unpin, +{ + type Output = io::Result<ExitStatus>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + loop { + // If the child hasn't exited yet, then it's our responsibility to + // ensure the current task gets notified when it might be able to + // make progress. We can use the delivery of a SIGCHLD signal as a + // sign that we can potentially make progress. + // + // However, we will register for a notification on the next signal + // BEFORE we poll the child. Otherwise it is possible that the child + // can exit and the signal can arrive after we last polled the child, + // but before we've registered for a notification on the next signal + // (this can cause a deadlock if there are no more spawned children + // which can generate a different signal for us). A side effect of + // pre-registering for signal notifications is that when the child + // exits, we will have already registered for an additional + // notification we don't need to consume. If another signal arrives, + // this future's task will be notified/woken up again. Since the + // futures model allows for spurious wake ups this extra wakeup + // should not cause significant issues with parent futures. + let registered_interest = self.signal.poll_recv(cx).is_pending(); + + if let Some(status) = self.inner_mut().try_wait()? { + return Poll::Ready(Ok(status)); + } + + // If our attempt to poll for the next signal was not ready, then + // we've arranged for our task to get notified and we can bail out. + if registered_interest { + return Poll::Pending; + } else { + // Otherwise, if the signal stream delivered a signal to us, we + // won't get notified at the next signal, so we'll loop and try + // again. + continue; + } + } + } +} + +impl<W, Q, S> Kill for Reaper<W, Q, S> +where + W: Kill + Wait, + Q: OrphanQueue<W>, +{ + fn kill(&mut self) -> io::Result<()> { + self.inner_mut().kill() + } +} + +impl<W, Q, S> Drop for Reaper<W, Q, S> +where + W: Wait, + Q: OrphanQueue<W>, +{ + fn drop(&mut self) { + if let Ok(Some(_)) = self.inner_mut().try_wait() { + return; + } + + let orphan = self.inner.take().unwrap(); + self.orphan_queue.push_orphan(orphan); + } +} + +#[cfg(all(test, not(loom)))] +mod test { + use super::*; + + use crate::process::unix::orphan::test::MockQueue; + use futures::future::FutureExt; + use std::os::unix::process::ExitStatusExt; + use std::process::ExitStatus; + use std::task::Context; + use std::task::Poll; + + #[derive(Debug)] + struct MockWait { + total_kills: usize, + total_waits: usize, + num_wait_until_status: usize, + status: ExitStatus, + } + + impl MockWait { + fn new(status: ExitStatus, num_wait_until_status: usize) -> Self { + Self { + total_kills: 0, + total_waits: 0, + num_wait_until_status, + status, + } + } + } + + impl Wait for MockWait { + fn id(&self) -> u32 { + 0 + } + + fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> { + let ret = if self.num_wait_until_status == self.total_waits { + Some(self.status) + } else { + None + }; + + self.total_waits += 1; + Ok(ret) + } + } + + impl Kill for MockWait { + fn kill(&mut self) -> io::Result<()> { + self.total_kills += 1; + Ok(()) + } + } + + struct MockStream { + total_polls: usize, + values: Vec<Option<()>>, + } + + impl MockStream { + fn new(values: Vec<Option<()>>) -> Self { + Self { + total_polls: 0, + values, + } + } + } + + impl InternalStream for MockStream { + fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>> { + self.total_polls += 1; + match self.values.remove(0) { + Some(()) => Poll::Ready(Some(())), + None => Poll::Pending, + } + } + } + + #[test] + fn reaper() { + let exit = ExitStatus::from_raw(0); + let mock = MockWait::new(exit, 3); + let mut grim = Reaper::new( + mock, + MockQueue::new(), + MockStream::new(vec![None, Some(()), None, None, None]), + ); + + let waker = futures::task::noop_waker(); + let mut context = Context::from_waker(&waker); + + // Not yet exited, interest registered + assert!(grim.poll_unpin(&mut context).is_pending()); + assert_eq!(1, grim.signal.total_polls); + assert_eq!(1, grim.total_waits); + assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); + + // Not yet exited, couldn't register interest the first time + // but managed to register interest the second time around + assert!(grim.poll_unpin(&mut context).is_pending()); + assert_eq!(3, grim.signal.total_polls); + assert_eq!(3, grim.total_waits); + assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); + + // Exited + if let Poll::Ready(r) = grim.poll_unpin(&mut context) { + assert!(r.is_ok()); + let exit_code = r.unwrap(); + assert_eq!(exit_code, exit); + } else { + unreachable!(); + } + assert_eq!(4, grim.signal.total_polls); + assert_eq!(4, grim.total_waits); + assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); + } + + #[test] + fn kill() { + let exit = ExitStatus::from_raw(0); + let mut grim = Reaper::new( + MockWait::new(exit, 0), + MockQueue::new(), + MockStream::new(vec![None]), + ); + + grim.kill().unwrap(); + assert_eq!(1, grim.total_kills); + assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); + } + + #[test] + fn drop_reaps_if_possible() { + let exit = ExitStatus::from_raw(0); + let mut mock = MockWait::new(exit, 0); + + { + let queue = MockQueue::new(); + + let grim = Reaper::new(&mut mock, &queue, MockStream::new(vec![])); + + drop(grim); + + assert!(queue.all_enqueued.borrow().is_empty()); + } + + assert_eq!(1, mock.total_waits); + assert_eq!(0, mock.total_kills); + } + + #[test] + fn drop_enqueues_orphan_if_wait_fails() { + let exit = ExitStatus::from_raw(0); + let mut mock = MockWait::new(exit, 2); + + { + let queue = MockQueue::<&mut MockWait>::new(); + let grim = Reaper::new(&mut mock, &queue, MockStream::new(vec![])); + drop(grim); + + assert_eq!(1, queue.all_enqueued.borrow().len()); + } + + assert_eq!(1, mock.total_waits); + assert_eq!(0, mock.total_kills); + } +} diff --git a/third_party/rust/tokio/src/process/windows.rs b/third_party/rust/tokio/src/process/windows.rs new file mode 100644 index 0000000000..136d5b0cab --- /dev/null +++ b/third_party/rust/tokio/src/process/windows.rs @@ -0,0 +1,205 @@ +//! Windows asynchronous process handling. +//! +//! Like with Unix we don't actually have a way of registering a process with an +//! IOCP object. As a result we similarly need another mechanism for getting a +//! signal when a process has exited. For now this is implemented with the +//! `RegisterWaitForSingleObject` function in the kernel32.dll. +//! +//! This strategy is the same that libuv takes and essentially just queues up a +//! wait for the process in a kernel32-specific thread pool. Once the object is +//! notified (e.g. the process exits) then we have a callback that basically +//! just completes a `Oneshot`. +//! +//! The `poll_exit` implementation will attempt to wait for the process in a +//! nonblocking fashion, but failing that it'll fire off a +//! `RegisterWaitForSingleObject` and then wait on the other end of the oneshot +//! from then on out. + +use crate::io::PollEvented; +use crate::process::kill::Kill; +use crate::process::SpawnedChild; +use crate::sync::oneshot; + +use mio::windows::NamedPipe; +use std::fmt; +use std::future::Future; +use std::io; +use std::os::windows::prelude::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle}; +use std::pin::Pin; +use std::process::Stdio; +use std::process::{Child as StdChild, Command as StdCommand, ExitStatus}; +use std::ptr; +use std::task::Context; +use std::task::Poll; +use winapi::shared::minwindef::{DWORD, FALSE}; +use winapi::um::handleapi::{DuplicateHandle, INVALID_HANDLE_VALUE}; +use winapi::um::processthreadsapi::GetCurrentProcess; +use winapi::um::threadpoollegacyapiset::UnregisterWaitEx; +use winapi::um::winbase::{RegisterWaitForSingleObject, INFINITE}; +use winapi::um::winnt::{ + BOOLEAN, DUPLICATE_SAME_ACCESS, HANDLE, PVOID, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE, +}; + +#[must_use = "futures do nothing unless polled"] +pub(crate) struct Child { + child: StdChild, + waiting: Option<Waiting>, +} + +impl fmt::Debug for Child { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Child") + .field("pid", &self.id()) + .field("child", &self.child) + .field("waiting", &"..") + .finish() + } +} + +struct Waiting { + rx: oneshot::Receiver<()>, + wait_object: HANDLE, + tx: *mut Option<oneshot::Sender<()>>, +} + +unsafe impl Sync for Waiting {} +unsafe impl Send for Waiting {} + +pub(crate) fn spawn_child(cmd: &mut StdCommand) -> io::Result<SpawnedChild> { + let mut child = cmd.spawn()?; + let stdin = child.stdin.take().map(stdio).transpose()?; + let stdout = child.stdout.take().map(stdio).transpose()?; + let stderr = child.stderr.take().map(stdio).transpose()?; + + Ok(SpawnedChild { + child: Child { + child, + waiting: None, + }, + stdin, + stdout, + stderr, + }) +} + +impl Child { + pub(crate) fn id(&self) -> u32 { + self.child.id() + } + + pub(crate) fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> { + self.child.try_wait() + } +} + +impl Kill for Child { + fn kill(&mut self) -> io::Result<()> { + self.child.kill() + } +} + +impl Future for Child { + type Output = io::Result<ExitStatus>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let inner = Pin::get_mut(self); + loop { + if let Some(ref mut w) = inner.waiting { + match Pin::new(&mut w.rx).poll(cx) { + Poll::Ready(Ok(())) => {} + Poll::Ready(Err(_)) => panic!("should not be canceled"), + Poll::Pending => return Poll::Pending, + } + let status = inner.try_wait()?.expect("not ready yet"); + return Poll::Ready(Ok(status)); + } + + if let Some(e) = inner.try_wait()? { + return Poll::Ready(Ok(e)); + } + let (tx, rx) = oneshot::channel(); + let ptr = Box::into_raw(Box::new(Some(tx))); + let mut wait_object = ptr::null_mut(); + let rc = unsafe { + RegisterWaitForSingleObject( + &mut wait_object, + inner.child.as_raw_handle(), + Some(callback), + ptr as *mut _, + INFINITE, + WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE, + ) + }; + if rc == 0 { + let err = io::Error::last_os_error(); + drop(unsafe { Box::from_raw(ptr) }); + return Poll::Ready(Err(err)); + } + inner.waiting = Some(Waiting { + rx, + wait_object, + tx: ptr, + }); + } + } +} + +impl AsRawHandle for Child { + fn as_raw_handle(&self) -> RawHandle { + self.child.as_raw_handle() + } +} + +impl Drop for Waiting { + fn drop(&mut self) { + unsafe { + let rc = UnregisterWaitEx(self.wait_object, INVALID_HANDLE_VALUE); + if rc == 0 { + panic!("failed to unregister: {}", io::Error::last_os_error()); + } + drop(Box::from_raw(self.tx)); + } + } +} + +unsafe extern "system" fn callback(ptr: PVOID, _timer_fired: BOOLEAN) { + let complete = &mut *(ptr as *mut Option<oneshot::Sender<()>>); + let _ = complete.take().unwrap().send(()); +} + +pub(crate) type ChildStdio = PollEvented<NamedPipe>; + +pub(super) fn stdio<T>(io: T) -> io::Result<PollEvented<NamedPipe>> +where + T: IntoRawHandle, +{ + let pipe = unsafe { NamedPipe::from_raw_handle(io.into_raw_handle()) }; + PollEvented::new(pipe) +} + +pub(crate) fn convert_to_stdio(io: PollEvented<NamedPipe>) -> io::Result<Stdio> { + let named_pipe = io.into_inner()?; + + // Mio does not implement `IntoRawHandle` for `NamedPipe`, so we'll manually + // duplicate the handle here... + unsafe { + let mut dup_handle = INVALID_HANDLE_VALUE; + let cur_proc = GetCurrentProcess(); + + let status = DuplicateHandle( + cur_proc, + named_pipe.as_raw_handle(), + cur_proc, + &mut dup_handle, + 0 as DWORD, + FALSE, + DUPLICATE_SAME_ACCESS, + ); + + if status == 0 { + return Err(io::Error::last_os_error()); + } + + Ok(Stdio::from_raw_handle(dup_handle)) + } +} |