summaryrefslogtreecommitdiffstats
path: root/third_party/rust/jobserver/src
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--third_party/rust/jobserver/src/lib.rs541
-rw-r--r--third_party/rust/jobserver/src/unix.rs368
-rw-r--r--third_party/rust/jobserver/src/wasm.rs95
-rw-r--r--third_party/rust/jobserver/src/windows.rs266
4 files changed, 1270 insertions, 0 deletions
diff --git a/third_party/rust/jobserver/src/lib.rs b/third_party/rust/jobserver/src/lib.rs
new file mode 100644
index 0000000000..6d07884b18
--- /dev/null
+++ b/third_party/rust/jobserver/src/lib.rs
@@ -0,0 +1,541 @@
+//! An implementation of the GNU make jobserver.
+//!
+//! This crate is an implementation, in Rust, of the GNU `make` jobserver for
+//! CLI tools that are interoperating with make or otherwise require some form
+//! of parallelism limiting across process boundaries. This was originally
+//! written for usage in Cargo to both (a) work when `cargo` is invoked from
+//! `make` (using `make`'s jobserver) and (b) work when `cargo` invokes build
+//! scripts, exporting a jobserver implementation for `make` processes to
+//! transitively use.
+//!
+//! The jobserver implementation can be found in [detail online][docs] but
+//! basically boils down to a cross-process semaphore. On Unix this is
+//! implemented with the `pipe` syscall and read/write ends of a pipe and on
+//! Windows this is implemented literally with IPC semaphores.
+//!
+//! The jobserver protocol in `make` also dictates when tokens are acquired to
+//! run child work, and clients using this crate should take care to implement
+//! such details to ensure correct interoperation with `make` itself.
+//!
+//! ## Examples
+//!
+//! Connect to a jobserver that was set up by `make` or a different process:
+//!
+//! ```no_run
+//! use jobserver::Client;
+//!
+//! // See API documentation for why this is `unsafe`
+//! let client = match unsafe { Client::from_env() } {
+//! Some(client) => client,
+//! None => panic!("client not configured"),
+//! };
+//! ```
+//!
+//! Acquire and release token from a jobserver:
+//!
+//! ```no_run
+//! use jobserver::Client;
+//!
+//! let client = unsafe { Client::from_env().unwrap() };
+//! let token = client.acquire().unwrap(); // blocks until it is available
+//! drop(token); // releases the token when the work is done
+//! ```
+//!
+//! Create a new jobserver and configure a child process to have access:
+//!
+//! ```
+//! use std::process::Command;
+//! use jobserver::Client;
+//!
+//! let client = Client::new(4).expect("failed to create jobserver");
+//! let mut cmd = Command::new("make");
+//! client.configure(&mut cmd);
+//! ```
+//!
+//! ## Caveats
+//!
+//! This crate makes no attempt to release tokens back to a jobserver on
+//! abnormal exit of a process. If a process which acquires a token is killed
+//! with ctrl-c or some similar signal then tokens will not be released and the
+//! jobserver may be in a corrupt state.
+//!
+//! Note that this is typically ok as ctrl-c means that an entire build process
+//! is being torn down, but it's worth being aware of at least!
+//!
+//! ## Windows caveats
+//!
+//! There appear to be two implementations of `make` on Windows. On MSYS2 one
+//! typically comes as `mingw32-make` and the other as `make` itself. I'm not
+//! personally too familiar with what's going on here, but for jobserver-related
+//! information the `mingw32-make` implementation uses Windows semaphores
+//! whereas the `make` program does not. The `make` program appears to use file
+//! descriptors and I'm not really sure how it works, so this crate is not
+//! compatible with `make` on Windows. It is, however, compatible with
+//! `mingw32-make`.
+//!
+//! [docs]: http://make.mad-scientist.net/papers/jobserver-implementation/
+
+#![deny(missing_docs, missing_debug_implementations)]
+#![doc(html_root_url = "https://docs.rs/jobserver/0.1")]
+
+use std::env;
+use std::io;
+use std::process::Command;
+use std::sync::{Arc, Condvar, Mutex, MutexGuard};
+
+#[cfg(unix)]
+#[path = "unix.rs"]
+mod imp;
+#[cfg(windows)]
+#[path = "windows.rs"]
+mod imp;
+#[cfg(not(any(unix, windows)))]
+#[path = "wasm.rs"]
+mod imp;
+
+/// A client of a jobserver
+///
+/// This structure is the main type exposed by this library, and is where
+/// interaction to a jobserver is configured through. Clients are either created
+/// from scratch in which case the internal semphore is initialied on the spot,
+/// or a client is created from the environment to connect to a jobserver
+/// already created.
+///
+/// Some usage examples can be found in the crate documentation for using a
+/// client.
+///
+/// Note that a `Client` implements the `Clone` trait, and all instances of a
+/// `Client` refer to the same jobserver instance.
+#[derive(Clone, Debug)]
+pub struct Client {
+ inner: Arc<imp::Client>,
+}
+
+/// An acquired token from a jobserver.
+///
+/// This token will be released back to the jobserver when it is dropped and
+/// otherwise represents the ability to spawn off another thread of work.
+#[derive(Debug)]
+pub struct Acquired {
+ client: Arc<imp::Client>,
+ data: imp::Acquired,
+ disabled: bool,
+}
+
+impl Acquired {
+ /// This drops the `Acquired` token without releasing the associated token.
+ ///
+ /// This is not generally useful, but can be helpful if you do not have the
+ /// ability to store an Acquired token but need to not yet release it.
+ ///
+ /// You'll typically want to follow this up with a call to `release_raw` or
+ /// similar to actually release the token later on.
+ pub fn drop_without_releasing(mut self) {
+ self.disabled = true;
+ }
+}
+
+#[derive(Default, Debug)]
+struct HelperState {
+ lock: Mutex<HelperInner>,
+ cvar: Condvar,
+}
+
+#[derive(Default, Debug)]
+struct HelperInner {
+ requests: usize,
+ producer_done: bool,
+ consumer_done: bool,
+}
+
+impl Client {
+ /// Creates a new jobserver initialized with the given parallelism limit.
+ ///
+ /// A client to the jobserver created will be returned. This client will
+ /// allow at most `limit` tokens to be acquired from it in parallel. More
+ /// calls to `acquire` will cause the calling thread to block.
+ ///
+ /// Note that the created `Client` is not automatically inherited into
+ /// spawned child processes from this program. Manual usage of the
+ /// `configure` function is required for a child process to have access to a
+ /// job server.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use jobserver::Client;
+ ///
+ /// let client = Client::new(4).expect("failed to create jobserver");
+ /// ```
+ ///
+ /// # Errors
+ ///
+ /// Returns an error if any I/O error happens when attempting to create the
+ /// jobserver client.
+ pub fn new(limit: usize) -> io::Result<Client> {
+ Ok(Client {
+ inner: Arc::new(imp::Client::new(limit)?),
+ })
+ }
+
+ /// Attempts to connect to the jobserver specified in this process's
+ /// environment.
+ ///
+ /// When the a `make` executable calls a child process it will configure the
+ /// environment of the child to ensure that it has handles to the jobserver
+ /// it's passing down. This function will attempt to look for these details
+ /// and connect to the jobserver.
+ ///
+ /// Note that the created `Client` is not automatically inherited into
+ /// spawned child processes from this program. Manual usage of the
+ /// `configure` function is required for a child process to have access to a
+ /// job server.
+ ///
+ /// # Return value
+ ///
+ /// If a jobserver was found in the environment and it looks correct then
+ /// `Some` of the connected client will be returned. If no jobserver was
+ /// found then `None` will be returned.
+ ///
+ /// Note that on Unix the `Client` returned **takes ownership of the file
+ /// descriptors specified in the environment**. Jobservers on Unix are
+ /// implemented with `pipe` file descriptors, and they're inherited from
+ /// parent processes. This `Client` returned takes ownership of the file
+ /// descriptors for this process and will close the file descriptors after
+ /// this value is dropped.
+ ///
+ /// Additionally on Unix this function will configure the file descriptors
+ /// with `CLOEXEC` so they're not automatically inherited by spawned
+ /// children.
+ ///
+ /// # Safety
+ ///
+ /// This function is `unsafe` to call on Unix specifically as it
+ /// transitively requires usage of the `from_raw_fd` function, which is
+ /// itself unsafe in some circumstances.
+ ///
+ /// It's recommended to call this function very early in the lifetime of a
+ /// program before any other file descriptors are opened. That way you can
+ /// make sure to take ownership properly of the file descriptors passed
+ /// down, if any.
+ ///
+ /// It's generally unsafe to call this function twice in a program if the
+ /// previous invocation returned `Some`.
+ ///
+ /// Note, though, that on Windows it should be safe to call this function
+ /// any number of times.
+ pub unsafe fn from_env() -> Option<Client> {
+ let var = match env::var("CARGO_MAKEFLAGS")
+ .or_else(|_| env::var("MAKEFLAGS"))
+ .or_else(|_| env::var("MFLAGS"))
+ {
+ Ok(s) => s,
+ Err(_) => return None,
+ };
+ let mut arg = "--jobserver-fds=";
+ let pos = match var.find(arg) {
+ Some(i) => i,
+ None => {
+ arg = "--jobserver-auth=";
+ match var.find(arg) {
+ Some(i) => i,
+ None => return None,
+ }
+ }
+ };
+
+ let s = var[pos + arg.len()..].split(' ').next().unwrap();
+ imp::Client::open(s).map(|c| Client { inner: Arc::new(c) })
+ }
+
+ /// Acquires a token from this jobserver client.
+ ///
+ /// This function will block the calling thread until a new token can be
+ /// acquired from the jobserver.
+ ///
+ /// # Return value
+ ///
+ /// On successful acquisition of a token an instance of `Acquired` is
+ /// returned. This structure, when dropped, will release the token back to
+ /// the jobserver. It's recommended to avoid leaking this value.
+ ///
+ /// # Errors
+ ///
+ /// If an I/O error happens while acquiring a token then this function will
+ /// return immediately with the error. If an error is returned then a token
+ /// was not acquired.
+ pub fn acquire(&self) -> io::Result<Acquired> {
+ let data = self.inner.acquire()?;
+ Ok(Acquired {
+ client: self.inner.clone(),
+ data,
+ disabled: false,
+ })
+ }
+
+ /// Returns amount of tokens in the read-side pipe.
+ ///
+ /// # Return value
+ ///
+ /// Number of bytes available to be read from the jobserver pipe
+ ///
+ /// # Errors
+ ///
+ /// Underlying errors from the ioctl will be passed up.
+ pub fn available(&self) -> io::Result<usize> {
+ self.inner.available()
+ }
+
+ /// Configures a child process to have access to this client's jobserver as
+ /// well.
+ ///
+ /// This function is required to be called to ensure that a jobserver is
+ /// properly inherited to a child process. If this function is *not* called
+ /// then this `Client` will not be accessible in the child process. In other
+ /// words, if not called, then `Client::from_env` will return `None` in the
+ /// child process (or the equivalent of `Child::from_env` that `make` uses).
+ ///
+ /// ## Platform-specific behavior
+ ///
+ /// On Unix and Windows this will clobber the `CARGO_MAKEFLAGS` environment
+ /// variables for the child process, and on Unix this will also allow the
+ /// two file descriptors for this client to be inherited to the child.
+ ///
+ /// On platforms other than Unix and Windows this panics.
+ pub fn configure(&self, cmd: &mut Command) {
+ cmd.env("CARGO_MAKEFLAGS", &self.mflags_env());
+ self.inner.configure(cmd);
+ }
+
+ /// Configures a child process to have access to this client's jobserver as
+ /// well.
+ ///
+ /// This function is required to be called to ensure that a jobserver is
+ /// properly inherited to a child process. If this function is *not* called
+ /// then this `Client` will not be accessible in the child process. In other
+ /// words, if not called, then `Client::from_env` will return `None` in the
+ /// child process (or the equivalent of `Child::from_env` that `make` uses).
+ ///
+ /// ## Platform-specific behavior
+ ///
+ /// On Unix and Windows this will clobber the `CARGO_MAKEFLAGS`,
+ /// `MAKEFLAGS` and `MFLAGS` environment variables for the child process,
+ /// and on Unix this will also allow the two file descriptors for
+ /// this client to be inherited to the child.
+ ///
+ /// On platforms other than Unix and Windows this panics.
+ pub fn configure_make(&self, cmd: &mut Command) {
+ let value = self.mflags_env();
+ cmd.env("CARGO_MAKEFLAGS", &value);
+ cmd.env("MAKEFLAGS", &value);
+ cmd.env("MFLAGS", &value);
+ self.inner.configure(cmd);
+ }
+
+ fn mflags_env(&self) -> String {
+ let arg = self.inner.string_arg();
+ // Older implementations of make use `--jobserver-fds` and newer
+ // implementations use `--jobserver-auth`, pass both to try to catch
+ // both implementations.
+ format!("-j --jobserver-fds={0} --jobserver-auth={0}", arg)
+ }
+
+ /// Converts this `Client` into a helper thread to deal with a blocking
+ /// `acquire` function a little more easily.
+ ///
+ /// The fact that the `acquire` function on `Client` blocks isn't always
+ /// the easiest to work with. Typically you're using a jobserver to
+ /// manage running other events in parallel! This means that you need to
+ /// either (a) wait for an existing job to finish or (b) wait for a
+ /// new token to become available.
+ ///
+ /// Unfortunately the blocking in `acquire` happens at the implementation
+ /// layer of jobservers. On Unix this requires a blocking call to `read`
+ /// and on Windows this requires one of the `WaitFor*` functions. Both
+ /// of these situations aren't the easiest to deal with:
+ ///
+ /// * On Unix there's basically only one way to wake up a `read` early, and
+ /// that's through a signal. This is what the `make` implementation
+ /// itself uses, relying on `SIGCHLD` to wake up a blocking acquisition
+ /// of a new job token. Unfortunately nonblocking I/O is not an option
+ /// here, so it means that "waiting for one of two events" means that
+ /// the latter event must generate a signal! This is not always the case
+ /// on unix for all jobservers.
+ ///
+ /// * On Windows you'd have to basically use the `WaitForMultipleObjects`
+ /// which means that you've got to canonicalize all your event sources
+ /// into a `HANDLE` which also isn't the easiest thing to do
+ /// unfortunately.
+ ///
+ /// This function essentially attempts to ease these limitations by
+ /// converting this `Client` into a helper thread spawned into this
+ /// process. The application can then request that the helper thread
+ /// acquires tokens and the provided closure will be invoked for each token
+ /// acquired.
+ ///
+ /// The intention is that this function can be used to translate the event
+ /// of a token acquisition into an arbitrary user-defined event.
+ ///
+ /// # Arguments
+ ///
+ /// This function will consume the `Client` provided to be transferred to
+ /// the helper thread that is spawned. Additionally a closure `f` is
+ /// provided to be invoked whenever a token is acquired.
+ ///
+ /// This closure is only invoked after calls to
+ /// `HelperThread::request_token` have been made and a token itself has
+ /// been acquired. If an error happens while acquiring the token then
+ /// an error will be yielded to the closure as well.
+ ///
+ /// # Return Value
+ ///
+ /// This function will return an instance of the `HelperThread` structure
+ /// which is used to manage the helper thread associated with this client.
+ /// Through the `HelperThread` you'll request that tokens are acquired.
+ /// When acquired, the closure provided here is invoked.
+ ///
+ /// When the `HelperThread` structure is returned it will be gracefully
+ /// torn down, and the calling thread will be blocked until the thread is
+ /// torn down (which should be prompt).
+ ///
+ /// # Errors
+ ///
+ /// This function may fail due to creation of the helper thread or
+ /// auxiliary I/O objects to manage the helper thread. In any of these
+ /// situations the error is propagated upwards.
+ ///
+ /// # Platform-specific behavior
+ ///
+ /// On Windows this function behaves pretty normally as expected, but on
+ /// Unix the implementation is... a little heinous. As mentioned above
+ /// we're forced into blocking I/O for token acquisition, namely a blocking
+ /// call to `read`. We must be able to unblock this, however, to tear down
+ /// the helper thread gracefully!
+ ///
+ /// Essentially what happens is that we'll send a signal to the helper
+ /// thread spawned and rely on `EINTR` being returned to wake up the helper
+ /// thread. This involves installing a global `SIGUSR1` handler that does
+ /// nothing along with sending signals to that thread. This may cause
+ /// odd behavior in some applications, so it's recommended to review and
+ /// test thoroughly before using this.
+ pub fn into_helper_thread<F>(self, f: F) -> io::Result<HelperThread>
+ where
+ F: FnMut(io::Result<Acquired>) + Send + 'static,
+ {
+ let state = Arc::new(HelperState::default());
+ Ok(HelperThread {
+ inner: Some(imp::spawn_helper(self, state.clone(), Box::new(f))?),
+ state,
+ })
+ }
+
+ /// Blocks the current thread until a token is acquired.
+ ///
+ /// This is the same as `acquire`, except that it doesn't return an RAII
+ /// helper. If successful the process will need to guarantee that
+ /// `release_raw` is called in the future.
+ pub fn acquire_raw(&self) -> io::Result<()> {
+ self.inner.acquire()?;
+ Ok(())
+ }
+
+ /// Releases a jobserver token back to the original jobserver.
+ ///
+ /// This is intended to be paired with `acquire_raw` if it was called, but
+ /// in some situations it could also be called to relinquish a process's
+ /// implicit token temporarily which is then re-acquired later.
+ pub fn release_raw(&self) -> io::Result<()> {
+ self.inner.release(None)?;
+ Ok(())
+ }
+}
+
+impl Drop for Acquired {
+ fn drop(&mut self) {
+ if !self.disabled {
+ drop(self.client.release(Some(&self.data)));
+ }
+ }
+}
+
+/// Structure returned from `Client::into_helper_thread` to manage the lifetime
+/// of the helper thread returned, see those associated docs for more info.
+#[derive(Debug)]
+pub struct HelperThread {
+ inner: Option<imp::Helper>,
+ state: Arc<HelperState>,
+}
+
+impl HelperThread {
+ /// Request that the helper thread acquires a token, eventually calling the
+ /// original closure with a token when it's available.
+ ///
+ /// For more information, see the docs on that function.
+ pub fn request_token(&self) {
+ // Indicate that there's one more request for a token and then wake up
+ // the helper thread if it's sleeping.
+ self.state.lock().requests += 1;
+ self.state.cvar.notify_one();
+ }
+}
+
+impl Drop for HelperThread {
+ fn drop(&mut self) {
+ // Flag that the producer half is done so the helper thread should exit
+ // quickly if it's waiting. Wake it up if it's actually waiting
+ self.state.lock().producer_done = true;
+ self.state.cvar.notify_one();
+
+ // ... and afterwards perform any thread cleanup logic
+ self.inner.take().unwrap().join();
+ }
+}
+
+impl HelperState {
+ fn lock(&self) -> MutexGuard<'_, HelperInner> {
+ self.lock.lock().unwrap_or_else(|e| e.into_inner())
+ }
+
+ /// Executes `f` for each request for a token, where `f` is expected to
+ /// block and then provide the original closure with a token once it's
+ /// acquired.
+ ///
+ /// This is an infinite loop until the helper thread is dropped, at which
+ /// point everything should get interrupted.
+ fn for_each_request(&self, mut f: impl FnMut(&HelperState)) {
+ let mut lock = self.lock();
+
+ // We only execute while we could receive requests, but as soon as
+ // that's `false` we're out of here.
+ while !lock.producer_done {
+ // If no one's requested a token then we wait for someone to
+ // request a token.
+ if lock.requests == 0 {
+ lock = self.cvar.wait(lock).unwrap_or_else(|e| e.into_inner());
+ continue;
+ }
+
+ // Consume the request for a token, and then actually acquire a
+ // token after unlocking our lock (not that acquisition happens in
+ // `f`). This ensures that we don't actually hold the lock if we
+ // wait for a long time for a token.
+ lock.requests -= 1;
+ drop(lock);
+ f(self);
+ lock = self.lock();
+ }
+ lock.consumer_done = true;
+ self.cvar.notify_one();
+ }
+
+ fn producer_done(&self) -> bool {
+ self.lock().producer_done
+ }
+}
+
+#[test]
+fn no_helper_deadlock() {
+ let x = crate::Client::new(32).unwrap();
+ let _y = x.clone();
+ std::mem::drop(x.into_helper_thread(|_| {}).unwrap());
+}
diff --git a/third_party/rust/jobserver/src/unix.rs b/third_party/rust/jobserver/src/unix.rs
new file mode 100644
index 0000000000..2474deefc5
--- /dev/null
+++ b/third_party/rust/jobserver/src/unix.rs
@@ -0,0 +1,368 @@
+use libc::c_int;
+
+use std::fs::File;
+use std::io::{self, Read, Write};
+use std::mem;
+use std::mem::MaybeUninit;
+use std::os::unix::prelude::*;
+use std::process::Command;
+use std::ptr;
+use std::sync::{Arc, Once};
+use std::thread::{self, Builder, JoinHandle};
+use std::time::Duration;
+
+#[derive(Debug)]
+pub struct Client {
+ read: File,
+ write: File,
+}
+
+#[derive(Debug)]
+pub struct Acquired {
+ byte: u8,
+}
+
+impl Client {
+ pub fn new(mut limit: usize) -> io::Result<Client> {
+ let client = unsafe { Client::mk()? };
+
+ // I don't think the character written here matters, but I could be
+ // wrong!
+ const BUFFER: [u8; 128] = [b'|'; 128];
+
+ set_nonblocking(client.write.as_raw_fd(), true)?;
+
+ while limit > 0 {
+ let n = limit.min(BUFFER.len());
+
+ (&client.write).write_all(&BUFFER[..n])?;
+ limit -= n;
+ }
+
+ set_nonblocking(client.write.as_raw_fd(), false)?;
+
+ Ok(client)
+ }
+
+ unsafe fn mk() -> io::Result<Client> {
+ let mut pipes = [0; 2];
+
+ // Attempt atomically-create-with-cloexec if we can on Linux,
+ // detected by using the `syscall` function in `libc` to try to work
+ // with as many kernels/glibc implementations as possible.
+ #[cfg(target_os = "linux")]
+ {
+ use std::sync::atomic::{AtomicBool, Ordering};
+
+ static PIPE2_AVAILABLE: AtomicBool = AtomicBool::new(true);
+ if PIPE2_AVAILABLE.load(Ordering::SeqCst) {
+ match libc::syscall(libc::SYS_pipe2, pipes.as_mut_ptr(), libc::O_CLOEXEC) {
+ -1 => {
+ let err = io::Error::last_os_error();
+ if err.raw_os_error() == Some(libc::ENOSYS) {
+ PIPE2_AVAILABLE.store(false, Ordering::SeqCst);
+ } else {
+ return Err(err);
+ }
+ }
+ _ => return Ok(Client::from_fds(pipes[0], pipes[1])),
+ }
+ }
+ }
+
+ cvt(libc::pipe(pipes.as_mut_ptr()))?;
+ drop(set_cloexec(pipes[0], true));
+ drop(set_cloexec(pipes[1], true));
+ Ok(Client::from_fds(pipes[0], pipes[1]))
+ }
+
+ pub unsafe fn open(s: &str) -> Option<Client> {
+ let mut parts = s.splitn(2, ',');
+ let read = parts.next().unwrap();
+ let write = match parts.next() {
+ Some(s) => s,
+ None => return None,
+ };
+
+ let read = match read.parse() {
+ Ok(n) => n,
+ Err(_) => return None,
+ };
+ let write = match write.parse() {
+ Ok(n) => n,
+ Err(_) => return None,
+ };
+
+ // Ok so we've got two integers that look like file descriptors, but
+ // for extra sanity checking let's see if they actually look like
+ // instances of a pipe before we return the client.
+ //
+ // If we're called from `make` *without* the leading + on our rule
+ // then we'll have `MAKEFLAGS` env vars but won't actually have
+ // access to the file descriptors.
+ if is_valid_fd(read) && is_valid_fd(write) {
+ drop(set_cloexec(read, true));
+ drop(set_cloexec(write, true));
+ Some(Client::from_fds(read, write))
+ } else {
+ None
+ }
+ }
+
+ unsafe fn from_fds(read: c_int, write: c_int) -> Client {
+ Client {
+ read: File::from_raw_fd(read),
+ write: File::from_raw_fd(write),
+ }
+ }
+
+ pub fn acquire(&self) -> io::Result<Acquired> {
+ // Ignore interrupts and keep trying if that happens
+ loop {
+ if let Some(token) = self.acquire_allow_interrupts()? {
+ return Ok(token);
+ }
+ }
+ }
+
+ /// Block waiting for a token, returning `None` if we're interrupted with
+ /// EINTR.
+ fn acquire_allow_interrupts(&self) -> io::Result<Option<Acquired>> {
+ // We don't actually know if the file descriptor here is set in
+ // blocking or nonblocking mode. AFAIK all released versions of
+ // `make` use blocking fds for the jobserver, but the unreleased
+ // version of `make` doesn't. In the unreleased version jobserver
+ // fds are set to nonblocking and combined with `pselect`
+ // internally.
+ //
+ // Here we try to be compatible with both strategies. We optimistically
+ // try to read from the file descriptor which then may block, return
+ // a token or indicate that polling is needed.
+ // Blocking reads (if possible) allows the kernel to be more selective
+ // about which readers to wake up when a token is written to the pipe.
+ //
+ // We use `poll` here to block this thread waiting for read
+ // readiness, and then afterwards we perform the `read` itself. If
+ // the `read` returns that it would block then we start over and try
+ // again.
+ //
+ // Also note that we explicitly don't handle EINTR here. That's used
+ // to shut us down, so we otherwise punt all errors upwards.
+ unsafe {
+ let mut fd: libc::pollfd = mem::zeroed();
+ fd.fd = self.read.as_raw_fd();
+ fd.events = libc::POLLIN;
+ loop {
+ let mut buf = [0];
+ match (&self.read).read(&mut buf) {
+ Ok(1) => return Ok(Some(Acquired { byte: buf[0] })),
+ Ok(_) => {
+ return Err(io::Error::new(
+ io::ErrorKind::Other,
+ "early EOF on jobserver pipe",
+ ))
+ }
+ Err(e) => match e.kind() {
+ io::ErrorKind::WouldBlock => { /* fall through to polling */ }
+ io::ErrorKind::Interrupted => return Ok(None),
+ _ => return Err(e),
+ },
+ }
+
+ loop {
+ fd.revents = 0;
+ if libc::poll(&mut fd, 1, -1) == -1 {
+ let e = io::Error::last_os_error();
+ return match e.kind() {
+ io::ErrorKind::Interrupted => Ok(None),
+ _ => Err(e),
+ };
+ }
+ if fd.revents != 0 {
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ pub fn release(&self, data: Option<&Acquired>) -> io::Result<()> {
+ // Note that the fd may be nonblocking but we're going to go ahead
+ // and assume that the writes here are always nonblocking (we can
+ // always quickly release a token). If that turns out to not be the
+ // case we'll get an error anyway!
+ let byte = data.map(|d| d.byte).unwrap_or(b'+');
+ match (&self.write).write(&[byte])? {
+ 1 => Ok(()),
+ _ => Err(io::Error::new(
+ io::ErrorKind::Other,
+ "failed to write token back to jobserver",
+ )),
+ }
+ }
+
+ pub fn string_arg(&self) -> String {
+ format!("{},{}", self.read.as_raw_fd(), self.write.as_raw_fd())
+ }
+
+ pub fn available(&self) -> io::Result<usize> {
+ let mut len = MaybeUninit::<c_int>::uninit();
+ cvt(unsafe { libc::ioctl(self.read.as_raw_fd(), libc::FIONREAD, len.as_mut_ptr()) })?;
+ Ok(unsafe { len.assume_init() } as usize)
+ }
+
+ pub fn configure(&self, cmd: &mut Command) {
+ // Here we basically just want to say that in the child process
+ // we'll configure the read/write file descriptors to *not* be
+ // cloexec, so they're inherited across the exec and specified as
+ // integers through `string_arg` above.
+ let read = self.read.as_raw_fd();
+ let write = self.write.as_raw_fd();
+ unsafe {
+ cmd.pre_exec(move || {
+ set_cloexec(read, false)?;
+ set_cloexec(write, false)?;
+ Ok(())
+ });
+ }
+ }
+}
+
+#[derive(Debug)]
+pub struct Helper {
+ thread: JoinHandle<()>,
+ state: Arc<super::HelperState>,
+}
+
+pub(crate) fn spawn_helper(
+ client: crate::Client,
+ state: Arc<super::HelperState>,
+ mut f: Box<dyn FnMut(io::Result<crate::Acquired>) + Send>,
+) -> io::Result<Helper> {
+ static USR1_INIT: Once = Once::new();
+ let mut err = None;
+ USR1_INIT.call_once(|| unsafe {
+ let mut new: libc::sigaction = mem::zeroed();
+ new.sa_sigaction = sigusr1_handler as usize;
+ new.sa_flags = libc::SA_SIGINFO as _;
+ if libc::sigaction(libc::SIGUSR1, &new, ptr::null_mut()) != 0 {
+ err = Some(io::Error::last_os_error());
+ }
+ });
+
+ if let Some(e) = err.take() {
+ return Err(e);
+ }
+
+ let state2 = state.clone();
+ let thread = Builder::new().spawn(move || {
+ state2.for_each_request(|helper| loop {
+ match client.inner.acquire_allow_interrupts() {
+ Ok(Some(data)) => {
+ break f(Ok(crate::Acquired {
+ client: client.inner.clone(),
+ data,
+ disabled: false,
+ }))
+ }
+ Err(e) => break f(Err(e)),
+ Ok(None) if helper.producer_done() => break,
+ Ok(None) => {}
+ }
+ });
+ })?;
+
+ Ok(Helper { thread, state })
+}
+
+impl Helper {
+ pub fn join(self) {
+ let dur = Duration::from_millis(10);
+ let mut state = self.state.lock();
+ debug_assert!(state.producer_done);
+
+ // We need to join our helper thread, and it could be blocked in one
+ // of two locations. First is the wait for a request, but the
+ // initial drop of `HelperState` will take care of that. Otherwise
+ // it may be blocked in `client.acquire()`. We actually have no way
+ // of interrupting that, so resort to `pthread_kill` as a fallback.
+ // This signal should interrupt any blocking `read` call with
+ // `io::ErrorKind::Interrupt` and cause the thread to cleanly exit.
+ //
+ // Note that we don't do this forever though since there's a chance
+ // of bugs, so only do this opportunistically to make a best effort
+ // at clearing ourselves up.
+ for _ in 0..100 {
+ if state.consumer_done {
+ break;
+ }
+ unsafe {
+ // Ignore the return value here of `pthread_kill`,
+ // apparently on OSX if you kill a dead thread it will
+ // return an error, but on other platforms it may not. In
+ // that sense we don't actually know if this will succeed or
+ // not!
+ libc::pthread_kill(self.thread.as_pthread_t() as _, libc::SIGUSR1);
+ }
+ state = self
+ .state
+ .cvar
+ .wait_timeout(state, dur)
+ .unwrap_or_else(|e| e.into_inner())
+ .0;
+ thread::yield_now(); // we really want the other thread to run
+ }
+
+ // If we managed to actually see the consumer get done, then we can
+ // definitely wait for the thread. Otherwise it's... off in the ether
+ // I guess?
+ if state.consumer_done {
+ drop(self.thread.join());
+ }
+ }
+}
+
+fn is_valid_fd(fd: c_int) -> bool {
+ unsafe { libc::fcntl(fd, libc::F_GETFD) != -1 }
+}
+
+fn set_cloexec(fd: c_int, set: bool) -> io::Result<()> {
+ unsafe {
+ let previous = cvt(libc::fcntl(fd, libc::F_GETFD))?;
+ let new = if set {
+ previous | libc::FD_CLOEXEC
+ } else {
+ previous & !libc::FD_CLOEXEC
+ };
+ if new != previous {
+ cvt(libc::fcntl(fd, libc::F_SETFD, new))?;
+ }
+ Ok(())
+ }
+}
+
+fn set_nonblocking(fd: c_int, set: bool) -> io::Result<()> {
+ let status_flag = if set { libc::O_NONBLOCK } else { 0 };
+
+ unsafe {
+ cvt(libc::fcntl(fd, libc::F_SETFL, status_flag))?;
+ }
+
+ Ok(())
+}
+
+fn cvt(t: c_int) -> io::Result<c_int> {
+ if t == -1 {
+ Err(io::Error::last_os_error())
+ } else {
+ Ok(t)
+ }
+}
+
+extern "C" fn sigusr1_handler(
+ _signum: c_int,
+ _info: *mut libc::siginfo_t,
+ _ptr: *mut libc::c_void,
+) {
+ // nothing to do
+}
diff --git a/third_party/rust/jobserver/src/wasm.rs b/third_party/rust/jobserver/src/wasm.rs
new file mode 100644
index 0000000000..3793bd67cc
--- /dev/null
+++ b/third_party/rust/jobserver/src/wasm.rs
@@ -0,0 +1,95 @@
+use std::io;
+use std::process::Command;
+use std::sync::{Arc, Condvar, Mutex};
+use std::thread::{Builder, JoinHandle};
+
+#[derive(Debug)]
+pub struct Client {
+ inner: Arc<Inner>,
+}
+
+#[derive(Debug)]
+struct Inner {
+ count: Mutex<usize>,
+ cvar: Condvar,
+}
+
+#[derive(Debug)]
+pub struct Acquired(());
+
+impl Client {
+ pub fn new(limit: usize) -> io::Result<Client> {
+ Ok(Client {
+ inner: Arc::new(Inner {
+ count: Mutex::new(limit),
+ cvar: Condvar::new(),
+ }),
+ })
+ }
+
+ pub unsafe fn open(_s: &str) -> Option<Client> {
+ None
+ }
+
+ pub fn acquire(&self) -> io::Result<Acquired> {
+ let mut lock = self.inner.count.lock().unwrap_or_else(|e| e.into_inner());
+ while *lock == 0 {
+ lock = self
+ .inner
+ .cvar
+ .wait(lock)
+ .unwrap_or_else(|e| e.into_inner());
+ }
+ *lock -= 1;
+ Ok(Acquired(()))
+ }
+
+ pub fn release(&self, _data: Option<&Acquired>) -> io::Result<()> {
+ let mut lock = self.inner.count.lock().unwrap_or_else(|e| e.into_inner());
+ *lock += 1;
+ drop(lock);
+ self.inner.cvar.notify_one();
+ Ok(())
+ }
+
+ pub fn string_arg(&self) -> String {
+ panic!(
+ "On this platform there is no cross process jobserver support,
+ so Client::configure is not supported."
+ );
+ }
+
+ pub fn available(&self) -> io::Result<usize> {
+ let lock = self.inner.count.lock().unwrap_or_else(|e| e.into_inner());
+ Ok(*lock)
+ }
+
+ pub fn configure(&self, _cmd: &mut Command) {
+ unreachable!();
+ }
+}
+
+#[derive(Debug)]
+pub struct Helper {
+ thread: JoinHandle<()>,
+}
+
+pub(crate) fn spawn_helper(
+ client: crate::Client,
+ state: Arc<super::HelperState>,
+ mut f: Box<dyn FnMut(io::Result<crate::Acquired>) + Send>,
+) -> io::Result<Helper> {
+ let thread = Builder::new().spawn(move || {
+ state.for_each_request(|_| f(client.acquire()));
+ })?;
+
+ Ok(Helper { thread: thread })
+}
+
+impl Helper {
+ pub fn join(self) {
+ // TODO: this is not correct if the thread is blocked in
+ // `client.acquire()`.
+ drop(self.thread.join());
+ }
+}
diff --git a/third_party/rust/jobserver/src/windows.rs b/third_party/rust/jobserver/src/windows.rs
new file mode 100644
index 0000000000..6791efea4f
--- /dev/null
+++ b/third_party/rust/jobserver/src/windows.rs
@@ -0,0 +1,266 @@
+use std::ffi::CString;
+use std::io;
+use std::process::Command;
+use std::ptr;
+use std::sync::Arc;
+use std::thread::{Builder, JoinHandle};
+
+#[derive(Debug)]
+pub struct Client {
+ sem: Handle,
+ name: String,
+}
+
+#[derive(Debug)]
+pub struct Acquired;
+
+type BOOL = i32;
+type DWORD = u32;
+type HANDLE = *mut u8;
+type LONG = i32;
+
+const ERROR_ALREADY_EXISTS: DWORD = 183;
+const FALSE: BOOL = 0;
+const INFINITE: DWORD = 0xffffffff;
+const SEMAPHORE_MODIFY_STATE: DWORD = 0x2;
+const SYNCHRONIZE: DWORD = 0x00100000;
+const TRUE: BOOL = 1;
+const WAIT_OBJECT_0: DWORD = 0;
+
+extern "system" {
+ fn CloseHandle(handle: HANDLE) -> BOOL;
+ fn SetEvent(hEvent: HANDLE) -> BOOL;
+ fn WaitForMultipleObjects(
+ ncount: DWORD,
+ lpHandles: *const HANDLE,
+ bWaitAll: BOOL,
+ dwMilliseconds: DWORD,
+ ) -> DWORD;
+ fn CreateEventA(
+ lpEventAttributes: *mut u8,
+ bManualReset: BOOL,
+ bInitialState: BOOL,
+ lpName: *const i8,
+ ) -> HANDLE;
+ fn ReleaseSemaphore(
+ hSemaphore: HANDLE,
+ lReleaseCount: LONG,
+ lpPreviousCount: *mut LONG,
+ ) -> BOOL;
+ fn CreateSemaphoreA(
+ lpEventAttributes: *mut u8,
+ lInitialCount: LONG,
+ lMaximumCount: LONG,
+ lpName: *const i8,
+ ) -> HANDLE;
+ fn OpenSemaphoreA(dwDesiredAccess: DWORD, bInheritHandle: BOOL, lpName: *const i8) -> HANDLE;
+ fn WaitForSingleObject(hHandle: HANDLE, dwMilliseconds: DWORD) -> DWORD;
+ #[link_name = "SystemFunction036"]
+ fn RtlGenRandom(RandomBuffer: *mut u8, RandomBufferLength: u32) -> u8;
+}
+
+// Note that we ideally would use the `getrandom` crate, but unfortunately
+// that causes build issues when this crate is used in rust-lang/rust (see
+// rust-lang/rust#65014 for more information). As a result we just inline
+// the pretty simple Windows-specific implementation of generating
+// randomness.
+fn getrandom(dest: &mut [u8]) -> io::Result<()> {
+ // Prevent overflow of u32
+ for chunk in dest.chunks_mut(u32::max_value() as usize) {
+ let ret = unsafe { RtlGenRandom(chunk.as_mut_ptr(), chunk.len() as u32) };
+ if ret == 0 {
+ return Err(io::Error::new(
+ io::ErrorKind::Other,
+ "failed to generate random bytes",
+ ));
+ }
+ }
+ Ok(())
+}
+
+impl Client {
+ pub fn new(limit: usize) -> io::Result<Client> {
+ // Try a bunch of random semaphore names until we get a unique one,
+ // but don't try for too long.
+ //
+ // Note that `limit == 0` is a valid argument above but Windows
+ // won't let us create a semaphore with 0 slots available to it. Get
+ // `limit == 0` working by creating a semaphore instead with one
+ // slot and then immediately acquire it (without ever releaseing it
+ // back).
+ for _ in 0..100 {
+ let mut bytes = [0; 4];
+ getrandom(&mut bytes)?;
+ let mut name = format!("__rust_jobserver_semaphore_{}\0", u32::from_ne_bytes(bytes));
+ unsafe {
+ let create_limit = if limit == 0 { 1 } else { limit };
+ let r = CreateSemaphoreA(
+ ptr::null_mut(),
+ create_limit as LONG,
+ create_limit as LONG,
+ name.as_ptr() as *const _,
+ );
+ if r.is_null() {
+ return Err(io::Error::last_os_error());
+ }
+ let handle = Handle(r);
+
+ let err = io::Error::last_os_error();
+ if err.raw_os_error() == Some(ERROR_ALREADY_EXISTS as i32) {
+ continue;
+ }
+ name.pop(); // chop off the trailing nul
+ let client = Client {
+ sem: handle,
+ name: name,
+ };
+ if create_limit != limit {
+ client.acquire()?;
+ }
+ return Ok(client);
+ }
+ }
+
+ Err(io::Error::new(
+ io::ErrorKind::Other,
+ "failed to find a unique name for a semaphore",
+ ))
+ }
+
+ pub unsafe fn open(s: &str) -> Option<Client> {
+ let name = match CString::new(s) {
+ Ok(s) => s,
+ Err(_) => return None,
+ };
+
+ let sem = OpenSemaphoreA(SYNCHRONIZE | SEMAPHORE_MODIFY_STATE, FALSE, name.as_ptr());
+ if sem.is_null() {
+ None
+ } else {
+ Some(Client {
+ sem: Handle(sem),
+ name: s.to_string(),
+ })
+ }
+ }
+
+ pub fn acquire(&self) -> io::Result<Acquired> {
+ unsafe {
+ let r = WaitForSingleObject(self.sem.0, INFINITE);
+ if r == WAIT_OBJECT_0 {
+ Ok(Acquired)
+ } else {
+ Err(io::Error::last_os_error())
+ }
+ }
+ }
+
+ pub fn release(&self, _data: Option<&Acquired>) -> io::Result<()> {
+ unsafe {
+ let r = ReleaseSemaphore(self.sem.0, 1, ptr::null_mut());
+ if r != 0 {
+ Ok(())
+ } else {
+ Err(io::Error::last_os_error())
+ }
+ }
+ }
+
+ pub fn string_arg(&self) -> String {
+ self.name.clone()
+ }
+
+ pub fn available(&self) -> io::Result<usize> {
+ // Can't read value of a semaphore on Windows, so
+ // try to acquire without sleeping, since we can find out the
+ // old value on release. If acquisiton fails, then available is 0.
+ unsafe {
+ let r = WaitForSingleObject(self.sem.0, 0);
+ if r != WAIT_OBJECT_0 {
+ Ok(0)
+ } else {
+ let mut prev: LONG = 0;
+ let r = ReleaseSemaphore(self.sem.0, 1, &mut prev);
+ if r != 0 {
+ Ok(prev as usize + 1)
+ } else {
+ Err(io::Error::last_os_error())
+ }
+ }
+ }
+ }
+
+ pub fn configure(&self, _cmd: &mut Command) {
+ // nothing to do here, we gave the name of our semaphore to the
+ // child above
+ }
+}
+
+#[derive(Debug)]
+struct Handle(HANDLE);
+// HANDLE is a raw ptr, but we're send/sync
+unsafe impl Sync for Handle {}
+unsafe impl Send for Handle {}
+
+impl Drop for Handle {
+ fn drop(&mut self) {
+ unsafe {
+ CloseHandle(self.0);
+ }
+ }
+}
+
+#[derive(Debug)]
+pub struct Helper {
+ event: Arc<Handle>,
+ thread: JoinHandle<()>,
+}
+
+pub(crate) fn spawn_helper(
+ client: crate::Client,
+ state: Arc<super::HelperState>,
+ mut f: Box<dyn FnMut(io::Result<crate::Acquired>) + Send>,
+) -> io::Result<Helper> {
+ let event = unsafe {
+ let r = CreateEventA(ptr::null_mut(), TRUE, FALSE, ptr::null());
+ if r.is_null() {
+ return Err(io::Error::last_os_error());
+ } else {
+ Handle(r)
+ }
+ };
+ let event = Arc::new(event);
+ let event2 = event.clone();
+ let thread = Builder::new().spawn(move || {
+ let objects = [event2.0, client.inner.sem.0];
+ state.for_each_request(|_| {
+ const WAIT_OBJECT_1: u32 = WAIT_OBJECT_0 + 1;
+ match unsafe { WaitForMultipleObjects(2, objects.as_ptr(), FALSE, INFINITE) } {
+ WAIT_OBJECT_0 => return,
+ WAIT_OBJECT_1 => f(Ok(crate::Acquired {
+ client: client.inner.clone(),
+ data: Acquired,
+ disabled: false,
+ })),
+ _ => f(Err(io::Error::last_os_error())),
+ }
+ });
+ })?;
+ Ok(Helper { thread, event })
+}
+
+impl Helper {
+ pub fn join(self) {
+ // Unlike unix this logic is much easier. If our thread was blocked
+ // in waiting for requests it should already be woken up and
+ // exiting. Otherwise it's waiting for a token, so we wake it up
+ // with a different event that it's also waiting on here. After
+ // these two we should be guaranteed the thread is on its way out,
+ // so we can safely `join`.
+ let r = unsafe { SetEvent(self.event.0) };
+ if r == 0 {
+ panic!("failed to set event: {}", io::Error::last_os_error());
+ }
+ drop(self.thread.join());
+ }
+}