summaryrefslogtreecommitdiffstats
path: root/vendor/jobserver/src
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/jobserver/src')
-rw-r--r--vendor/jobserver/src/error.rs84
-rw-r--r--vendor/jobserver/src/lib.rs1140
-rw-r--r--vendor/jobserver/src/unix.rs908
-rw-r--r--vendor/jobserver/src/wasm.rs191
-rw-r--r--vendor/jobserver/src/windows.rs536
5 files changed, 1523 insertions, 1336 deletions
diff --git a/vendor/jobserver/src/error.rs b/vendor/jobserver/src/error.rs
new file mode 100644
index 000000000..9f95f18b8
--- /dev/null
+++ b/vendor/jobserver/src/error.rs
@@ -0,0 +1,84 @@
+#[cfg(unix)]
+type RawFd = std::os::fd::RawFd;
+#[cfg(not(unix))]
+type RawFd = std::convert::Infallible;
+
+/// Error type for `from_env_ext` function.
+#[derive(Debug)]
+pub struct FromEnvError {
+ pub(crate) inner: FromEnvErrorInner,
+}
+
+/// Kind of an error returned from `from_env_ext` function.
+#[derive(Debug)]
+#[non_exhaustive]
+pub enum FromEnvErrorKind {
+ /// There is no environment variable that describes jobserver to inherit.
+ NoEnvVar,
+ /// There is no jobserver in the environment variable.
+ /// Variables associated with Make can be used for passing data other than jobserver info.
+ NoJobserver,
+ /// Cannot parse jobserver environment variable value, incorrect format.
+ CannotParse,
+ /// Cannot open path or name from the jobserver environment variable value.
+ CannotOpenPath,
+ /// Cannot open file descriptor from the jobserver environment variable value.
+ CannotOpenFd,
+ /// File descriptor from the jobserver environment variable value is not a pipe.
+ NotAPipe,
+ /// Jobserver inheritance is not supported on this platform.
+ Unsupported,
+}
+
+impl FromEnvError {
+ /// Get the error kind.
+ pub fn kind(&self) -> FromEnvErrorKind {
+ match self.inner {
+ FromEnvErrorInner::NoEnvVar => FromEnvErrorKind::NoEnvVar,
+ FromEnvErrorInner::NoJobserver => FromEnvErrorKind::NoJobserver,
+ FromEnvErrorInner::CannotParse(_) => FromEnvErrorKind::CannotParse,
+ FromEnvErrorInner::CannotOpenPath(..) => FromEnvErrorKind::CannotOpenPath,
+ FromEnvErrorInner::CannotOpenFd(..) => FromEnvErrorKind::CannotOpenFd,
+ FromEnvErrorInner::NotAPipe(..) => FromEnvErrorKind::NotAPipe,
+ FromEnvErrorInner::Unsupported => FromEnvErrorKind::Unsupported,
+ }
+ }
+}
+
+impl std::fmt::Display for FromEnvError {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match &self.inner {
+ FromEnvErrorInner::NoEnvVar => write!(f, "there is no environment variable that describes jobserver to inherit"),
+ FromEnvErrorInner::NoJobserver => write!(f, "there is no `--jobserver-fds=` or `--jobserver-auth=` in the environment variable"),
+ FromEnvErrorInner::CannotParse(s) => write!(f, "cannot parse jobserver environment variable value: {s}"),
+ FromEnvErrorInner::CannotOpenPath(s, err) => write!(f, "cannot open path or name {s} from the jobserver environment variable value: {err}"),
+ FromEnvErrorInner::CannotOpenFd(fd, err) => write!(f, "cannot open file descriptor {fd} from the jobserver environment variable value: {err}"),
+ FromEnvErrorInner::NotAPipe(fd, None) => write!(f, "file descriptor {fd} from the jobserver environment variable value is not a pipe"),
+ FromEnvErrorInner::NotAPipe(fd, Some(err)) => write!(f, "file descriptor {fd} from the jobserver environment variable value is not a pipe: {err}"),
+ FromEnvErrorInner::Unsupported => write!(f, "jobserver inheritance is not supported on this platform"),
+ }
+ }
+}
+impl std::error::Error for FromEnvError {
+ fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
+ match &self.inner {
+ FromEnvErrorInner::CannotOpenPath(_, err) => Some(err),
+ FromEnvErrorInner::NotAPipe(_, Some(err)) | FromEnvErrorInner::CannotOpenFd(_, err) => {
+ Some(err)
+ }
+ _ => None,
+ }
+ }
+}
+
+#[allow(dead_code)]
+#[derive(Debug)]
+pub(crate) enum FromEnvErrorInner {
+ NoEnvVar,
+ NoJobserver,
+ CannotParse(String),
+ CannotOpenPath(String, std::io::Error),
+ CannotOpenFd(RawFd, std::io::Error),
+ NotAPipe(RawFd, Option<std::io::Error>),
+ Unsupported,
+}
diff --git a/vendor/jobserver/src/lib.rs b/vendor/jobserver/src/lib.rs
index cd0cdd749..bfa7980c4 100644
--- a/vendor/jobserver/src/lib.rs
+++ b/vendor/jobserver/src/lib.rs
@@ -1,544 +1,596 @@
-//! An implementation of the GNU make jobserver.
-//!
-//! This crate is an implementation, in Rust, of the GNU `make` jobserver for
-//! CLI tools that are interoperating with make or otherwise require some form
-//! of parallelism limiting across process boundaries. This was originally
-//! written for usage in Cargo to both (a) work when `cargo` is invoked from
-//! `make` (using `make`'s jobserver) and (b) work when `cargo` invokes build
-//! scripts, exporting a jobserver implementation for `make` processes to
-//! transitively use.
-//!
-//! The jobserver implementation can be found in [detail online][docs] but
-//! basically boils down to a cross-process semaphore. On Unix this is
-//! implemented with the `pipe` syscall and read/write ends of a pipe and on
-//! Windows this is implemented literally with IPC semaphores. Starting from
-//! GNU `make` version 4.4, named pipe becomes the default way in communication
-//! on Unix. This crate also supports that feature in the sense of inheriting
-//! and forwarding the correct environment.
-//!
-//! The jobserver protocol in `make` also dictates when tokens are acquired to
-//! run child work, and clients using this crate should take care to implement
-//! such details to ensure correct interoperation with `make` itself.
-//!
-//! ## Examples
-//!
-//! Connect to a jobserver that was set up by `make` or a different process:
-//!
-//! ```no_run
-//! use jobserver::Client;
-//!
-//! // See API documentation for why this is `unsafe`
-//! let client = match unsafe { Client::from_env() } {
-//! Some(client) => client,
-//! None => panic!("client not configured"),
-//! };
-//! ```
-//!
-//! Acquire and release token from a jobserver:
-//!
-//! ```no_run
-//! use jobserver::Client;
-//!
-//! let client = unsafe { Client::from_env().unwrap() };
-//! let token = client.acquire().unwrap(); // blocks until it is available
-//! drop(token); // releases the token when the work is done
-//! ```
-//!
-//! Create a new jobserver and configure a child process to have access:
-//!
-//! ```
-//! use std::process::Command;
-//! use jobserver::Client;
-//!
-//! let client = Client::new(4).expect("failed to create jobserver");
-//! let mut cmd = Command::new("make");
-//! client.configure(&mut cmd);
-//! ```
-//!
-//! ## Caveats
-//!
-//! This crate makes no attempt to release tokens back to a jobserver on
-//! abnormal exit of a process. If a process which acquires a token is killed
-//! with ctrl-c or some similar signal then tokens will not be released and the
-//! jobserver may be in a corrupt state.
-//!
-//! Note that this is typically ok as ctrl-c means that an entire build process
-//! is being torn down, but it's worth being aware of at least!
-//!
-//! ## Windows caveats
-//!
-//! There appear to be two implementations of `make` on Windows. On MSYS2 one
-//! typically comes as `mingw32-make` and the other as `make` itself. I'm not
-//! personally too familiar with what's going on here, but for jobserver-related
-//! information the `mingw32-make` implementation uses Windows semaphores
-//! whereas the `make` program does not. The `make` program appears to use file
-//! descriptors and I'm not really sure how it works, so this crate is not
-//! compatible with `make` on Windows. It is, however, compatible with
-//! `mingw32-make`.
-//!
-//! [docs]: http://make.mad-scientist.net/papers/jobserver-implementation/
-
-#![deny(missing_docs, missing_debug_implementations)]
-#![doc(html_root_url = "https://docs.rs/jobserver/0.1")]
-
-use std::env;
-use std::io;
-use std::process::Command;
-use std::sync::{Arc, Condvar, Mutex, MutexGuard};
-
-#[cfg(unix)]
-#[path = "unix.rs"]
-mod imp;
-#[cfg(windows)]
-#[path = "windows.rs"]
-mod imp;
-#[cfg(not(any(unix, windows)))]
-#[path = "wasm.rs"]
-mod imp;
-
-/// A client of a jobserver
-///
-/// This structure is the main type exposed by this library, and is where
-/// interaction to a jobserver is configured through. Clients are either created
-/// from scratch in which case the internal semphore is initialied on the spot,
-/// or a client is created from the environment to connect to a jobserver
-/// already created.
-///
-/// Some usage examples can be found in the crate documentation for using a
-/// client.
-///
-/// Note that a `Client` implements the `Clone` trait, and all instances of a
-/// `Client` refer to the same jobserver instance.
-#[derive(Clone, Debug)]
-pub struct Client {
- inner: Arc<imp::Client>,
-}
-
-/// An acquired token from a jobserver.
-///
-/// This token will be released back to the jobserver when it is dropped and
-/// otherwise represents the ability to spawn off another thread of work.
-#[derive(Debug)]
-pub struct Acquired {
- client: Arc<imp::Client>,
- data: imp::Acquired,
- disabled: bool,
-}
-
-impl Acquired {
- /// This drops the `Acquired` token without releasing the associated token.
- ///
- /// This is not generally useful, but can be helpful if you do not have the
- /// ability to store an Acquired token but need to not yet release it.
- ///
- /// You'll typically want to follow this up with a call to `release_raw` or
- /// similar to actually release the token later on.
- pub fn drop_without_releasing(mut self) {
- self.disabled = true;
- }
-}
-
-#[derive(Default, Debug)]
-struct HelperState {
- lock: Mutex<HelperInner>,
- cvar: Condvar,
-}
-
-#[derive(Default, Debug)]
-struct HelperInner {
- requests: usize,
- producer_done: bool,
- consumer_done: bool,
-}
-
-impl Client {
- /// Creates a new jobserver initialized with the given parallelism limit.
- ///
- /// A client to the jobserver created will be returned. This client will
- /// allow at most `limit` tokens to be acquired from it in parallel. More
- /// calls to `acquire` will cause the calling thread to block.
- ///
- /// Note that the created `Client` is not automatically inherited into
- /// spawned child processes from this program. Manual usage of the
- /// `configure` function is required for a child process to have access to a
- /// job server.
- ///
- /// # Examples
- ///
- /// ```
- /// use jobserver::Client;
- ///
- /// let client = Client::new(4).expect("failed to create jobserver");
- /// ```
- ///
- /// # Errors
- ///
- /// Returns an error if any I/O error happens when attempting to create the
- /// jobserver client.
- pub fn new(limit: usize) -> io::Result<Client> {
- Ok(Client {
- inner: Arc::new(imp::Client::new(limit)?),
- })
- }
-
- /// Attempts to connect to the jobserver specified in this process's
- /// environment.
- ///
- /// When the a `make` executable calls a child process it will configure the
- /// environment of the child to ensure that it has handles to the jobserver
- /// it's passing down. This function will attempt to look for these details
- /// and connect to the jobserver.
- ///
- /// Note that the created `Client` is not automatically inherited into
- /// spawned child processes from this program. Manual usage of the
- /// `configure` function is required for a child process to have access to a
- /// job server.
- ///
- /// # Return value
- ///
- /// If a jobserver was found in the environment and it looks correct then
- /// `Some` of the connected client will be returned. If no jobserver was
- /// found then `None` will be returned.
- ///
- /// Note that on Unix the `Client` returned **takes ownership of the file
- /// descriptors specified in the environment**. Jobservers on Unix are
- /// implemented with `pipe` file descriptors, and they're inherited from
- /// parent processes. This `Client` returned takes ownership of the file
- /// descriptors for this process and will close the file descriptors after
- /// this value is dropped.
- ///
- /// Additionally on Unix this function will configure the file descriptors
- /// with `CLOEXEC` so they're not automatically inherited by spawned
- /// children.
- ///
- /// # Safety
- ///
- /// This function is `unsafe` to call on Unix specifically as it
- /// transitively requires usage of the `from_raw_fd` function, which is
- /// itself unsafe in some circumstances.
- ///
- /// It's recommended to call this function very early in the lifetime of a
- /// program before any other file descriptors are opened. That way you can
- /// make sure to take ownership properly of the file descriptors passed
- /// down, if any.
- ///
- /// It's generally unsafe to call this function twice in a program if the
- /// previous invocation returned `Some`.
- ///
- /// Note, though, that on Windows it should be safe to call this function
- /// any number of times.
- pub unsafe fn from_env() -> Option<Client> {
- let var = match env::var("CARGO_MAKEFLAGS")
- .or_else(|_| env::var("MAKEFLAGS"))
- .or_else(|_| env::var("MFLAGS"))
- {
- Ok(s) => s,
- Err(_) => return None,
- };
- let mut arg = "--jobserver-fds=";
- let pos = match var.find(arg) {
- Some(i) => i,
- None => {
- arg = "--jobserver-auth=";
- match var.find(arg) {
- Some(i) => i,
- None => return None,
- }
- }
- };
-
- let s = var[pos + arg.len()..].split(' ').next().unwrap();
- imp::Client::open(s).map(|c| Client { inner: Arc::new(c) })
- }
-
- /// Acquires a token from this jobserver client.
- ///
- /// This function will block the calling thread until a new token can be
- /// acquired from the jobserver.
- ///
- /// # Return value
- ///
- /// On successful acquisition of a token an instance of `Acquired` is
- /// returned. This structure, when dropped, will release the token back to
- /// the jobserver. It's recommended to avoid leaking this value.
- ///
- /// # Errors
- ///
- /// If an I/O error happens while acquiring a token then this function will
- /// return immediately with the error. If an error is returned then a token
- /// was not acquired.
- pub fn acquire(&self) -> io::Result<Acquired> {
- let data = self.inner.acquire()?;
- Ok(Acquired {
- client: self.inner.clone(),
- data,
- disabled: false,
- })
- }
-
- /// Returns amount of tokens in the read-side pipe.
- ///
- /// # Return value
- ///
- /// Number of bytes available to be read from the jobserver pipe
- ///
- /// # Errors
- ///
- /// Underlying errors from the ioctl will be passed up.
- pub fn available(&self) -> io::Result<usize> {
- self.inner.available()
- }
-
- /// Configures a child process to have access to this client's jobserver as
- /// well.
- ///
- /// This function is required to be called to ensure that a jobserver is
- /// properly inherited to a child process. If this function is *not* called
- /// then this `Client` will not be accessible in the child process. In other
- /// words, if not called, then `Client::from_env` will return `None` in the
- /// child process (or the equivalent of `Child::from_env` that `make` uses).
- ///
- /// ## Platform-specific behavior
- ///
- /// On Unix and Windows this will clobber the `CARGO_MAKEFLAGS` environment
- /// variables for the child process, and on Unix this will also allow the
- /// two file descriptors for this client to be inherited to the child.
- ///
- /// On platforms other than Unix and Windows this panics.
- pub fn configure(&self, cmd: &mut Command) {
- cmd.env("CARGO_MAKEFLAGS", &self.mflags_env());
- self.inner.configure(cmd);
- }
-
- /// Configures a child process to have access to this client's jobserver as
- /// well.
- ///
- /// This function is required to be called to ensure that a jobserver is
- /// properly inherited to a child process. If this function is *not* called
- /// then this `Client` will not be accessible in the child process. In other
- /// words, if not called, then `Client::from_env` will return `None` in the
- /// child process (or the equivalent of `Child::from_env` that `make` uses).
- ///
- /// ## Platform-specific behavior
- ///
- /// On Unix and Windows this will clobber the `CARGO_MAKEFLAGS`,
- /// `MAKEFLAGS` and `MFLAGS` environment variables for the child process,
- /// and on Unix this will also allow the two file descriptors for
- /// this client to be inherited to the child.
- ///
- /// On platforms other than Unix and Windows this panics.
- pub fn configure_make(&self, cmd: &mut Command) {
- let value = self.mflags_env();
- cmd.env("CARGO_MAKEFLAGS", &value);
- cmd.env("MAKEFLAGS", &value);
- cmd.env("MFLAGS", &value);
- self.inner.configure(cmd);
- }
-
- fn mflags_env(&self) -> String {
- let arg = self.inner.string_arg();
- // Older implementations of make use `--jobserver-fds` and newer
- // implementations use `--jobserver-auth`, pass both to try to catch
- // both implementations.
- format!("-j --jobserver-fds={0} --jobserver-auth={0}", arg)
- }
-
- /// Converts this `Client` into a helper thread to deal with a blocking
- /// `acquire` function a little more easily.
- ///
- /// The fact that the `acquire` function on `Client` blocks isn't always
- /// the easiest to work with. Typically you're using a jobserver to
- /// manage running other events in parallel! This means that you need to
- /// either (a) wait for an existing job to finish or (b) wait for a
- /// new token to become available.
- ///
- /// Unfortunately the blocking in `acquire` happens at the implementation
- /// layer of jobservers. On Unix this requires a blocking call to `read`
- /// and on Windows this requires one of the `WaitFor*` functions. Both
- /// of these situations aren't the easiest to deal with:
- ///
- /// * On Unix there's basically only one way to wake up a `read` early, and
- /// that's through a signal. This is what the `make` implementation
- /// itself uses, relying on `SIGCHLD` to wake up a blocking acquisition
- /// of a new job token. Unfortunately nonblocking I/O is not an option
- /// here, so it means that "waiting for one of two events" means that
- /// the latter event must generate a signal! This is not always the case
- /// on unix for all jobservers.
- ///
- /// * On Windows you'd have to basically use the `WaitForMultipleObjects`
- /// which means that you've got to canonicalize all your event sources
- /// into a `HANDLE` which also isn't the easiest thing to do
- /// unfortunately.
- ///
- /// This function essentially attempts to ease these limitations by
- /// converting this `Client` into a helper thread spawned into this
- /// process. The application can then request that the helper thread
- /// acquires tokens and the provided closure will be invoked for each token
- /// acquired.
- ///
- /// The intention is that this function can be used to translate the event
- /// of a token acquisition into an arbitrary user-defined event.
- ///
- /// # Arguments
- ///
- /// This function will consume the `Client` provided to be transferred to
- /// the helper thread that is spawned. Additionally a closure `f` is
- /// provided to be invoked whenever a token is acquired.
- ///
- /// This closure is only invoked after calls to
- /// `HelperThread::request_token` have been made and a token itself has
- /// been acquired. If an error happens while acquiring the token then
- /// an error will be yielded to the closure as well.
- ///
- /// # Return Value
- ///
- /// This function will return an instance of the `HelperThread` structure
- /// which is used to manage the helper thread associated with this client.
- /// Through the `HelperThread` you'll request that tokens are acquired.
- /// When acquired, the closure provided here is invoked.
- ///
- /// When the `HelperThread` structure is returned it will be gracefully
- /// torn down, and the calling thread will be blocked until the thread is
- /// torn down (which should be prompt).
- ///
- /// # Errors
- ///
- /// This function may fail due to creation of the helper thread or
- /// auxiliary I/O objects to manage the helper thread. In any of these
- /// situations the error is propagated upwards.
- ///
- /// # Platform-specific behavior
- ///
- /// On Windows this function behaves pretty normally as expected, but on
- /// Unix the implementation is... a little heinous. As mentioned above
- /// we're forced into blocking I/O for token acquisition, namely a blocking
- /// call to `read`. We must be able to unblock this, however, to tear down
- /// the helper thread gracefully!
- ///
- /// Essentially what happens is that we'll send a signal to the helper
- /// thread spawned and rely on `EINTR` being returned to wake up the helper
- /// thread. This involves installing a global `SIGUSR1` handler that does
- /// nothing along with sending signals to that thread. This may cause
- /// odd behavior in some applications, so it's recommended to review and
- /// test thoroughly before using this.
- pub fn into_helper_thread<F>(self, f: F) -> io::Result<HelperThread>
- where
- F: FnMut(io::Result<Acquired>) + Send + 'static,
- {
- let state = Arc::new(HelperState::default());
- Ok(HelperThread {
- inner: Some(imp::spawn_helper(self, state.clone(), Box::new(f))?),
- state,
- })
- }
-
- /// Blocks the current thread until a token is acquired.
- ///
- /// This is the same as `acquire`, except that it doesn't return an RAII
- /// helper. If successful the process will need to guarantee that
- /// `release_raw` is called in the future.
- pub fn acquire_raw(&self) -> io::Result<()> {
- self.inner.acquire()?;
- Ok(())
- }
-
- /// Releases a jobserver token back to the original jobserver.
- ///
- /// This is intended to be paired with `acquire_raw` if it was called, but
- /// in some situations it could also be called to relinquish a process's
- /// implicit token temporarily which is then re-acquired later.
- pub fn release_raw(&self) -> io::Result<()> {
- self.inner.release(None)?;
- Ok(())
- }
-}
-
-impl Drop for Acquired {
- fn drop(&mut self) {
- if !self.disabled {
- drop(self.client.release(Some(&self.data)));
- }
- }
-}
-
-/// Structure returned from `Client::into_helper_thread` to manage the lifetime
-/// of the helper thread returned, see those associated docs for more info.
-#[derive(Debug)]
-pub struct HelperThread {
- inner: Option<imp::Helper>,
- state: Arc<HelperState>,
-}
-
-impl HelperThread {
- /// Request that the helper thread acquires a token, eventually calling the
- /// original closure with a token when it's available.
- ///
- /// For more information, see the docs on that function.
- pub fn request_token(&self) {
- // Indicate that there's one more request for a token and then wake up
- // the helper thread if it's sleeping.
- self.state.lock().requests += 1;
- self.state.cvar.notify_one();
- }
-}
-
-impl Drop for HelperThread {
- fn drop(&mut self) {
- // Flag that the producer half is done so the helper thread should exit
- // quickly if it's waiting. Wake it up if it's actually waiting
- self.state.lock().producer_done = true;
- self.state.cvar.notify_one();
-
- // ... and afterwards perform any thread cleanup logic
- self.inner.take().unwrap().join();
- }
-}
-
-impl HelperState {
- fn lock(&self) -> MutexGuard<'_, HelperInner> {
- self.lock.lock().unwrap_or_else(|e| e.into_inner())
- }
-
- /// Executes `f` for each request for a token, where `f` is expected to
- /// block and then provide the original closure with a token once it's
- /// acquired.
- ///
- /// This is an infinite loop until the helper thread is dropped, at which
- /// point everything should get interrupted.
- fn for_each_request(&self, mut f: impl FnMut(&HelperState)) {
- let mut lock = self.lock();
-
- // We only execute while we could receive requests, but as soon as
- // that's `false` we're out of here.
- while !lock.producer_done {
- // If no one's requested a token then we wait for someone to
- // request a token.
- if lock.requests == 0 {
- lock = self.cvar.wait(lock).unwrap_or_else(|e| e.into_inner());
- continue;
- }
-
- // Consume the request for a token, and then actually acquire a
- // token after unlocking our lock (not that acquisition happens in
- // `f`). This ensures that we don't actually hold the lock if we
- // wait for a long time for a token.
- lock.requests -= 1;
- drop(lock);
- f(self);
- lock = self.lock();
- }
- lock.consumer_done = true;
- self.cvar.notify_one();
- }
-
- fn producer_done(&self) -> bool {
- self.lock().producer_done
- }
-}
-
-#[test]
-fn no_helper_deadlock() {
- let x = crate::Client::new(32).unwrap();
- let _y = x.clone();
- std::mem::drop(x.into_helper_thread(|_| {}).unwrap());
-}
+//! An implementation of the GNU make jobserver.
+//!
+//! This crate is an implementation, in Rust, of the GNU `make` jobserver for
+//! CLI tools that are interoperating with make or otherwise require some form
+//! of parallelism limiting across process boundaries. This was originally
+//! written for usage in Cargo to both (a) work when `cargo` is invoked from
+//! `make` (using `make`'s jobserver) and (b) work when `cargo` invokes build
+//! scripts, exporting a jobserver implementation for `make` processes to
+//! transitively use.
+//!
+//! The jobserver implementation can be found in [detail online][docs] but
+//! basically boils down to a cross-process semaphore. On Unix this is
+//! implemented with the `pipe` syscall and read/write ends of a pipe and on
+//! Windows this is implemented literally with IPC semaphores. Starting from
+//! GNU `make` version 4.4, named pipe becomes the default way in communication
+//! on Unix. This crate also supports that feature in the sense of inheriting
+//! and forwarding the correct environment.
+//!
+//! The jobserver protocol in `make` also dictates when tokens are acquired to
+//! run child work, and clients using this crate should take care to implement
+//! such details to ensure correct interoperation with `make` itself.
+//!
+//! ## Examples
+//!
+//! Connect to a jobserver that was set up by `make` or a different process:
+//!
+//! ```no_run
+//! use jobserver::Client;
+//!
+//! // See API documentation for why this is `unsafe`
+//! let client = match unsafe { Client::from_env() } {
+//! Some(client) => client,
+//! None => panic!("client not configured"),
+//! };
+//! ```
+//!
+//! Acquire and release token from a jobserver:
+//!
+//! ```no_run
+//! use jobserver::Client;
+//!
+//! let client = unsafe { Client::from_env().unwrap() };
+//! let token = client.acquire().unwrap(); // blocks until it is available
+//! drop(token); // releases the token when the work is done
+//! ```
+//!
+//! Create a new jobserver and configure a child process to have access:
+//!
+//! ```
+//! use std::process::Command;
+//! use jobserver::Client;
+//!
+//! let client = Client::new(4).expect("failed to create jobserver");
+//! let mut cmd = Command::new("make");
+//! client.configure(&mut cmd);
+//! ```
+//!
+//! ## Caveats
+//!
+//! This crate makes no attempt to release tokens back to a jobserver on
+//! abnormal exit of a process. If a process which acquires a token is killed
+//! with ctrl-c or some similar signal then tokens will not be released and the
+//! jobserver may be in a corrupt state.
+//!
+//! Note that this is typically ok as ctrl-c means that an entire build process
+//! is being torn down, but it's worth being aware of at least!
+//!
+//! ## Windows caveats
+//!
+//! There appear to be two implementations of `make` on Windows. On MSYS2 one
+//! typically comes as `mingw32-make` and the other as `make` itself. I'm not
+//! personally too familiar with what's going on here, but for jobserver-related
+//! information the `mingw32-make` implementation uses Windows semaphores
+//! whereas the `make` program does not. The `make` program appears to use file
+//! descriptors and I'm not really sure how it works, so this crate is not
+//! compatible with `make` on Windows. It is, however, compatible with
+//! `mingw32-make`.
+//!
+//! [docs]: http://make.mad-scientist.net/papers/jobserver-implementation/
+
+#![deny(missing_docs, missing_debug_implementations)]
+#![doc(html_root_url = "https://docs.rs/jobserver/0.1")]
+
+use std::env;
+use std::ffi::OsString;
+use std::io;
+use std::process::Command;
+use std::sync::{Arc, Condvar, Mutex, MutexGuard};
+
+mod error;
+#[cfg(unix)]
+#[path = "unix.rs"]
+mod imp;
+#[cfg(windows)]
+#[path = "windows.rs"]
+mod imp;
+#[cfg(not(any(unix, windows)))]
+#[path = "wasm.rs"]
+mod imp;
+
+/// A client of a jobserver
+///
+/// This structure is the main type exposed by this library, and is where
+/// interaction to a jobserver is configured through. Clients are either created
+/// from scratch in which case the internal semphore is initialied on the spot,
+/// or a client is created from the environment to connect to a jobserver
+/// already created.
+///
+/// Some usage examples can be found in the crate documentation for using a
+/// client.
+///
+/// Note that a `Client` implements the `Clone` trait, and all instances of a
+/// `Client` refer to the same jobserver instance.
+#[derive(Clone, Debug)]
+pub struct Client {
+ inner: Arc<imp::Client>,
+}
+
+/// An acquired token from a jobserver.
+///
+/// This token will be released back to the jobserver when it is dropped and
+/// otherwise represents the ability to spawn off another thread of work.
+#[derive(Debug)]
+pub struct Acquired {
+ client: Arc<imp::Client>,
+ data: imp::Acquired,
+ disabled: bool,
+}
+
+impl Acquired {
+ /// This drops the `Acquired` token without releasing the associated token.
+ ///
+ /// This is not generally useful, but can be helpful if you do not have the
+ /// ability to store an Acquired token but need to not yet release it.
+ ///
+ /// You'll typically want to follow this up with a call to `release_raw` or
+ /// similar to actually release the token later on.
+ pub fn drop_without_releasing(mut self) {
+ self.disabled = true;
+ }
+}
+
+#[derive(Default, Debug)]
+struct HelperState {
+ lock: Mutex<HelperInner>,
+ cvar: Condvar,
+}
+
+#[derive(Default, Debug)]
+struct HelperInner {
+ requests: usize,
+ producer_done: bool,
+ consumer_done: bool,
+}
+
+use error::FromEnvErrorInner;
+pub use error::{FromEnvError, FromEnvErrorKind};
+
+/// Return type for `from_env_ext` function.
+#[derive(Debug)]
+pub struct FromEnv {
+ /// Result of trying to get jobserver client from env.
+ pub client: Result<Client, FromEnvError>,
+ /// Name and value of the environment variable.
+ /// `None` if no relevant environment variable is found.
+ pub var: Option<(&'static str, OsString)>,
+}
+
+impl FromEnv {
+ fn new_ok(client: Client, var_name: &'static str, var_value: OsString) -> FromEnv {
+ FromEnv {
+ client: Ok(client),
+ var: Some((var_name, var_value)),
+ }
+ }
+ fn new_err(kind: FromEnvErrorInner, var_name: &'static str, var_value: OsString) -> FromEnv {
+ FromEnv {
+ client: Err(FromEnvError { inner: kind }),
+ var: Some((var_name, var_value)),
+ }
+ }
+}
+
+impl Client {
+ /// Creates a new jobserver initialized with the given parallelism limit.
+ ///
+ /// A client to the jobserver created will be returned. This client will
+ /// allow at most `limit` tokens to be acquired from it in parallel. More
+ /// calls to `acquire` will cause the calling thread to block.
+ ///
+ /// Note that the created `Client` is not automatically inherited into
+ /// spawned child processes from this program. Manual usage of the
+ /// `configure` function is required for a child process to have access to a
+ /// job server.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use jobserver::Client;
+ ///
+ /// let client = Client::new(4).expect("failed to create jobserver");
+ /// ```
+ ///
+ /// # Errors
+ ///
+ /// Returns an error if any I/O error happens when attempting to create the
+ /// jobserver client.
+ pub fn new(limit: usize) -> io::Result<Client> {
+ Ok(Client {
+ inner: Arc::new(imp::Client::new(limit)?),
+ })
+ }
+
+ /// Attempts to connect to the jobserver specified in this process's
+ /// environment.
+ ///
+ /// When the a `make` executable calls a child process it will configure the
+ /// environment of the child to ensure that it has handles to the jobserver
+ /// it's passing down. This function will attempt to look for these details
+ /// and connect to the jobserver.
+ ///
+ /// Note that the created `Client` is not automatically inherited into
+ /// spawned child processes from this program. Manual usage of the
+ /// `configure` function is required for a child process to have access to a
+ /// job server.
+ ///
+ /// # Return value
+ ///
+ /// `FromEnv` contains result and relevant environment variable.
+ /// If a jobserver was found in the environment and it looks correct then
+ /// result with the connected client will be returned. In other cases
+ /// result will contain `Err(FromEnvErr)`.
+ ///
+ /// Note that on Unix the `Client` returned **takes ownership of the file
+ /// descriptors specified in the environment**. Jobservers on Unix are
+ /// implemented with `pipe` file descriptors, and they're inherited from
+ /// parent processes. This `Client` returned takes ownership of the file
+ /// descriptors for this process and will close the file descriptors after
+ /// this value is dropped.
+ ///
+ /// Additionally on Unix this function will configure the file descriptors
+ /// with `CLOEXEC` so they're not automatically inherited by spawned
+ /// children.
+ ///
+ /// On unix if `check_pipe` enabled this function will check if provided
+ /// files are actually pipes.
+ ///
+ /// # Safety
+ ///
+ /// This function is `unsafe` to call on Unix specifically as it
+ /// transitively requires usage of the `from_raw_fd` function, which is
+ /// itself unsafe in some circumstances.
+ ///
+ /// It's recommended to call this function very early in the lifetime of a
+ /// program before any other file descriptors are opened. That way you can
+ /// make sure to take ownership properly of the file descriptors passed
+ /// down, if any.
+ ///
+ /// It's generally unsafe to call this function twice in a program if the
+ /// previous invocation returned `Some`.
+ ///
+ /// Note, though, that on Windows it should be safe to call this function
+ /// any number of times.
+ pub unsafe fn from_env_ext(check_pipe: bool) -> FromEnv {
+ let (env, var_os) = match ["CARGO_MAKEFLAGS", "MAKEFLAGS", "MFLAGS"]
+ .iter()
+ .map(|&env| env::var_os(env).map(|var| (env, var)))
+ .find_map(|p| p)
+ {
+ Some((env, var_os)) => (env, var_os),
+ None => return FromEnv::new_err(FromEnvErrorInner::NoEnvVar, "", Default::default()),
+ };
+
+ let var = match var_os.to_str() {
+ Some(var) => var,
+ None => {
+ let err = FromEnvErrorInner::CannotParse("not valid UTF-8".to_string());
+ return FromEnv::new_err(err, env, var_os);
+ }
+ };
+
+ let (arg, pos) = match ["--jobserver-fds=", "--jobserver-auth="]
+ .iter()
+ .map(|&arg| var.find(arg).map(|pos| (arg, pos)))
+ .find_map(|pos| pos)
+ {
+ Some((arg, pos)) => (arg, pos),
+ None => return FromEnv::new_err(FromEnvErrorInner::NoJobserver, env, var_os),
+ };
+
+ let s = var[pos + arg.len()..].split(' ').next().unwrap();
+ match imp::Client::open(s, check_pipe) {
+ Ok(c) => FromEnv::new_ok(Client { inner: Arc::new(c) }, env, var_os),
+ Err(err) => FromEnv::new_err(err, env, var_os),
+ }
+ }
+
+ /// Attempts to connect to the jobserver specified in this process's
+ /// environment.
+ ///
+ /// Wraps `from_env_ext` and discards error details.
+ pub unsafe fn from_env() -> Option<Client> {
+ Self::from_env_ext(false).client.ok()
+ }
+
+ /// Acquires a token from this jobserver client.
+ ///
+ /// This function will block the calling thread until a new token can be
+ /// acquired from the jobserver.
+ ///
+ /// # Return value
+ ///
+ /// On successful acquisition of a token an instance of `Acquired` is
+ /// returned. This structure, when dropped, will release the token back to
+ /// the jobserver. It's recommended to avoid leaking this value.
+ ///
+ /// # Errors
+ ///
+ /// If an I/O error happens while acquiring a token then this function will
+ /// return immediately with the error. If an error is returned then a token
+ /// was not acquired.
+ pub fn acquire(&self) -> io::Result<Acquired> {
+ let data = self.inner.acquire()?;
+ Ok(Acquired {
+ client: self.inner.clone(),
+ data,
+ disabled: false,
+ })
+ }
+
+ /// Returns amount of tokens in the read-side pipe.
+ ///
+ /// # Return value
+ ///
+ /// Number of bytes available to be read from the jobserver pipe
+ ///
+ /// # Errors
+ ///
+ /// Underlying errors from the ioctl will be passed up.
+ pub fn available(&self) -> io::Result<usize> {
+ self.inner.available()
+ }
+
+ /// Configures a child process to have access to this client's jobserver as
+ /// well.
+ ///
+ /// This function is required to be called to ensure that a jobserver is
+ /// properly inherited to a child process. If this function is *not* called
+ /// then this `Client` will not be accessible in the child process. In other
+ /// words, if not called, then `Client::from_env` will return `None` in the
+ /// child process (or the equivalent of `Child::from_env` that `make` uses).
+ ///
+ /// ## Platform-specific behavior
+ ///
+ /// On Unix and Windows this will clobber the `CARGO_MAKEFLAGS` environment
+ /// variables for the child process, and on Unix this will also allow the
+ /// two file descriptors for this client to be inherited to the child.
+ ///
+ /// On platforms other than Unix and Windows this panics.
+ pub fn configure(&self, cmd: &mut Command) {
+ cmd.env("CARGO_MAKEFLAGS", &self.mflags_env());
+ self.inner.configure(cmd);
+ }
+
+ /// Configures a child process to have access to this client's jobserver as
+ /// well.
+ ///
+ /// This function is required to be called to ensure that a jobserver is
+ /// properly inherited to a child process. If this function is *not* called
+ /// then this `Client` will not be accessible in the child process. In other
+ /// words, if not called, then `Client::from_env` will return `None` in the
+ /// child process (or the equivalent of `Child::from_env` that `make` uses).
+ ///
+ /// ## Platform-specific behavior
+ ///
+ /// On Unix and Windows this will clobber the `CARGO_MAKEFLAGS`,
+ /// `MAKEFLAGS` and `MFLAGS` environment variables for the child process,
+ /// and on Unix this will also allow the two file descriptors for
+ /// this client to be inherited to the child.
+ ///
+ /// On platforms other than Unix and Windows this panics.
+ pub fn configure_make(&self, cmd: &mut Command) {
+ let value = self.mflags_env();
+ cmd.env("CARGO_MAKEFLAGS", &value);
+ cmd.env("MAKEFLAGS", &value);
+ cmd.env("MFLAGS", &value);
+ self.inner.configure(cmd);
+ }
+
+ fn mflags_env(&self) -> String {
+ let arg = self.inner.string_arg();
+ // Older implementations of make use `--jobserver-fds` and newer
+ // implementations use `--jobserver-auth`, pass both to try to catch
+ // both implementations.
+ format!("-j --jobserver-fds={0} --jobserver-auth={0}", arg)
+ }
+
+ /// Converts this `Client` into a helper thread to deal with a blocking
+ /// `acquire` function a little more easily.
+ ///
+ /// The fact that the `acquire` function on `Client` blocks isn't always
+ /// the easiest to work with. Typically you're using a jobserver to
+ /// manage running other events in parallel! This means that you need to
+ /// either (a) wait for an existing job to finish or (b) wait for a
+ /// new token to become available.
+ ///
+ /// Unfortunately the blocking in `acquire` happens at the implementation
+ /// layer of jobservers. On Unix this requires a blocking call to `read`
+ /// and on Windows this requires one of the `WaitFor*` functions. Both
+ /// of these situations aren't the easiest to deal with:
+ ///
+ /// * On Unix there's basically only one way to wake up a `read` early, and
+ /// that's through a signal. This is what the `make` implementation
+ /// itself uses, relying on `SIGCHLD` to wake up a blocking acquisition
+ /// of a new job token. Unfortunately nonblocking I/O is not an option
+ /// here, so it means that "waiting for one of two events" means that
+ /// the latter event must generate a signal! This is not always the case
+ /// on unix for all jobservers.
+ ///
+ /// * On Windows you'd have to basically use the `WaitForMultipleObjects`
+ /// which means that you've got to canonicalize all your event sources
+ /// into a `HANDLE` which also isn't the easiest thing to do
+ /// unfortunately.
+ ///
+ /// This function essentially attempts to ease these limitations by
+ /// converting this `Client` into a helper thread spawned into this
+ /// process. The application can then request that the helper thread
+ /// acquires tokens and the provided closure will be invoked for each token
+ /// acquired.
+ ///
+ /// The intention is that this function can be used to translate the event
+ /// of a token acquisition into an arbitrary user-defined event.
+ ///
+ /// # Arguments
+ ///
+ /// This function will consume the `Client` provided to be transferred to
+ /// the helper thread that is spawned. Additionally a closure `f` is
+ /// provided to be invoked whenever a token is acquired.
+ ///
+ /// This closure is only invoked after calls to
+ /// `HelperThread::request_token` have been made and a token itself has
+ /// been acquired. If an error happens while acquiring the token then
+ /// an error will be yielded to the closure as well.
+ ///
+ /// # Return Value
+ ///
+ /// This function will return an instance of the `HelperThread` structure
+ /// which is used to manage the helper thread associated with this client.
+ /// Through the `HelperThread` you'll request that tokens are acquired.
+ /// When acquired, the closure provided here is invoked.
+ ///
+ /// When the `HelperThread` structure is returned it will be gracefully
+ /// torn down, and the calling thread will be blocked until the thread is
+ /// torn down (which should be prompt).
+ ///
+ /// # Errors
+ ///
+ /// This function may fail due to creation of the helper thread or
+ /// auxiliary I/O objects to manage the helper thread. In any of these
+ /// situations the error is propagated upwards.
+ ///
+ /// # Platform-specific behavior
+ ///
+ /// On Windows this function behaves pretty normally as expected, but on
+ /// Unix the implementation is... a little heinous. As mentioned above
+ /// we're forced into blocking I/O for token acquisition, namely a blocking
+ /// call to `read`. We must be able to unblock this, however, to tear down
+ /// the helper thread gracefully!
+ ///
+ /// Essentially what happens is that we'll send a signal to the helper
+ /// thread spawned and rely on `EINTR` being returned to wake up the helper
+ /// thread. This involves installing a global `SIGUSR1` handler that does
+ /// nothing along with sending signals to that thread. This may cause
+ /// odd behavior in some applications, so it's recommended to review and
+ /// test thoroughly before using this.
+ pub fn into_helper_thread<F>(self, f: F) -> io::Result<HelperThread>
+ where
+ F: FnMut(io::Result<Acquired>) + Send + 'static,
+ {
+ let state = Arc::new(HelperState::default());
+ Ok(HelperThread {
+ inner: Some(imp::spawn_helper(self, state.clone(), Box::new(f))?),
+ state,
+ })
+ }
+
+ /// Blocks the current thread until a token is acquired.
+ ///
+ /// This is the same as `acquire`, except that it doesn't return an RAII
+ /// helper. If successful the process will need to guarantee that
+ /// `release_raw` is called in the future.
+ pub fn acquire_raw(&self) -> io::Result<()> {
+ self.inner.acquire()?;
+ Ok(())
+ }
+
+ /// Releases a jobserver token back to the original jobserver.
+ ///
+ /// This is intended to be paired with `acquire_raw` if it was called, but
+ /// in some situations it could also be called to relinquish a process's
+ /// implicit token temporarily which is then re-acquired later.
+ pub fn release_raw(&self) -> io::Result<()> {
+ self.inner.release(None)?;
+ Ok(())
+ }
+}
+
+impl Drop for Acquired {
+ fn drop(&mut self) {
+ if !self.disabled {
+ drop(self.client.release(Some(&self.data)));
+ }
+ }
+}
+
+/// Structure returned from `Client::into_helper_thread` to manage the lifetime
+/// of the helper thread returned, see those associated docs for more info.
+#[derive(Debug)]
+pub struct HelperThread {
+ inner: Option<imp::Helper>,
+ state: Arc<HelperState>,
+}
+
+impl HelperThread {
+ /// Request that the helper thread acquires a token, eventually calling the
+ /// original closure with a token when it's available.
+ ///
+ /// For more information, see the docs on that function.
+ pub fn request_token(&self) {
+ // Indicate that there's one more request for a token and then wake up
+ // the helper thread if it's sleeping.
+ self.state.lock().requests += 1;
+ self.state.cvar.notify_one();
+ }
+}
+
+impl Drop for HelperThread {
+ fn drop(&mut self) {
+ // Flag that the producer half is done so the helper thread should exit
+ // quickly if it's waiting. Wake it up if it's actually waiting
+ self.state.lock().producer_done = true;
+ self.state.cvar.notify_one();
+
+ // ... and afterwards perform any thread cleanup logic
+ self.inner.take().unwrap().join();
+ }
+}
+
+impl HelperState {
+ fn lock(&self) -> MutexGuard<'_, HelperInner> {
+ self.lock.lock().unwrap_or_else(|e| e.into_inner())
+ }
+
+ /// Executes `f` for each request for a token, where `f` is expected to
+ /// block and then provide the original closure with a token once it's
+ /// acquired.
+ ///
+ /// This is an infinite loop until the helper thread is dropped, at which
+ /// point everything should get interrupted.
+ fn for_each_request(&self, mut f: impl FnMut(&HelperState)) {
+ let mut lock = self.lock();
+
+ // We only execute while we could receive requests, but as soon as
+ // that's `false` we're out of here.
+ while !lock.producer_done {
+ // If no one's requested a token then we wait for someone to
+ // request a token.
+ if lock.requests == 0 {
+ lock = self.cvar.wait(lock).unwrap_or_else(|e| e.into_inner());
+ continue;
+ }
+
+ // Consume the request for a token, and then actually acquire a
+ // token after unlocking our lock (not that acquisition happens in
+ // `f`). This ensures that we don't actually hold the lock if we
+ // wait for a long time for a token.
+ lock.requests -= 1;
+ drop(lock);
+ f(self);
+ lock = self.lock();
+ }
+ lock.consumer_done = true;
+ self.cvar.notify_one();
+ }
+
+ fn producer_done(&self) -> bool {
+ self.lock().producer_done
+ }
+}
+
+#[test]
+fn no_helper_deadlock() {
+ let x = crate::Client::new(32).unwrap();
+ let _y = x.clone();
+ std::mem::drop(x.into_helper_thread(|_| {}).unwrap());
+}
diff --git a/vendor/jobserver/src/unix.rs b/vendor/jobserver/src/unix.rs
index e4b143505..b2312b08f 100644
--- a/vendor/jobserver/src/unix.rs
+++ b/vendor/jobserver/src/unix.rs
@@ -1,431 +1,477 @@
-use libc::c_int;
-
-use std::fs::{File, OpenOptions};
-use std::io::{self, Read, Write};
-use std::mem;
-use std::mem::MaybeUninit;
-use std::os::unix::prelude::*;
-use std::path::{Path, PathBuf};
-use std::process::Command;
-use std::ptr;
-use std::sync::{Arc, Once};
-use std::thread::{self, Builder, JoinHandle};
-use std::time::Duration;
-
-#[derive(Debug)]
-pub enum Client {
- /// `--jobserver-auth=R,W`
- Pipe { read: File, write: File },
- /// `--jobserver-auth=fifo:PATH`
- Fifo { file: File, path: PathBuf },
-}
-
-#[derive(Debug)]
-pub struct Acquired {
- byte: u8,
-}
-
-impl Client {
- pub fn new(mut limit: usize) -> io::Result<Client> {
- let client = unsafe { Client::mk()? };
-
- // I don't think the character written here matters, but I could be
- // wrong!
- const BUFFER: [u8; 128] = [b'|'; 128];
-
- let mut write = client.write();
-
- set_nonblocking(write.as_raw_fd(), true)?;
-
- while limit > 0 {
- let n = limit.min(BUFFER.len());
-
- write.write_all(&BUFFER[..n])?;
- limit -= n;
- }
-
- set_nonblocking(write.as_raw_fd(), false)?;
-
- Ok(client)
- }
-
- unsafe fn mk() -> io::Result<Client> {
- let mut pipes = [0; 2];
-
- // Attempt atomically-create-with-cloexec if we can on Linux,
- // detected by using the `syscall` function in `libc` to try to work
- // with as many kernels/glibc implementations as possible.
- #[cfg(target_os = "linux")]
- {
- use std::sync::atomic::{AtomicBool, Ordering};
-
- static PIPE2_AVAILABLE: AtomicBool = AtomicBool::new(true);
- if PIPE2_AVAILABLE.load(Ordering::SeqCst) {
- match libc::syscall(libc::SYS_pipe2, pipes.as_mut_ptr(), libc::O_CLOEXEC) {
- -1 => {
- let err = io::Error::last_os_error();
- if err.raw_os_error() == Some(libc::ENOSYS) {
- PIPE2_AVAILABLE.store(false, Ordering::SeqCst);
- } else {
- return Err(err);
- }
- }
- _ => return Ok(Client::from_fds(pipes[0], pipes[1])),
- }
- }
- }
-
- cvt(libc::pipe(pipes.as_mut_ptr()))?;
- drop(set_cloexec(pipes[0], true));
- drop(set_cloexec(pipes[1], true));
- Ok(Client::from_fds(pipes[0], pipes[1]))
- }
-
- pub unsafe fn open(s: &str) -> Option<Client> {
- Client::from_fifo(s).or_else(|| Client::from_pipe(s))
- }
-
- /// `--jobserver-auth=fifo:PATH`
- fn from_fifo(s: &str) -> Option<Client> {
- let mut parts = s.splitn(2, ':');
- if parts.next().unwrap() != "fifo" {
- return None;
- }
- let path = match parts.next() {
- Some(p) => Path::new(p),
- None => return None,
- };
- let file = match OpenOptions::new().read(true).write(true).open(path) {
- Ok(f) => f,
- Err(_) => return None,
- };
- Some(Client::Fifo {
- file,
- path: path.into(),
- })
- }
-
- /// `--jobserver-auth=R,W`
- unsafe fn from_pipe(s: &str) -> Option<Client> {
- let mut parts = s.splitn(2, ',');
- let read = parts.next().unwrap();
- let write = match parts.next() {
- Some(s) => s,
- None => return None,
- };
-
- let read = match read.parse() {
- Ok(n) => n,
- Err(_) => return None,
- };
- let write = match write.parse() {
- Ok(n) => n,
- Err(_) => return None,
- };
-
- // Ok so we've got two integers that look like file descriptors, but
- // for extra sanity checking let's see if they actually look like
- // instances of a pipe before we return the client.
- //
- // If we're called from `make` *without* the leading + on our rule
- // then we'll have `MAKEFLAGS` env vars but won't actually have
- // access to the file descriptors.
- if is_valid_fd(read) && is_valid_fd(write) {
- drop(set_cloexec(read, true));
- drop(set_cloexec(write, true));
- Some(Client::from_fds(read, write))
- } else {
- None
- }
- }
-
- unsafe fn from_fds(read: c_int, write: c_int) -> Client {
- Client::Pipe {
- read: File::from_raw_fd(read),
- write: File::from_raw_fd(write),
- }
- }
-
- /// Gets the read end of our jobserver client.
- fn read(&self) -> &File {
- match self {
- Client::Pipe { read, .. } => read,
- Client::Fifo { file, .. } => file,
- }
- }
-
- /// Gets the write end of our jobserver client.
- fn write(&self) -> &File {
- match self {
- Client::Pipe { write, .. } => write,
- Client::Fifo { file, .. } => file,
- }
- }
-
- pub fn acquire(&self) -> io::Result<Acquired> {
- // Ignore interrupts and keep trying if that happens
- loop {
- if let Some(token) = self.acquire_allow_interrupts()? {
- return Ok(token);
- }
- }
- }
-
- /// Block waiting for a token, returning `None` if we're interrupted with
- /// EINTR.
- fn acquire_allow_interrupts(&self) -> io::Result<Option<Acquired>> {
- // We don't actually know if the file descriptor here is set in
- // blocking or nonblocking mode. AFAIK all released versions of
- // `make` use blocking fds for the jobserver, but the unreleased
- // version of `make` doesn't. In the unreleased version jobserver
- // fds are set to nonblocking and combined with `pselect`
- // internally.
- //
- // Here we try to be compatible with both strategies. We optimistically
- // try to read from the file descriptor which then may block, return
- // a token or indicate that polling is needed.
- // Blocking reads (if possible) allows the kernel to be more selective
- // about which readers to wake up when a token is written to the pipe.
- //
- // We use `poll` here to block this thread waiting for read
- // readiness, and then afterwards we perform the `read` itself. If
- // the `read` returns that it would block then we start over and try
- // again.
- //
- // Also note that we explicitly don't handle EINTR here. That's used
- // to shut us down, so we otherwise punt all errors upwards.
- unsafe {
- let mut fd: libc::pollfd = mem::zeroed();
- let mut read = self.read();
- fd.fd = read.as_raw_fd();
- fd.events = libc::POLLIN;
- loop {
- let mut buf = [0];
- match read.read(&mut buf) {
- Ok(1) => return Ok(Some(Acquired { byte: buf[0] })),
- Ok(_) => {
- return Err(io::Error::new(
- io::ErrorKind::Other,
- "early EOF on jobserver pipe",
- ))
- }
- Err(e) => match e.kind() {
- io::ErrorKind::WouldBlock => { /* fall through to polling */ }
- io::ErrorKind::Interrupted => return Ok(None),
- _ => return Err(e),
- },
- }
-
- loop {
- fd.revents = 0;
- if libc::poll(&mut fd, 1, -1) == -1 {
- let e = io::Error::last_os_error();
- return match e.kind() {
- io::ErrorKind::Interrupted => Ok(None),
- _ => Err(e),
- };
- }
- if fd.revents != 0 {
- break;
- }
- }
- }
- }
- }
-
- pub fn release(&self, data: Option<&Acquired>) -> io::Result<()> {
- // Note that the fd may be nonblocking but we're going to go ahead
- // and assume that the writes here are always nonblocking (we can
- // always quickly release a token). If that turns out to not be the
- // case we'll get an error anyway!
- let byte = data.map(|d| d.byte).unwrap_or(b'+');
- match self.write().write(&[byte])? {
- 1 => Ok(()),
- _ => Err(io::Error::new(
- io::ErrorKind::Other,
- "failed to write token back to jobserver",
- )),
- }
- }
-
- pub fn string_arg(&self) -> String {
- match self {
- Client::Pipe { read, write } => format!("{},{}", read.as_raw_fd(), write.as_raw_fd()),
- Client::Fifo { path, .. } => format!("fifo:{}", path.to_str().unwrap()),
- }
- }
-
- pub fn available(&self) -> io::Result<usize> {
- let mut len = MaybeUninit::<c_int>::uninit();
- cvt(unsafe { libc::ioctl(self.read().as_raw_fd(), libc::FIONREAD, len.as_mut_ptr()) })?;
- Ok(unsafe { len.assume_init() } as usize)
- }
-
- pub fn configure(&self, cmd: &mut Command) {
- match self {
- // We `File::open`ed it when inheriting from environment,
- // so no need to set cloexec for fifo.
- Client::Fifo { .. } => return,
- Client::Pipe { .. } => {}
- };
- // Here we basically just want to say that in the child process
- // we'll configure the read/write file descriptors to *not* be
- // cloexec, so they're inherited across the exec and specified as
- // integers through `string_arg` above.
- let read = self.read().as_raw_fd();
- let write = self.write().as_raw_fd();
- unsafe {
- cmd.pre_exec(move || {
- set_cloexec(read, false)?;
- set_cloexec(write, false)?;
- Ok(())
- });
- }
- }
-}
-
-#[derive(Debug)]
-pub struct Helper {
- thread: JoinHandle<()>,
- state: Arc<super::HelperState>,
-}
-
-pub(crate) fn spawn_helper(
- client: crate::Client,
- state: Arc<super::HelperState>,
- mut f: Box<dyn FnMut(io::Result<crate::Acquired>) + Send>,
-) -> io::Result<Helper> {
- static USR1_INIT: Once = Once::new();
- let mut err = None;
- USR1_INIT.call_once(|| unsafe {
- let mut new: libc::sigaction = mem::zeroed();
- #[cfg(target_os = "aix")]
- {
- new.sa_union.__su_sigaction = sigusr1_handler;
- }
- #[cfg(not(target_os = "aix"))]
- {
- new.sa_sigaction = sigusr1_handler as usize;
- }
- new.sa_flags = libc::SA_SIGINFO as _;
- if libc::sigaction(libc::SIGUSR1, &new, ptr::null_mut()) != 0 {
- err = Some(io::Error::last_os_error());
- }
- });
-
- if let Some(e) = err.take() {
- return Err(e);
- }
-
- let state2 = state.clone();
- let thread = Builder::new().spawn(move || {
- state2.for_each_request(|helper| loop {
- match client.inner.acquire_allow_interrupts() {
- Ok(Some(data)) => {
- break f(Ok(crate::Acquired {
- client: client.inner.clone(),
- data,
- disabled: false,
- }))
- }
- Err(e) => break f(Err(e)),
- Ok(None) if helper.producer_done() => break,
- Ok(None) => {}
- }
- });
- })?;
-
- Ok(Helper { thread, state })
-}
-
-impl Helper {
- pub fn join(self) {
- let dur = Duration::from_millis(10);
- let mut state = self.state.lock();
- debug_assert!(state.producer_done);
-
- // We need to join our helper thread, and it could be blocked in one
- // of two locations. First is the wait for a request, but the
- // initial drop of `HelperState` will take care of that. Otherwise
- // it may be blocked in `client.acquire()`. We actually have no way
- // of interrupting that, so resort to `pthread_kill` as a fallback.
- // This signal should interrupt any blocking `read` call with
- // `io::ErrorKind::Interrupt` and cause the thread to cleanly exit.
- //
- // Note that we don't do this forever though since there's a chance
- // of bugs, so only do this opportunistically to make a best effort
- // at clearing ourselves up.
- for _ in 0..100 {
- if state.consumer_done {
- break;
- }
- unsafe {
- // Ignore the return value here of `pthread_kill`,
- // apparently on OSX if you kill a dead thread it will
- // return an error, but on other platforms it may not. In
- // that sense we don't actually know if this will succeed or
- // not!
- libc::pthread_kill(self.thread.as_pthread_t() as _, libc::SIGUSR1);
- }
- state = self
- .state
- .cvar
- .wait_timeout(state, dur)
- .unwrap_or_else(|e| e.into_inner())
- .0;
- thread::yield_now(); // we really want the other thread to run
- }
-
- // If we managed to actually see the consumer get done, then we can
- // definitely wait for the thread. Otherwise it's... off in the ether
- // I guess?
- if state.consumer_done {
- drop(self.thread.join());
- }
- }
-}
-
-fn is_valid_fd(fd: c_int) -> bool {
- unsafe { libc::fcntl(fd, libc::F_GETFD) != -1 }
-}
-
-fn set_cloexec(fd: c_int, set: bool) -> io::Result<()> {
- unsafe {
- let previous = cvt(libc::fcntl(fd, libc::F_GETFD))?;
- let new = if set {
- previous | libc::FD_CLOEXEC
- } else {
- previous & !libc::FD_CLOEXEC
- };
- if new != previous {
- cvt(libc::fcntl(fd, libc::F_SETFD, new))?;
- }
- Ok(())
- }
-}
-
-fn set_nonblocking(fd: c_int, set: bool) -> io::Result<()> {
- let status_flag = if set { libc::O_NONBLOCK } else { 0 };
-
- unsafe {
- cvt(libc::fcntl(fd, libc::F_SETFL, status_flag))?;
- }
-
- Ok(())
-}
-
-fn cvt(t: c_int) -> io::Result<c_int> {
- if t == -1 {
- Err(io::Error::last_os_error())
- } else {
- Ok(t)
- }
-}
-
-extern "C" fn sigusr1_handler(
- _signum: c_int,
- _info: *mut libc::siginfo_t,
- _ptr: *mut libc::c_void,
-) {
- // nothing to do
-}
+use libc::c_int;
+
+use crate::FromEnvErrorInner;
+use std::fs::{File, OpenOptions};
+use std::io::{self, Read, Write};
+use std::mem;
+use std::mem::MaybeUninit;
+use std::os::unix::prelude::*;
+use std::path::{Path, PathBuf};
+use std::process::Command;
+use std::ptr;
+use std::sync::{Arc, Once};
+use std::thread::{self, Builder, JoinHandle};
+use std::time::Duration;
+
+#[derive(Debug)]
+pub enum Client {
+ /// `--jobserver-auth=R,W`
+ Pipe { read: File, write: File },
+ /// `--jobserver-auth=fifo:PATH`
+ Fifo { file: File, path: PathBuf },
+}
+
+#[derive(Debug)]
+pub struct Acquired {
+ byte: u8,
+}
+
+impl Client {
+ pub fn new(mut limit: usize) -> io::Result<Client> {
+ let client = unsafe { Client::mk()? };
+
+ // I don't think the character written here matters, but I could be
+ // wrong!
+ const BUFFER: [u8; 128] = [b'|'; 128];
+
+ let mut write = client.write();
+
+ set_nonblocking(write.as_raw_fd(), true)?;
+
+ while limit > 0 {
+ let n = limit.min(BUFFER.len());
+
+ write.write_all(&BUFFER[..n])?;
+ limit -= n;
+ }
+
+ set_nonblocking(write.as_raw_fd(), false)?;
+
+ Ok(client)
+ }
+
+ unsafe fn mk() -> io::Result<Client> {
+ let mut pipes = [0; 2];
+
+ // Attempt atomically-create-with-cloexec if we can on Linux,
+ // detected by using the `syscall` function in `libc` to try to work
+ // with as many kernels/glibc implementations as possible.
+ #[cfg(target_os = "linux")]
+ {
+ use std::sync::atomic::{AtomicBool, Ordering};
+
+ static PIPE2_AVAILABLE: AtomicBool = AtomicBool::new(true);
+ if PIPE2_AVAILABLE.load(Ordering::SeqCst) {
+ match libc::syscall(libc::SYS_pipe2, pipes.as_mut_ptr(), libc::O_CLOEXEC) {
+ -1 => {
+ let err = io::Error::last_os_error();
+ if err.raw_os_error() == Some(libc::ENOSYS) {
+ PIPE2_AVAILABLE.store(false, Ordering::SeqCst);
+ } else {
+ return Err(err);
+ }
+ }
+ _ => return Ok(Client::from_fds(pipes[0], pipes[1])),
+ }
+ }
+ }
+
+ cvt(libc::pipe(pipes.as_mut_ptr()))?;
+ drop(set_cloexec(pipes[0], true));
+ drop(set_cloexec(pipes[1], true));
+ Ok(Client::from_fds(pipes[0], pipes[1]))
+ }
+
+ pub(crate) unsafe fn open(s: &str, check_pipe: bool) -> Result<Client, FromEnvErrorInner> {
+ if let Some(client) = Self::from_fifo(s)? {
+ return Ok(client);
+ }
+ if let Some(client) = Self::from_pipe(s, check_pipe)? {
+ return Ok(client);
+ }
+ Err(FromEnvErrorInner::CannotParse(format!(
+ "expected `fifo:PATH` or `R,W`, found `{s}`"
+ )))
+ }
+
+ /// `--jobserver-auth=fifo:PATH`
+ fn from_fifo(s: &str) -> Result<Option<Client>, FromEnvErrorInner> {
+ let mut parts = s.splitn(2, ':');
+ if parts.next().unwrap() != "fifo" {
+ return Ok(None);
+ }
+ let path_str = parts.next().ok_or_else(|| {
+ FromEnvErrorInner::CannotParse("expected a path after `fifo:`".to_string())
+ })?;
+ let path = Path::new(path_str);
+ let file = OpenOptions::new()
+ .read(true)
+ .write(true)
+ .open(path)
+ .map_err(|err| FromEnvErrorInner::CannotOpenPath(path_str.to_string(), err))?;
+ Ok(Some(Client::Fifo {
+ file,
+ path: path.into(),
+ }))
+ }
+
+ /// `--jobserver-auth=R,W`
+ unsafe fn from_pipe(s: &str, check_pipe: bool) -> Result<Option<Client>, FromEnvErrorInner> {
+ let mut parts = s.splitn(2, ',');
+ let read = parts.next().unwrap();
+ let write = match parts.next() {
+ Some(w) => w,
+ None => return Ok(None),
+ };
+ let read = read
+ .parse()
+ .map_err(|e| FromEnvErrorInner::CannotParse(format!("cannot parse `read` fd: {e}")))?;
+ let write = write
+ .parse()
+ .map_err(|e| FromEnvErrorInner::CannotParse(format!("cannot parse `write` fd: {e}")))?;
+
+ // Ok so we've got two integers that look like file descriptors, but
+ // for extra sanity checking let's see if they actually look like
+ // valid files and instances of a pipe if feature enabled before we
+ // return the client.
+ //
+ // If we're called from `make` *without* the leading + on our rule
+ // then we'll have `MAKEFLAGS` env vars but won't actually have
+ // access to the file descriptors.
+ //
+ // `NotAPipe` is a worse error, return it if it's reported for any of the two fds.
+ match (fd_check(read, check_pipe), fd_check(write, check_pipe)) {
+ (read_err @ Err(FromEnvErrorInner::NotAPipe(..)), _) => read_err?,
+ (_, write_err @ Err(FromEnvErrorInner::NotAPipe(..))) => write_err?,
+ (read_err, write_err) => {
+ read_err?;
+ write_err?;
+ }
+ }
+
+ drop(set_cloexec(read, true));
+ drop(set_cloexec(write, true));
+ Ok(Some(Client::from_fds(read, write)))
+ }
+
+ unsafe fn from_fds(read: c_int, write: c_int) -> Client {
+ Client::Pipe {
+ read: File::from_raw_fd(read),
+ write: File::from_raw_fd(write),
+ }
+ }
+
+ /// Gets the read end of our jobserver client.
+ fn read(&self) -> &File {
+ match self {
+ Client::Pipe { read, .. } => read,
+ Client::Fifo { file, .. } => file,
+ }
+ }
+
+ /// Gets the write end of our jobserver client.
+ fn write(&self) -> &File {
+ match self {
+ Client::Pipe { write, .. } => write,
+ Client::Fifo { file, .. } => file,
+ }
+ }
+
+ pub fn acquire(&self) -> io::Result<Acquired> {
+ // Ignore interrupts and keep trying if that happens
+ loop {
+ if let Some(token) = self.acquire_allow_interrupts()? {
+ return Ok(token);
+ }
+ }
+ }
+
+ /// Block waiting for a token, returning `None` if we're interrupted with
+ /// EINTR.
+ fn acquire_allow_interrupts(&self) -> io::Result<Option<Acquired>> {
+ // We don't actually know if the file descriptor here is set in
+ // blocking or nonblocking mode. AFAIK all released versions of
+ // `make` use blocking fds for the jobserver, but the unreleased
+ // version of `make` doesn't. In the unreleased version jobserver
+ // fds are set to nonblocking and combined with `pselect`
+ // internally.
+ //
+ // Here we try to be compatible with both strategies. We optimistically
+ // try to read from the file descriptor which then may block, return
+ // a token or indicate that polling is needed.
+ // Blocking reads (if possible) allows the kernel to be more selective
+ // about which readers to wake up when a token is written to the pipe.
+ //
+ // We use `poll` here to block this thread waiting for read
+ // readiness, and then afterwards we perform the `read` itself. If
+ // the `read` returns that it would block then we start over and try
+ // again.
+ //
+ // Also note that we explicitly don't handle EINTR here. That's used
+ // to shut us down, so we otherwise punt all errors upwards.
+ unsafe {
+ let mut fd: libc::pollfd = mem::zeroed();
+ let mut read = self.read();
+ fd.fd = read.as_raw_fd();
+ fd.events = libc::POLLIN;
+ loop {
+ let mut buf = [0];
+ match read.read(&mut buf) {
+ Ok(1) => return Ok(Some(Acquired { byte: buf[0] })),
+ Ok(_) => {
+ return Err(io::Error::new(
+ io::ErrorKind::Other,
+ "early EOF on jobserver pipe",
+ ));
+ }
+ Err(e) => match e.kind() {
+ io::ErrorKind::WouldBlock => { /* fall through to polling */ }
+ io::ErrorKind::Interrupted => return Ok(None),
+ _ => return Err(e),
+ },
+ }
+
+ loop {
+ fd.revents = 0;
+ if libc::poll(&mut fd, 1, -1) == -1 {
+ let e = io::Error::last_os_error();
+ return match e.kind() {
+ io::ErrorKind::Interrupted => Ok(None),
+ _ => Err(e),
+ };
+ }
+ if fd.revents != 0 {
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ pub fn release(&self, data: Option<&Acquired>) -> io::Result<()> {
+ // Note that the fd may be nonblocking but we're going to go ahead
+ // and assume that the writes here are always nonblocking (we can
+ // always quickly release a token). If that turns out to not be the
+ // case we'll get an error anyway!
+ let byte = data.map(|d| d.byte).unwrap_or(b'+');
+ match self.write().write(&[byte])? {
+ 1 => Ok(()),
+ _ => Err(io::Error::new(
+ io::ErrorKind::Other,
+ "failed to write token back to jobserver",
+ )),
+ }
+ }
+
+ pub fn string_arg(&self) -> String {
+ match self {
+ Client::Pipe { read, write } => format!("{},{}", read.as_raw_fd(), write.as_raw_fd()),
+ Client::Fifo { path, .. } => format!("fifo:{}", path.to_str().unwrap()),
+ }
+ }
+
+ pub fn available(&self) -> io::Result<usize> {
+ let mut len = MaybeUninit::<c_int>::uninit();
+ cvt(unsafe { libc::ioctl(self.read().as_raw_fd(), libc::FIONREAD, len.as_mut_ptr()) })?;
+ Ok(unsafe { len.assume_init() } as usize)
+ }
+
+ pub fn configure(&self, cmd: &mut Command) {
+ match self {
+ // We `File::open`ed it when inheriting from environment,
+ // so no need to set cloexec for fifo.
+ Client::Fifo { .. } => return,
+ Client::Pipe { .. } => {}
+ };
+ // Here we basically just want to say that in the child process
+ // we'll configure the read/write file descriptors to *not* be
+ // cloexec, so they're inherited across the exec and specified as
+ // integers through `string_arg` above.
+ let read = self.read().as_raw_fd();
+ let write = self.write().as_raw_fd();
+ unsafe {
+ cmd.pre_exec(move || {
+ set_cloexec(read, false)?;
+ set_cloexec(write, false)?;
+ Ok(())
+ });
+ }
+ }
+}
+
+#[derive(Debug)]
+pub struct Helper {
+ thread: JoinHandle<()>,
+ state: Arc<super::HelperState>,
+}
+
+pub(crate) fn spawn_helper(
+ client: crate::Client,
+ state: Arc<super::HelperState>,
+ mut f: Box<dyn FnMut(io::Result<crate::Acquired>) + Send>,
+) -> io::Result<Helper> {
+ static USR1_INIT: Once = Once::new();
+ let mut err = None;
+ USR1_INIT.call_once(|| unsafe {
+ let mut new: libc::sigaction = mem::zeroed();
+ #[cfg(target_os = "aix")]
+ {
+ new.sa_union.__su_sigaction = sigusr1_handler;
+ }
+ #[cfg(not(target_os = "aix"))]
+ {
+ new.sa_sigaction = sigusr1_handler as usize;
+ }
+ new.sa_flags = libc::SA_SIGINFO as _;
+ if libc::sigaction(libc::SIGUSR1, &new, ptr::null_mut()) != 0 {
+ err = Some(io::Error::last_os_error());
+ }
+ });
+
+ if let Some(e) = err.take() {
+ return Err(e);
+ }
+
+ let state2 = state.clone();
+ let thread = Builder::new().spawn(move || {
+ state2.for_each_request(|helper| loop {
+ match client.inner.acquire_allow_interrupts() {
+ Ok(Some(data)) => {
+ break f(Ok(crate::Acquired {
+ client: client.inner.clone(),
+ data,
+ disabled: false,
+ }));
+ }
+ Err(e) => break f(Err(e)),
+ Ok(None) if helper.producer_done() => break,
+ Ok(None) => {}
+ }
+ });
+ })?;
+
+ Ok(Helper { thread, state })
+}
+
+impl Helper {
+ pub fn join(self) {
+ let dur = Duration::from_millis(10);
+ let mut state = self.state.lock();
+ debug_assert!(state.producer_done);
+
+ // We need to join our helper thread, and it could be blocked in one
+ // of two locations. First is the wait for a request, but the
+ // initial drop of `HelperState` will take care of that. Otherwise
+ // it may be blocked in `client.acquire()`. We actually have no way
+ // of interrupting that, so resort to `pthread_kill` as a fallback.
+ // This signal should interrupt any blocking `read` call with
+ // `io::ErrorKind::Interrupt` and cause the thread to cleanly exit.
+ //
+ // Note that we don't do this forever though since there's a chance
+ // of bugs, so only do this opportunistically to make a best effort
+ // at clearing ourselves up.
+ for _ in 0..100 {
+ if state.consumer_done {
+ break;
+ }
+ unsafe {
+ // Ignore the return value here of `pthread_kill`,
+ // apparently on OSX if you kill a dead thread it will
+ // return an error, but on other platforms it may not. In
+ // that sense we don't actually know if this will succeed or
+ // not!
+ libc::pthread_kill(self.thread.as_pthread_t() as _, libc::SIGUSR1);
+ }
+ state = self
+ .state
+ .cvar
+ .wait_timeout(state, dur)
+ .unwrap_or_else(|e| e.into_inner())
+ .0;
+ thread::yield_now(); // we really want the other thread to run
+ }
+
+ // If we managed to actually see the consumer get done, then we can
+ // definitely wait for the thread. Otherwise it's... off in the ether
+ // I guess?
+ if state.consumer_done {
+ drop(self.thread.join());
+ }
+ }
+}
+
+unsafe fn fcntl_check(fd: c_int) -> Result<(), FromEnvErrorInner> {
+ match libc::fcntl(fd, libc::F_GETFD) {
+ -1 => Err(FromEnvErrorInner::CannotOpenFd(
+ fd,
+ io::Error::last_os_error(),
+ )),
+ _ => Ok(()),
+ }
+}
+
+unsafe fn fd_check(fd: c_int, check_pipe: bool) -> Result<(), FromEnvErrorInner> {
+ if check_pipe {
+ let mut stat = mem::zeroed();
+ if libc::fstat(fd, &mut stat) == -1 {
+ let last_os_error = io::Error::last_os_error();
+ fcntl_check(fd)?;
+ Err(FromEnvErrorInner::NotAPipe(fd, Some(last_os_error)))
+ } else {
+ // On android arm and i686 mode_t is u16 and st_mode is u32,
+ // this generates a type mismatch when S_IFIFO (declared as mode_t)
+ // is used in operations with st_mode, so we use this workaround
+ // to get the value of S_IFIFO with the same type of st_mode.
+ #[allow(unused_assignments)]
+ let mut s_ififo = stat.st_mode;
+ s_ififo = libc::S_IFIFO as _;
+ if stat.st_mode & s_ififo == s_ififo {
+ return Ok(());
+ }
+ Err(FromEnvErrorInner::NotAPipe(fd, None))
+ }
+ } else {
+ fcntl_check(fd)
+ }
+}
+
+fn set_cloexec(fd: c_int, set: bool) -> io::Result<()> {
+ unsafe {
+ let previous = cvt(libc::fcntl(fd, libc::F_GETFD))?;
+ let new = if set {
+ previous | libc::FD_CLOEXEC
+ } else {
+ previous & !libc::FD_CLOEXEC
+ };
+ if new != previous {
+ cvt(libc::fcntl(fd, libc::F_SETFD, new))?;
+ }
+ Ok(())
+ }
+}
+
+fn set_nonblocking(fd: c_int, set: bool) -> io::Result<()> {
+ let status_flag = if set { libc::O_NONBLOCK } else { 0 };
+
+ unsafe {
+ cvt(libc::fcntl(fd, libc::F_SETFL, status_flag))?;
+ }
+
+ Ok(())
+}
+
+fn cvt(t: c_int) -> io::Result<c_int> {
+ if t == -1 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(t)
+ }
+}
+
+extern "C" fn sigusr1_handler(
+ _signum: c_int,
+ _info: *mut libc::siginfo_t,
+ _ptr: *mut libc::c_void,
+) {
+ // nothing to do
+}
diff --git a/vendor/jobserver/src/wasm.rs b/vendor/jobserver/src/wasm.rs
index 3793bd67c..3dda675fd 100644
--- a/vendor/jobserver/src/wasm.rs
+++ b/vendor/jobserver/src/wasm.rs
@@ -1,95 +1,96 @@
-use std::io;
-use std::process::Command;
-use std::sync::{Arc, Condvar, Mutex};
-use std::thread::{Builder, JoinHandle};
-
-#[derive(Debug)]
-pub struct Client {
- inner: Arc<Inner>,
-}
-
-#[derive(Debug)]
-struct Inner {
- count: Mutex<usize>,
- cvar: Condvar,
-}
-
-#[derive(Debug)]
-pub struct Acquired(());
-
-impl Client {
- pub fn new(limit: usize) -> io::Result<Client> {
- Ok(Client {
- inner: Arc::new(Inner {
- count: Mutex::new(limit),
- cvar: Condvar::new(),
- }),
- })
- }
-
- pub unsafe fn open(_s: &str) -> Option<Client> {
- None
- }
-
- pub fn acquire(&self) -> io::Result<Acquired> {
- let mut lock = self.inner.count.lock().unwrap_or_else(|e| e.into_inner());
- while *lock == 0 {
- lock = self
- .inner
- .cvar
- .wait(lock)
- .unwrap_or_else(|e| e.into_inner());
- }
- *lock -= 1;
- Ok(Acquired(()))
- }
-
- pub fn release(&self, _data: Option<&Acquired>) -> io::Result<()> {
- let mut lock = self.inner.count.lock().unwrap_or_else(|e| e.into_inner());
- *lock += 1;
- drop(lock);
- self.inner.cvar.notify_one();
- Ok(())
- }
-
- pub fn string_arg(&self) -> String {
- panic!(
- "On this platform there is no cross process jobserver support,
- so Client::configure is not supported."
- );
- }
-
- pub fn available(&self) -> io::Result<usize> {
- let lock = self.inner.count.lock().unwrap_or_else(|e| e.into_inner());
- Ok(*lock)
- }
-
- pub fn configure(&self, _cmd: &mut Command) {
- unreachable!();
- }
-}
-
-#[derive(Debug)]
-pub struct Helper {
- thread: JoinHandle<()>,
-}
-
-pub(crate) fn spawn_helper(
- client: crate::Client,
- state: Arc<super::HelperState>,
- mut f: Box<dyn FnMut(io::Result<crate::Acquired>) + Send>,
-) -> io::Result<Helper> {
- let thread = Builder::new().spawn(move || {
- state.for_each_request(|_| f(client.acquire()));
- })?;
-
- Ok(Helper { thread: thread })
-}
-
-impl Helper {
- pub fn join(self) {
- // TODO: this is not correct if the thread is blocked in
- // `client.acquire()`.
- drop(self.thread.join());
- }
-}
+use crate::FromEnvErrorInner;
+use std::io;
+use std::process::Command;
+use std::sync::{Arc, Condvar, Mutex};
+use std::thread::{Builder, JoinHandle};
+
+#[derive(Debug)]
+pub struct Client {
+ inner: Arc<Inner>,
+}
+
+#[derive(Debug)]
+struct Inner {
+ count: Mutex<usize>,
+ cvar: Condvar,
+}
+
+#[derive(Debug)]
+pub struct Acquired(());
+
+impl Client {
+ pub fn new(limit: usize) -> io::Result<Client> {
+ Ok(Client {
+ inner: Arc::new(Inner {
+ count: Mutex::new(limit),
+ cvar: Condvar::new(),
+ }),
+ })
+ }
+
+ pub(crate) unsafe fn open(_s: &str, _check_pipe: bool) -> Result<Client, FromEnvErrorInner> {
+ Err(FromEnvErrorInner::Unsupported)
+ }
+
+ pub fn acquire(&self) -> io::Result<Acquired> {
+ let mut lock = self.inner.count.lock().unwrap_or_else(|e| e.into_inner());
+ while *lock == 0 {
+ lock = self
+ .inner
+ .cvar
+ .wait(lock)
+ .unwrap_or_else(|e| e.into_inner());
+ }
+ *lock -= 1;
+ Ok(Acquired(()))
+ }
+
+ pub fn release(&self, _data: Option<&Acquired>) -> io::Result<()> {
+ let mut lock = self.inner.count.lock().unwrap_or_else(|e| e.into_inner());
+ *lock += 1;
+ drop(lock);
+ self.inner.cvar.notify_one();
+ Ok(())
+ }
+
+ pub fn string_arg(&self) -> String {
+ panic!(
+ "On this platform there is no cross process jobserver support,
+ so Client::configure is not supported."
+ );
+ }
+
+ pub fn available(&self) -> io::Result<usize> {
+ let lock = self.inner.count.lock().unwrap_or_else(|e| e.into_inner());
+ Ok(*lock)
+ }
+
+ pub fn configure(&self, _cmd: &mut Command) {
+ unreachable!();
+ }
+}
+
+#[derive(Debug)]
+pub struct Helper {
+ thread: JoinHandle<()>,
+}
+
+pub(crate) fn spawn_helper(
+ client: crate::Client,
+ state: Arc<super::HelperState>,
+ mut f: Box<dyn FnMut(io::Result<crate::Acquired>) + Send>,
+) -> io::Result<Helper> {
+ let thread = Builder::new().spawn(move || {
+ state.for_each_request(|_| f(client.acquire()));
+ })?;
+
+ Ok(Helper { thread: thread })
+}
+
+impl Helper {
+ pub fn join(self) {
+ // TODO: this is not correct if the thread is blocked in
+ // `client.acquire()`.
+ drop(self.thread.join());
+ }
+}
diff --git a/vendor/jobserver/src/windows.rs b/vendor/jobserver/src/windows.rs
index 6791efea4..bff89c1b0 100644
--- a/vendor/jobserver/src/windows.rs
+++ b/vendor/jobserver/src/windows.rs
@@ -1,266 +1,270 @@
-use std::ffi::CString;
-use std::io;
-use std::process::Command;
-use std::ptr;
-use std::sync::Arc;
-use std::thread::{Builder, JoinHandle};
-
-#[derive(Debug)]
-pub struct Client {
- sem: Handle,
- name: String,
-}
-
-#[derive(Debug)]
-pub struct Acquired;
-
-type BOOL = i32;
-type DWORD = u32;
-type HANDLE = *mut u8;
-type LONG = i32;
-
-const ERROR_ALREADY_EXISTS: DWORD = 183;
-const FALSE: BOOL = 0;
-const INFINITE: DWORD = 0xffffffff;
-const SEMAPHORE_MODIFY_STATE: DWORD = 0x2;
-const SYNCHRONIZE: DWORD = 0x00100000;
-const TRUE: BOOL = 1;
-const WAIT_OBJECT_0: DWORD = 0;
-
-extern "system" {
- fn CloseHandle(handle: HANDLE) -> BOOL;
- fn SetEvent(hEvent: HANDLE) -> BOOL;
- fn WaitForMultipleObjects(
- ncount: DWORD,
- lpHandles: *const HANDLE,
- bWaitAll: BOOL,
- dwMilliseconds: DWORD,
- ) -> DWORD;
- fn CreateEventA(
- lpEventAttributes: *mut u8,
- bManualReset: BOOL,
- bInitialState: BOOL,
- lpName: *const i8,
- ) -> HANDLE;
- fn ReleaseSemaphore(
- hSemaphore: HANDLE,
- lReleaseCount: LONG,
- lpPreviousCount: *mut LONG,
- ) -> BOOL;
- fn CreateSemaphoreA(
- lpEventAttributes: *mut u8,
- lInitialCount: LONG,
- lMaximumCount: LONG,
- lpName: *const i8,
- ) -> HANDLE;
- fn OpenSemaphoreA(dwDesiredAccess: DWORD, bInheritHandle: BOOL, lpName: *const i8) -> HANDLE;
- fn WaitForSingleObject(hHandle: HANDLE, dwMilliseconds: DWORD) -> DWORD;
- #[link_name = "SystemFunction036"]
- fn RtlGenRandom(RandomBuffer: *mut u8, RandomBufferLength: u32) -> u8;
-}
-
-// Note that we ideally would use the `getrandom` crate, but unfortunately
-// that causes build issues when this crate is used in rust-lang/rust (see
-// rust-lang/rust#65014 for more information). As a result we just inline
-// the pretty simple Windows-specific implementation of generating
-// randomness.
-fn getrandom(dest: &mut [u8]) -> io::Result<()> {
- // Prevent overflow of u32
- for chunk in dest.chunks_mut(u32::max_value() as usize) {
- let ret = unsafe { RtlGenRandom(chunk.as_mut_ptr(), chunk.len() as u32) };
- if ret == 0 {
- return Err(io::Error::new(
- io::ErrorKind::Other,
- "failed to generate random bytes",
- ));
- }
- }
- Ok(())
-}
-
-impl Client {
- pub fn new(limit: usize) -> io::Result<Client> {
- // Try a bunch of random semaphore names until we get a unique one,
- // but don't try for too long.
- //
- // Note that `limit == 0` is a valid argument above but Windows
- // won't let us create a semaphore with 0 slots available to it. Get
- // `limit == 0` working by creating a semaphore instead with one
- // slot and then immediately acquire it (without ever releaseing it
- // back).
- for _ in 0..100 {
- let mut bytes = [0; 4];
- getrandom(&mut bytes)?;
- let mut name = format!("__rust_jobserver_semaphore_{}\0", u32::from_ne_bytes(bytes));
- unsafe {
- let create_limit = if limit == 0 { 1 } else { limit };
- let r = CreateSemaphoreA(
- ptr::null_mut(),
- create_limit as LONG,
- create_limit as LONG,
- name.as_ptr() as *const _,
- );
- if r.is_null() {
- return Err(io::Error::last_os_error());
- }
- let handle = Handle(r);
-
- let err = io::Error::last_os_error();
- if err.raw_os_error() == Some(ERROR_ALREADY_EXISTS as i32) {
- continue;
- }
- name.pop(); // chop off the trailing nul
- let client = Client {
- sem: handle,
- name: name,
- };
- if create_limit != limit {
- client.acquire()?;
- }
- return Ok(client);
- }
- }
-
- Err(io::Error::new(
- io::ErrorKind::Other,
- "failed to find a unique name for a semaphore",
- ))
- }
-
- pub unsafe fn open(s: &str) -> Option<Client> {
- let name = match CString::new(s) {
- Ok(s) => s,
- Err(_) => return None,
- };
-
- let sem = OpenSemaphoreA(SYNCHRONIZE | SEMAPHORE_MODIFY_STATE, FALSE, name.as_ptr());
- if sem.is_null() {
- None
- } else {
- Some(Client {
- sem: Handle(sem),
- name: s.to_string(),
- })
- }
- }
-
- pub fn acquire(&self) -> io::Result<Acquired> {
- unsafe {
- let r = WaitForSingleObject(self.sem.0, INFINITE);
- if r == WAIT_OBJECT_0 {
- Ok(Acquired)
- } else {
- Err(io::Error::last_os_error())
- }
- }
- }
-
- pub fn release(&self, _data: Option<&Acquired>) -> io::Result<()> {
- unsafe {
- let r = ReleaseSemaphore(self.sem.0, 1, ptr::null_mut());
- if r != 0 {
- Ok(())
- } else {
- Err(io::Error::last_os_error())
- }
- }
- }
-
- pub fn string_arg(&self) -> String {
- self.name.clone()
- }
-
- pub fn available(&self) -> io::Result<usize> {
- // Can't read value of a semaphore on Windows, so
- // try to acquire without sleeping, since we can find out the
- // old value on release. If acquisiton fails, then available is 0.
- unsafe {
- let r = WaitForSingleObject(self.sem.0, 0);
- if r != WAIT_OBJECT_0 {
- Ok(0)
- } else {
- let mut prev: LONG = 0;
- let r = ReleaseSemaphore(self.sem.0, 1, &mut prev);
- if r != 0 {
- Ok(prev as usize + 1)
- } else {
- Err(io::Error::last_os_error())
- }
- }
- }
- }
-
- pub fn configure(&self, _cmd: &mut Command) {
- // nothing to do here, we gave the name of our semaphore to the
- // child above
- }
-}
-
-#[derive(Debug)]
-struct Handle(HANDLE);
-// HANDLE is a raw ptr, but we're send/sync
-unsafe impl Sync for Handle {}
-unsafe impl Send for Handle {}
-
-impl Drop for Handle {
- fn drop(&mut self) {
- unsafe {
- CloseHandle(self.0);
- }
- }
-}
-
-#[derive(Debug)]
-pub struct Helper {
- event: Arc<Handle>,
- thread: JoinHandle<()>,
-}
-
-pub(crate) fn spawn_helper(
- client: crate::Client,
- state: Arc<super::HelperState>,
- mut f: Box<dyn FnMut(io::Result<crate::Acquired>) + Send>,
-) -> io::Result<Helper> {
- let event = unsafe {
- let r = CreateEventA(ptr::null_mut(), TRUE, FALSE, ptr::null());
- if r.is_null() {
- return Err(io::Error::last_os_error());
- } else {
- Handle(r)
- }
- };
- let event = Arc::new(event);
- let event2 = event.clone();
- let thread = Builder::new().spawn(move || {
- let objects = [event2.0, client.inner.sem.0];
- state.for_each_request(|_| {
- const WAIT_OBJECT_1: u32 = WAIT_OBJECT_0 + 1;
- match unsafe { WaitForMultipleObjects(2, objects.as_ptr(), FALSE, INFINITE) } {
- WAIT_OBJECT_0 => return,
- WAIT_OBJECT_1 => f(Ok(crate::Acquired {
- client: client.inner.clone(),
- data: Acquired,
- disabled: false,
- })),
- _ => f(Err(io::Error::last_os_error())),
- }
- });
- })?;
- Ok(Helper { thread, event })
-}
-
-impl Helper {
- pub fn join(self) {
- // Unlike unix this logic is much easier. If our thread was blocked
- // in waiting for requests it should already be woken up and
- // exiting. Otherwise it's waiting for a token, so we wake it up
- // with a different event that it's also waiting on here. After
- // these two we should be guaranteed the thread is on its way out,
- // so we can safely `join`.
- let r = unsafe { SetEvent(self.event.0) };
- if r == 0 {
- panic!("failed to set event: {}", io::Error::last_os_error());
- }
- drop(self.thread.join());
- }
-}
+use crate::FromEnvErrorInner;
+use std::ffi::CString;
+use std::io;
+use std::process::Command;
+use std::ptr;
+use std::sync::Arc;
+use std::thread::{Builder, JoinHandle};
+
+#[derive(Debug)]
+pub struct Client {
+ sem: Handle,
+ name: String,
+}
+
+#[derive(Debug)]
+pub struct Acquired;
+
+type BOOL = i32;
+type DWORD = u32;
+type HANDLE = *mut u8;
+type LONG = i32;
+
+const ERROR_ALREADY_EXISTS: DWORD = 183;
+const FALSE: BOOL = 0;
+const INFINITE: DWORD = 0xffffffff;
+const SEMAPHORE_MODIFY_STATE: DWORD = 0x2;
+const SYNCHRONIZE: DWORD = 0x00100000;
+const TRUE: BOOL = 1;
+const WAIT_OBJECT_0: DWORD = 0;
+
+extern "system" {
+ fn CloseHandle(handle: HANDLE) -> BOOL;
+ fn SetEvent(hEvent: HANDLE) -> BOOL;
+ fn WaitForMultipleObjects(
+ ncount: DWORD,
+ lpHandles: *const HANDLE,
+ bWaitAll: BOOL,
+ dwMilliseconds: DWORD,
+ ) -> DWORD;
+ fn CreateEventA(
+ lpEventAttributes: *mut u8,
+ bManualReset: BOOL,
+ bInitialState: BOOL,
+ lpName: *const i8,
+ ) -> HANDLE;
+ fn ReleaseSemaphore(
+ hSemaphore: HANDLE,
+ lReleaseCount: LONG,
+ lpPreviousCount: *mut LONG,
+ ) -> BOOL;
+ fn CreateSemaphoreA(
+ lpEventAttributes: *mut u8,
+ lInitialCount: LONG,
+ lMaximumCount: LONG,
+ lpName: *const i8,
+ ) -> HANDLE;
+ fn OpenSemaphoreA(dwDesiredAccess: DWORD, bInheritHandle: BOOL, lpName: *const i8) -> HANDLE;
+ fn WaitForSingleObject(hHandle: HANDLE, dwMilliseconds: DWORD) -> DWORD;
+ #[link_name = "SystemFunction036"]
+ fn RtlGenRandom(RandomBuffer: *mut u8, RandomBufferLength: u32) -> u8;
+}
+
+// Note that we ideally would use the `getrandom` crate, but unfortunately
+// that causes build issues when this crate is used in rust-lang/rust (see
+// rust-lang/rust#65014 for more information). As a result we just inline
+// the pretty simple Windows-specific implementation of generating
+// randomness.
+fn getrandom(dest: &mut [u8]) -> io::Result<()> {
+ // Prevent overflow of u32
+ for chunk in dest.chunks_mut(u32::max_value() as usize) {
+ let ret = unsafe { RtlGenRandom(chunk.as_mut_ptr(), chunk.len() as u32) };
+ if ret == 0 {
+ return Err(io::Error::new(
+ io::ErrorKind::Other,
+ "failed to generate random bytes",
+ ));
+ }
+ }
+ Ok(())
+}
+
+impl Client {
+ pub fn new(limit: usize) -> io::Result<Client> {
+ // Try a bunch of random semaphore names until we get a unique one,
+ // but don't try for too long.
+ //
+ // Note that `limit == 0` is a valid argument above but Windows
+ // won't let us create a semaphore with 0 slots available to it. Get
+ // `limit == 0` working by creating a semaphore instead with one
+ // slot and then immediately acquire it (without ever releaseing it
+ // back).
+ for _ in 0..100 {
+ let mut bytes = [0; 4];
+ getrandom(&mut bytes)?;
+ let mut name = format!("__rust_jobserver_semaphore_{}\0", u32::from_ne_bytes(bytes));
+ unsafe {
+ let create_limit = if limit == 0 { 1 } else { limit };
+ let r = CreateSemaphoreA(
+ ptr::null_mut(),
+ create_limit as LONG,
+ create_limit as LONG,
+ name.as_ptr() as *const _,
+ );
+ if r.is_null() {
+ return Err(io::Error::last_os_error());
+ }
+ let handle = Handle(r);
+
+ let err = io::Error::last_os_error();
+ if err.raw_os_error() == Some(ERROR_ALREADY_EXISTS as i32) {
+ continue;
+ }
+ name.pop(); // chop off the trailing nul
+ let client = Client {
+ sem: handle,
+ name: name,
+ };
+ if create_limit != limit {
+ client.acquire()?;
+ }
+ return Ok(client);
+ }
+ }
+
+ Err(io::Error::new(
+ io::ErrorKind::Other,
+ "failed to find a unique name for a semaphore",
+ ))
+ }
+
+ pub(crate) unsafe fn open(s: &str, _check_pipe: bool) -> Result<Client, FromEnvErrorInner> {
+ let name = match CString::new(s) {
+ Ok(s) => s,
+ Err(e) => return Err(FromEnvErrorInner::CannotParse(e.to_string())),
+ };
+
+ let sem = OpenSemaphoreA(SYNCHRONIZE | SEMAPHORE_MODIFY_STATE, FALSE, name.as_ptr());
+ if sem.is_null() {
+ Err(FromEnvErrorInner::CannotOpenPath(
+ s.to_string(),
+ io::Error::last_os_error(),
+ ))
+ } else {
+ Ok(Client {
+ sem: Handle(sem),
+ name: s.to_string(),
+ })
+ }
+ }
+
+ pub fn acquire(&self) -> io::Result<Acquired> {
+ unsafe {
+ let r = WaitForSingleObject(self.sem.0, INFINITE);
+ if r == WAIT_OBJECT_0 {
+ Ok(Acquired)
+ } else {
+ Err(io::Error::last_os_error())
+ }
+ }
+ }
+
+ pub fn release(&self, _data: Option<&Acquired>) -> io::Result<()> {
+ unsafe {
+ let r = ReleaseSemaphore(self.sem.0, 1, ptr::null_mut());
+ if r != 0 {
+ Ok(())
+ } else {
+ Err(io::Error::last_os_error())
+ }
+ }
+ }
+
+ pub fn string_arg(&self) -> String {
+ self.name.clone()
+ }
+
+ pub fn available(&self) -> io::Result<usize> {
+ // Can't read value of a semaphore on Windows, so
+ // try to acquire without sleeping, since we can find out the
+ // old value on release. If acquisiton fails, then available is 0.
+ unsafe {
+ let r = WaitForSingleObject(self.sem.0, 0);
+ if r != WAIT_OBJECT_0 {
+ Ok(0)
+ } else {
+ let mut prev: LONG = 0;
+ let r = ReleaseSemaphore(self.sem.0, 1, &mut prev);
+ if r != 0 {
+ Ok(prev as usize + 1)
+ } else {
+ Err(io::Error::last_os_error())
+ }
+ }
+ }
+ }
+
+ pub fn configure(&self, _cmd: &mut Command) {
+ // nothing to do here, we gave the name of our semaphore to the
+ // child above
+ }
+}
+
+#[derive(Debug)]
+struct Handle(HANDLE);
+// HANDLE is a raw ptr, but we're send/sync
+unsafe impl Sync for Handle {}
+unsafe impl Send for Handle {}
+
+impl Drop for Handle {
+ fn drop(&mut self) {
+ unsafe {
+ CloseHandle(self.0);
+ }
+ }
+}
+
+#[derive(Debug)]
+pub struct Helper {
+ event: Arc<Handle>,
+ thread: JoinHandle<()>,
+}
+
+pub(crate) fn spawn_helper(
+ client: crate::Client,
+ state: Arc<super::HelperState>,
+ mut f: Box<dyn FnMut(io::Result<crate::Acquired>) + Send>,
+) -> io::Result<Helper> {
+ let event = unsafe {
+ let r = CreateEventA(ptr::null_mut(), TRUE, FALSE, ptr::null());
+ if r.is_null() {
+ return Err(io::Error::last_os_error());
+ } else {
+ Handle(r)
+ }
+ };
+ let event = Arc::new(event);
+ let event2 = event.clone();
+ let thread = Builder::new().spawn(move || {
+ let objects = [event2.0, client.inner.sem.0];
+ state.for_each_request(|_| {
+ const WAIT_OBJECT_1: u32 = WAIT_OBJECT_0 + 1;
+ match unsafe { WaitForMultipleObjects(2, objects.as_ptr(), FALSE, INFINITE) } {
+ WAIT_OBJECT_0 => return,
+ WAIT_OBJECT_1 => f(Ok(crate::Acquired {
+ client: client.inner.clone(),
+ data: Acquired,
+ disabled: false,
+ })),
+ _ => f(Err(io::Error::last_os_error())),
+ }
+ });
+ })?;
+ Ok(Helper { thread, event })
+}
+
+impl Helper {
+ pub fn join(self) {
+ // Unlike unix this logic is much easier. If our thread was blocked
+ // in waiting for requests it should already be woken up and
+ // exiting. Otherwise it's waiting for a token, so we wake it up
+ // with a different event that it's also waiting on here. After
+ // these two we should be guaranteed the thread is on its way out,
+ // so we can safely `join`.
+ let r = unsafe { SetEvent(self.event.0) };
+ if r == 0 {
+ panic!("failed to set event: {}", io::Error::last_os_error());
+ }
+ drop(self.thread.join());
+ }
+}