path: root/third_party/rust/cc/src/parallel
author    Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-19 01:14:29 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-19 01:14:29 +0000
commit fbaf0bb26397aa498eb9156f06d5a6fe34dd7dd8 (patch)
tree 4c1ccaf5486d4f2009f9a338a98a83e886e29c97 /third_party/rust/cc/src/parallel
parent Releasing progress-linux version 124.0.1-1~progress7.99u1. (diff)
Merging upstream version 125.0.1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/cc/src/parallel')
-rw-r--r--  third_party/rust/cc/src/parallel/async_executor.rs | 118
-rw-r--r--  third_party/rust/cc/src/parallel/job_token.rs      | 255
-rw-r--r--  third_party/rust/cc/src/parallel/mod.rs            |  20
-rw-r--r--  third_party/rust/cc/src/parallel/stderr.rs         |  90
4 files changed, 483 insertions(+), 0 deletions(-)
diff --git a/third_party/rust/cc/src/parallel/async_executor.rs b/third_party/rust/cc/src/parallel/async_executor.rs
new file mode 100644
index 0000000000..9ebd1ad562
--- /dev/null
+++ b/third_party/rust/cc/src/parallel/async_executor.rs
@@ -0,0 +1,118 @@
+use std::{
+ cell::Cell,
+ future::Future,
+ pin::Pin,
+ ptr,
+ task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
+ thread,
+ time::Duration,
+};
+
+use crate::Error;
+
+const NOOP_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
+ // Cloning just returns a new no-op raw waker
+ |_| NOOP_RAW_WAKER,
+ // `wake` does nothing
+ |_| {},
+ // `wake_by_ref` does nothing
+ |_| {},
+ // Dropping does nothing as we don't allocate anything
+ |_| {},
+);
+const NOOP_RAW_WAKER: RawWaker = RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE);
+
+#[derive(Default)]
+pub(crate) struct YieldOnce(bool);
+
+impl Future for YieldOnce {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
+ let flag = &mut std::pin::Pin::into_inner(self).0;
+ if !*flag {
+ *flag = true;
+ Poll::Pending
+ } else {
+ Poll::Ready(())
+ }
+ }
+}
+
+/// Execute the futures and return when they are all done.
+///
+/// Here we use our own homebrew async executor, since cc is used in the
+/// build scripts of many popular projects and pulling in additional
+/// dependencies would significantly slow down their compilation.
+pub(crate) fn block_on<Fut1, Fut2>(
+ mut fut1: Fut1,
+ mut fut2: Fut2,
+ has_made_progress: &Cell<bool>,
+) -> Result<(), Error>
+where
+ Fut1: Future<Output = Result<(), Error>>,
+ Fut2: Future<Output = Result<(), Error>>,
+{
+ // Shadows the future so that it can never be moved and is guaranteed
+ // to be pinned.
+ //
+ // The same trick as used in the `pin!` macro.
+ //
+ // TODO: Once MSRV is bumped to 1.68, replace this with `std::pin::pin!`
+ let mut fut1 = Some(unsafe { Pin::new_unchecked(&mut fut1) });
+ let mut fut2 = Some(unsafe { Pin::new_unchecked(&mut fut2) });
+
+ // TODO: Once `Waker::noop` is stabilised and our MSRV is bumped to the
+ // version in which it is stabilised, replace this with `Waker::noop`.
+ let waker = unsafe { Waker::from_raw(NOOP_RAW_WAKER) };
+ let mut context = Context::from_waker(&waker);
+
+ let mut backoff_cnt = 0;
+
+ loop {
+ has_made_progress.set(false);
+
+ if let Some(fut) = fut2.as_mut() {
+ if let Poll::Ready(res) = fut.as_mut().poll(&mut context) {
+ fut2 = None;
+ res?;
+ }
+ }
+
+ if let Some(fut) = fut1.as_mut() {
+ if let Poll::Ready(res) = fut.as_mut().poll(&mut context) {
+ fut1 = None;
+ res?;
+ }
+ }
+
+ if fut1.is_none() && fut2.is_none() {
+ return Ok(());
+ }
+
+ if !has_made_progress.get() {
+ if backoff_cnt > 3 {
+ // We have yielded at least three times without making
+ // any progress, so we will sleep for a while.
+ let duration = Duration::from_millis(100 * (backoff_cnt - 3).min(10));
+ thread::sleep(duration);
+ } else {
+ // Given that we spawned many compilation tasks, it is unlikely
+ // that the OS cannot find another ready task to execute.
+ //
+ // Once all of them are done, we will either spawn more tasks
+ // or simply return.
+ //
+ // Thus this will not turn into a busy-wait loop and will not
+ // waste CPU resources.
+ thread::yield_now();
+ }
+ }
+
+ backoff_cnt = if has_made_progress.get() {
+ 0
+ } else {
+ backoff_cnt + 1
+ };
+ }
+}
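For illustration, the no-op waker and YieldOnce above can be exercised in isolation. The following is a standalone sketch, not part of the cc source; VTABLE, RAW, and the local YieldOnce are illustrative stand-ins for the crate-private items in this file:

use std::future::Future;
use std::pin::Pin;
use std::ptr;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// A waker that does nothing: cloning returns another no-op raw waker, and
// wake/wake_by_ref/drop are all no-ops, mirroring NOOP_WAKER_VTABLE above.
const VTABLE: RawWakerVTable = RawWakerVTable::new(|_| RAW, |_| {}, |_| {}, |_| {});
const RAW: RawWaker = RawWaker::new(ptr::null(), &VTABLE);

// Returns Pending on the first poll and Ready on the second, forcing an
// executor to switch to the other future in between.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if !self.0 {
            self.0 = true;
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}

fn main() {
    let waker = unsafe { Waker::from_raw(RAW) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = YieldOnce(false);
    let mut fut = Pin::new(&mut fut);

    // The first poll yields, the second completes; this is what lets
    // block_on interleave two futures on a single thread.
    assert_eq!(fut.as_mut().poll(&mut cx), Poll::Pending);
    assert_eq!(fut.as_mut().poll(&mut cx), Poll::Ready(()));
}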
diff --git a/third_party/rust/cc/src/parallel/job_token.rs b/third_party/rust/cc/src/parallel/job_token.rs
new file mode 100644
index 0000000000..4fec982f85
--- /dev/null
+++ b/third_party/rust/cc/src/parallel/job_token.rs
@@ -0,0 +1,255 @@
+use std::{marker::PhantomData, mem::MaybeUninit, sync::Once};
+
+use crate::Error;
+
+pub(crate) struct JobToken(PhantomData<()>);
+
+impl JobToken {
+ fn new() -> Self {
+ Self(PhantomData)
+ }
+}
+
+impl Drop for JobToken {
+ fn drop(&mut self) {
+ match JobTokenServer::new() {
+ JobTokenServer::Inherited(jobserver) => jobserver.release_token_raw(),
+ JobTokenServer::InProcess(jobserver) => jobserver.release_token_raw(),
+ }
+ }
+}
+
+enum JobTokenServer {
+ Inherited(inherited_jobserver::JobServer),
+ InProcess(inprocess_jobserver::JobServer),
+}
+
+impl JobTokenServer {
+ /// This function returns a static reference to the jobserver because
+ /// - creating a jobserver from the environment is a bit fd-unsafe (e.g.
+ /// the fd might be closed by other jobserver users in the process),
+ /// so it is better done once at the start of the program.
+ /// - in case a jobserver cannot be created from the environment (e.g.
+ /// it is not present), we create a global in-process-only jobserver,
+ /// which has to be static so that it is shared by all cc
+ /// compilations.
+ fn new() -> &'static Self {
+ static INIT: Once = Once::new();
+ static mut JOBSERVER: MaybeUninit<JobTokenServer> = MaybeUninit::uninit();
+
+ unsafe {
+ INIT.call_once(|| {
+ let server = inherited_jobserver::JobServer::from_env()
+ .map(Self::Inherited)
+ .unwrap_or_else(|| Self::InProcess(inprocess_jobserver::JobServer::new()));
+ JOBSERVER = MaybeUninit::new(server);
+ });
+ // TODO: Poor man's assume_init_ref, as that'd require an MSRV of 1.55.
+ &*JOBSERVER.as_ptr()
+ }
+ }
+}
+
+pub(crate) enum ActiveJobTokenServer {
+ Inherited(inherited_jobserver::ActiveJobServer<'static>),
+ InProcess(&'static inprocess_jobserver::JobServer),
+}
+
+impl ActiveJobTokenServer {
+ pub(crate) fn new() -> Result<Self, Error> {
+ match JobTokenServer::new() {
+ JobTokenServer::Inherited(inherited_jobserver) => {
+ inherited_jobserver.enter_active().map(Self::Inherited)
+ }
+ JobTokenServer::InProcess(inprocess_jobserver) => {
+ Ok(Self::InProcess(inprocess_jobserver))
+ }
+ }
+ }
+
+ pub(crate) async fn acquire(&self) -> Result<JobToken, Error> {
+ match &self {
+ Self::Inherited(jobserver) => jobserver.acquire().await,
+ Self::InProcess(jobserver) => Ok(jobserver.acquire().await),
+ }
+ }
+}
+
+mod inherited_jobserver {
+ use super::JobToken;
+
+ use crate::{parallel::async_executor::YieldOnce, Error, ErrorKind};
+
+ use std::{
+ io, mem,
+ sync::{mpsc, Mutex, MutexGuard, PoisonError},
+ };
+
+ pub(super) struct JobServer {
+ /// Implicit token for this process which is obtained and will be
+ /// released by the parent. Since JobTokens only give back what they
+ /// got, there should be at most one global implicit token in the wild.
+ ///
+ /// Since Rust does not execute any `Drop` for global variables,
+ /// we can't just put it back into the jobserver and then re-acquire it
+ /// at the end of the process.
+ ///
+ /// Use a `Mutex` to avoid a race between acquire and release.
+ /// If an `AtomicBool` were used, the following would be possible:
+ /// - `release_token_raw` tries to set `global_implicit_token` to `true`, but it is already
+ /// `true`, so it proceeds to release the token to the jobserver
+ /// - `acquire` takes the global implicit token and sets `global_implicit_token` to `false`
+ /// - `release_token_raw` now writes the token back into the jobserver, while
+ /// `global_implicit_token` is `false`
+ ///
+ /// If the program exits here, then cc effectively increases parallelism by one, which is
+ /// incorrect; hence we use a `Mutex` here.
+ global_implicit_token: Mutex<bool>,
+ inner: jobserver::Client,
+ }
+
+ impl JobServer {
+ pub(super) unsafe fn from_env() -> Option<Self> {
+ jobserver::Client::from_env().map(|inner| Self {
+ inner,
+ global_implicit_token: Mutex::new(true),
+ })
+ }
+
+ fn get_global_implicit_token(&self) -> MutexGuard<'_, bool> {
+ self.global_implicit_token
+ .lock()
+ .unwrap_or_else(PoisonError::into_inner)
+ }
+
+ /// All tokens except for the global implicit token will be put back into the jobserver
+ /// immediately and they cannot be cached, since Rust does not call `Drop::drop` on
+ /// global variables.
+ pub(super) fn release_token_raw(&self) {
+ let mut global_implicit_token = self.get_global_implicit_token();
+
+ if *global_implicit_token {
+ // There's already a global implicit token, so this token must
+ // be released back into the jobserver.
+ //
+ // `release_raw` should not block
+ let _ = self.inner.release_raw();
+ } else {
+ *global_implicit_token = true;
+ }
+ }
+
+ pub(super) fn enter_active(&self) -> Result<ActiveJobServer<'_>, Error> {
+ ActiveJobServer::new(self)
+ }
+ }
+
+ pub(crate) struct ActiveJobServer<'a> {
+ jobserver: &'a JobServer,
+ helper_thread: jobserver::HelperThread,
+ /// When rx is dropped, all the tokens stored within it will be dropped.
+ rx: mpsc::Receiver<io::Result<jobserver::Acquired>>,
+ }
+
+ impl<'a> ActiveJobServer<'a> {
+ fn new(jobserver: &'a JobServer) -> Result<Self, Error> {
+ let (tx, rx) = mpsc::channel();
+
+ Ok(Self {
+ rx,
+ helper_thread: jobserver.inner.clone().into_helper_thread(move |res| {
+ let _ = tx.send(res);
+ })?,
+ jobserver,
+ })
+ }
+
+ pub(super) async fn acquire(&self) -> Result<JobToken, Error> {
+ let mut has_requested_token = false;
+
+ loop {
+ // Fast path
+ if mem::replace(&mut *self.jobserver.get_global_implicit_token(), false) {
+ break Ok(JobToken::new());
+ }
+
+ // Cold path, no global implicit token, obtain one
+ match self.rx.try_recv() {
+ Ok(res) => {
+ let acquired = res?;
+ acquired.drop_without_releasing();
+ break Ok(JobToken::new());
+ }
+ Err(mpsc::TryRecvError::Disconnected) => {
+ break Err(Error::new(
+ ErrorKind::JobserverHelpThreadError,
+ "jobserver help thread has returned before ActiveJobServer is dropped",
+ ))
+ }
+ Err(mpsc::TryRecvError::Empty) => {
+ if !has_requested_token {
+ self.helper_thread.request_token();
+ has_requested_token = true;
+ }
+ YieldOnce::default().await
+ }
+ }
+ }
+ }
+ }
+}
+
+mod inprocess_jobserver {
+ use super::JobToken;
+
+ use crate::parallel::async_executor::YieldOnce;
+
+ use std::{
+ env::var,
+ sync::atomic::{
+ AtomicU32,
+ Ordering::{AcqRel, Acquire},
+ },
+ };
+
+ pub(crate) struct JobServer(AtomicU32);
+
+ impl JobServer {
+ pub(super) fn new() -> Self {
+ // Use `NUM_JOBS` if set (it's configured by Cargo) and otherwise
+ // just fall back to a semi-reasonable number.
+ //
+ // Note that we could use `num_cpus` here, but it's an extra
+ // dependency that will almost never be used, so it's generally
+ // not worth it.
+ let mut parallelism = 4;
+ // TODO: Use std::thread::available_parallelism as an upper bound
+ // when MSRV is bumped.
+ if let Ok(amt) = var("NUM_JOBS") {
+ if let Ok(amt) = amt.parse() {
+ parallelism = amt;
+ }
+ }
+
+ Self(AtomicU32::new(parallelism))
+ }
+
+ pub(super) async fn acquire(&self) -> JobToken {
+ loop {
+ let res = self
+ .0
+ .fetch_update(AcqRel, Acquire, |tokens| tokens.checked_sub(1));
+
+ if res.is_ok() {
+ break JobToken::new();
+ }
+
+ YieldOnce::default().await
+ }
+ }
+
+ pub(super) fn release_token_raw(&self) {
+ self.0.fetch_add(1, AcqRel);
+ }
+ }
+}
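For illustration, the in-process token accounting above reduces to this self-contained sketch. Tokens and its method names are illustrative, not from cc; the point is that fetch_update with checked_sub only takes a token when one is left, so the counter can never underflow:

use std::sync::atomic::{AtomicU32, Ordering::{AcqRel, Acquire}};

struct Tokens(AtomicU32);

impl Tokens {
    fn new(n: u32) -> Self {
        Tokens(AtomicU32::new(n))
    }

    // Succeeds only while the count is non-zero: checked_sub returns None
    // at zero, which makes fetch_update leave the value untouched and
    // report failure, so the counter never wraps around.
    fn try_acquire(&self) -> bool {
        self.0
            .fetch_update(AcqRel, Acquire, |tokens| tokens.checked_sub(1))
            .is_ok()
    }

    fn release(&self) {
        self.0.fetch_add(1, AcqRel);
    }
}

fn main() {
    let tokens = Tokens::new(2);
    assert!(tokens.try_acquire());
    assert!(tokens.try_acquire());
    assert!(!tokens.try_acquire()); // pool exhausted
    tokens.release();
    assert!(tokens.try_acquire()); // a released token can be re-acquired
}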
diff --git a/third_party/rust/cc/src/parallel/mod.rs b/third_party/rust/cc/src/parallel/mod.rs
new file mode 100644
index 0000000000..d69146dc59
--- /dev/null
+++ b/third_party/rust/cc/src/parallel/mod.rs
@@ -0,0 +1,20 @@
+pub(crate) mod async_executor;
+pub(crate) mod job_token;
+pub(crate) mod stderr;
+
+/// Remove all elements in `vec` for which `f(element)` returns `false`.
+///
+/// TODO: Remove this once the MSRV is bumped to v1.61
+pub(crate) fn retain_unordered_mut<T, F>(vec: &mut Vec<T>, mut f: F)
+where
+ F: FnMut(&mut T) -> bool,
+{
+ let mut i = 0;
+ while i < vec.len() {
+ if f(&mut vec[i]) {
+ i += 1;
+ } else {
+ vec.swap_remove(i);
+ }
+ }
+}
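As a usage sketch: retain_unordered_mut behaves like Vec::retain except that surviving elements may be reordered, because swap_remove moves the last element into the removed slot. The function is crate-private in cc, so this example is illustrative only and assumes it is in scope:

fn main() {
    let mut v = vec![1, 2, 3, 4, 5];
    // Keep odd numbers. The evens are swap_removed, so the survivors end
    // up as [1, 5, 3] rather than [1, 3, 5].
    retain_unordered_mut(&mut v, |x| *x % 2 == 1);
    v.sort(); // order is not preserved, so sort before comparing
    assert_eq!(v, [1, 3, 5]);
}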
diff --git a/third_party/rust/cc/src/parallel/stderr.rs b/third_party/rust/cc/src/parallel/stderr.rs
new file mode 100644
index 0000000000..47fa085dba
--- /dev/null
+++ b/third_party/rust/cc/src/parallel/stderr.rs
@@ -0,0 +1,90 @@
+/// Helper functions for [ChildStderr].
+use std::{convert::TryInto, process::ChildStderr};
+
+use crate::{Error, ErrorKind};
+
+#[cfg(all(not(unix), not(windows)))]
+compile_error!("Only unix and windows support non-blocking pipes! For other OSes, disable the parallel feature.");
+
+#[cfg(unix)]
+fn get_flags(fd: std::os::unix::io::RawFd) -> Result<i32, Error> {
+ let flags = unsafe { libc::fcntl(fd, libc::F_GETFL, 0) };
+ if flags == -1 {
+ Err(Error::new(
+ ErrorKind::IOError,
+ format!(
+ "Failed to get flags for pipe {}: {}",
+ fd,
+ std::io::Error::last_os_error()
+ ),
+ ))
+ } else {
+ Ok(flags)
+ }
+}
+
+#[cfg(unix)]
+fn set_flags(fd: std::os::unix::io::RawFd, flags: std::os::raw::c_int) -> Result<(), Error> {
+ if unsafe { libc::fcntl(fd, libc::F_SETFL, flags) } == -1 {
+ Err(Error::new(
+ ErrorKind::IOError,
+ format!(
+ "Failed to set flags for pipe {}: {}",
+ fd,
+ std::io::Error::last_os_error()
+ ),
+ ))
+ } else {
+ Ok(())
+ }
+}
+
+#[cfg(unix)]
+pub fn set_non_blocking(pipe: &impl std::os::unix::io::AsRawFd) -> Result<(), Error> {
+ // On Unix, switch the pipe to non-blocking mode.
+ // On Windows, we have a different way to be non-blocking.
+ let fd = pipe.as_raw_fd();
+
+ let flags = get_flags(fd)?;
+ set_flags(fd, flags | libc::O_NONBLOCK)
+}
+
+pub fn bytes_available(stderr: &mut ChildStderr) -> Result<usize, Error> {
+ let mut bytes_available = 0;
+ #[cfg(windows)]
+ {
+ use crate::windows::windows_sys::PeekNamedPipe;
+ use std::os::windows::io::AsRawHandle;
+ use std::ptr::null_mut;
+ if unsafe {
+ PeekNamedPipe(
+ stderr.as_raw_handle(),
+ null_mut(),
+ 0,
+ null_mut(),
+ &mut bytes_available,
+ null_mut(),
+ )
+ } == 0
+ {
+ return Err(Error::new(
+ ErrorKind::IOError,
+ format!(
+ "PeekNamedPipe failed with {}",
+ std::io::Error::last_os_error()
+ ),
+ ));
+ }
+ }
+ #[cfg(unix)]
+ {
+ use std::os::unix::io::AsRawFd;
+ if unsafe { libc::ioctl(stderr.as_raw_fd(), libc::FIONREAD, &mut bytes_available) } != 0 {
+ return Err(Error::new(
+ ErrorKind::IOError,
+ format!("ioctl failed with {}", std::io::Error::last_os_error()),
+ ));
+ }
+ }
+ Ok(bytes_available.try_into().unwrap())
+}
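For illustration, here is a Unix-only sketch of the pattern these helpers support, written against the libc crate directly; the shell command and variable names are illustrative. It marks the child's stderr non-blocking with fcntl, then polls FIONREAD to learn how many bytes can be read without blocking:

use std::io::Read;
use std::os::unix::io::AsRawFd;
use std::process::{Command, Stdio};

fn main() -> std::io::Result<()> {
    let mut child = Command::new("sh")
        .args(["-c", "echo hi >&2"])
        .stderr(Stdio::piped())
        .spawn()?;
    let fd = child.stderr.as_ref().unwrap().as_raw_fd();

    // Equivalent of set_non_blocking above: OR O_NONBLOCK into the flags.
    unsafe {
        let flags = libc::fcntl(fd, libc::F_GETFL, 0);
        assert_ne!(flags, -1, "F_GETFL failed");
        assert_ne!(
            libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK),
            -1,
            "F_SETFL failed"
        );
    }

    loop {
        // Equivalent of bytes_available above: FIONREAD reports how many
        // bytes are currently buffered in the pipe.
        let mut available: libc::c_int = 0;
        if unsafe { libc::ioctl(fd, libc::FIONREAD, &mut available) } != 0 {
            return Err(std::io::Error::last_os_error());
        }
        if available > 0 {
            let mut buf = vec![0; available as usize];
            child.stderr.as_mut().unwrap().read_exact(&mut buf)?;
            eprint!("{}", String::from_utf8_lossy(&buf));
        } else if child.try_wait()?.is_some() {
            // A real implementation would drain once more after exit to
            // avoid dropping bytes written just before termination.
            break;
        } else {
            std::thread::yield_now();
        }
    }
    Ok(())
}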