From fbaf0bb26397aa498eb9156f06d5a6fe34dd7dd8 Mon Sep 17 00:00:00 2001
From: Daniel Baumann
Date: Fri, 19 Apr 2024 03:14:29 +0200
Subject: Merging upstream version 125.0.1.

Signed-off-by: Daniel Baumann
---
 third_party/rust/cc/src/parallel/async_executor.rs | 118 ++++++++++
 third_party/rust/cc/src/parallel/job_token.rs      | 255 +++++++++++++++++++++
 third_party/rust/cc/src/parallel/mod.rs            |  20 ++
 third_party/rust/cc/src/parallel/stderr.rs         |  90 ++++++++
 4 files changed, 483 insertions(+)
 create mode 100644 third_party/rust/cc/src/parallel/async_executor.rs
 create mode 100644 third_party/rust/cc/src/parallel/job_token.rs
 create mode 100644 third_party/rust/cc/src/parallel/mod.rs
 create mode 100644 third_party/rust/cc/src/parallel/stderr.rs

diff --git a/third_party/rust/cc/src/parallel/async_executor.rs b/third_party/rust/cc/src/parallel/async_executor.rs
new file mode 100644
index 0000000000..9ebd1ad562
--- /dev/null
+++ b/third_party/rust/cc/src/parallel/async_executor.rs
@@ -0,0 +1,118 @@
+use std::{
+    cell::Cell,
+    future::Future,
+    pin::Pin,
+    ptr,
+    task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
+    thread,
+    time::Duration,
+};
+
+use crate::Error;
+
+const NOOP_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
+    // Cloning just returns a new no-op raw waker
+    |_| NOOP_RAW_WAKER,
+    // `wake` does nothing
+    |_| {},
+    // `wake_by_ref` does nothing
+    |_| {},
+    // Dropping does nothing as we don't allocate anything
+    |_| {},
+);
+const NOOP_RAW_WAKER: RawWaker = RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE);
+
+#[derive(Default)]
+pub(crate) struct YieldOnce(bool);
+
+impl Future for YieldOnce {
+    type Output = ();
+
+    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
+        let flag = &mut std::pin::Pin::into_inner(self).0;
+        if !*flag {
+            *flag = true;
+            Poll::Pending
+        } else {
+            Poll::Ready(())
+        }
+    }
+}
+
+/// Execute the futures and return when they are all done.
+///
+/// Here we use our own homebrew async executor, since cc is used in the build
+/// scripts of many popular projects and pulling in additional dependencies
+/// would significantly slow down their compilation.
+pub(crate) fn block_on<Fut1, Fut2>(
+    mut fut1: Fut1,
+    mut fut2: Fut2,
+    has_made_progress: &Cell<bool>,
+) -> Result<(), Error>
+where
+    Fut1: Future<Output = Result<(), Error>>,
+    Fut2: Future<Output = Result<(), Error>>,
+{
+    // Shadow the futures so that they can never be moved and are guaranteed
+    // to be pinned.
+    //
+    // This is the same trick used in the `pin!` macro.
+    //
+    // TODO: Once the MSRV is bumped to 1.68, replace this with `std::pin::pin!`
+    let mut fut1 = Some(unsafe { Pin::new_unchecked(&mut fut1) });
+    let mut fut2 = Some(unsafe { Pin::new_unchecked(&mut fut2) });
+
+    // TODO: Once `Waker::noop` is stabilised and our MSRV is bumped to the
+    // version in which it is stabilised, replace this with `Waker::noop`.
+    let waker = unsafe { Waker::from_raw(NOOP_RAW_WAKER) };
+    let mut context = Context::from_waker(&waker);
+
+    let mut backoff_cnt = 0;
+
+    loop {
+        has_made_progress.set(false);
+
+        if let Some(fut) = fut2.as_mut() {
+            if let Poll::Ready(res) = fut.as_mut().poll(&mut context) {
+                fut2 = None;
+                res?;
+            }
+        }
+
+        if let Some(fut) = fut1.as_mut() {
+            if let Poll::Ready(res) = fut.as_mut().poll(&mut context) {
+                fut1 = None;
+                res?;
+            }
+        }
+
+        if fut1.is_none() && fut2.is_none() {
+            return Ok(());
+        }
+
+        if !has_made_progress.get() {
+            if backoff_cnt > 3 {
+                // We have yielded at least three times without making
+                // any progress, so we will sleep for a while.
+                let duration = Duration::from_millis(100 * (backoff_cnt - 3).min(10));
+                thread::sleep(duration);
+            } else {
+                // Given that we spawned a lot of compilation tasks, it is
+                // unlikely that the OS cannot find another ready task to
+                // execute.
+                //
+                // If all of them are done, then we will yield, spawn more,
+                // or simply return.
+                //
+                // Thus this will not turn into a busy-wait loop and will not
+                // waste CPU resources.
+                thread::yield_now();
+            }
+        }
+
+        backoff_cnt = if has_made_progress.get() {
+            0
+        } else {
+            backoff_cnt + 1
+        };
+    }
+}
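
The executor above polls both futures in a round-robin loop with a no-op waker, so the futures themselves must signal progress through the shared `has_made_progress` cell. A minimal sketch of how it might be driven from inside the crate; the async bodies are hypothetical stand-ins for cc's real compile/wait futures, not code from this patch:

    use std::cell::Cell;

    fn run_two_tasks() -> Result<(), Error> {
        let has_made_progress = Cell::new(false);

        // Each future sets the cell whenever it does real work; otherwise
        // `block_on` assumes it is stalled and backs off (yield, then sleep).
        let fut1 = async {
            has_made_progress.set(true);
            Ok::<(), Error>(())
        };
        let fut2 = async {
            // Cooperatively hand control back to the poll loop once.
            YieldOnce::default().await;
            has_made_progress.set(true);
            Ok::<(), Error>(())
        };

        block_on(fut1, fut2, &has_made_progress)
    }
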
diff --git a/third_party/rust/cc/src/parallel/job_token.rs b/third_party/rust/cc/src/parallel/job_token.rs
new file mode 100644
index 0000000000..4fec982f85
--- /dev/null
+++ b/third_party/rust/cc/src/parallel/job_token.rs
@@ -0,0 +1,255 @@
+use std::{marker::PhantomData, mem::MaybeUninit, sync::Once};
+
+use crate::Error;
+
+pub(crate) struct JobToken(PhantomData<()>);
+
+impl JobToken {
+    fn new() -> Self {
+        Self(PhantomData)
+    }
+}
+
+impl Drop for JobToken {
+    fn drop(&mut self) {
+        match JobTokenServer::new() {
+            JobTokenServer::Inherited(jobserver) => jobserver.release_token_raw(),
+            JobTokenServer::InProcess(jobserver) => jobserver.release_token_raw(),
+        }
+    }
+}
+
+enum JobTokenServer {
+    Inherited(inherited_jobserver::JobServer),
+    InProcess(inprocess_jobserver::JobServer),
+}
+
+impl JobTokenServer {
+    /// This function returns a static reference to the jobserver because
+    /// - creating a jobserver from the environment is a bit fd-unsafe (e.g.
+    ///   the fd might be closed by other jobserver users in the process), so
+    ///   it is better done once at the start of the program;
+    /// - in case a jobserver cannot be created from the environment (e.g. it
+    ///   is not present), we create a global in-process-only jobserver that
+    ///   has to be static so that it is shared by all cc compilations.
+    fn new() -> &'static Self {
+        static INIT: Once = Once::new();
+        static mut JOBSERVER: MaybeUninit<JobTokenServer> = MaybeUninit::uninit();
+
+        unsafe {
+            INIT.call_once(|| {
+                let server = inherited_jobserver::JobServer::from_env()
+                    .map(Self::Inherited)
+                    .unwrap_or_else(|| Self::InProcess(inprocess_jobserver::JobServer::new()));
+                JOBSERVER = MaybeUninit::new(server);
+            });
+            // TODO: Poor man's `assume_init_ref`, as that'd require an MSRV of 1.55.
+            &*JOBSERVER.as_ptr()
+        }
+    }
+}
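
The inherited case keys off a token pipe that a parent build system advertises through the environment: `jobserver::Client::from_env` (wrapped by `JobServer::from_env` below) looks for the jobserver flags that GNU make embeds in `MAKEFLAGS`. Purely as an illustration of what is being detected; the function name and fd numbers here are made up, and the exact flag spelling depends on the make version:

    // Illustration only: a parent `make -j8` on Unix might export
    // MAKEFLAGS="-j8 --jobserver-auth=3,4", where fds 3 and 4 are the
    // read/write ends of the shared token pipe (older GNU makes spell
    // the flag `--jobserver-fds=3,4`).
    fn has_inherited_jobserver() -> bool {
        std::env::var("MAKEFLAGS")
            .map(|flags| {
                flags.contains("--jobserver-auth=") || flags.contains("--jobserver-fds=")
            })
            .unwrap_or(false)
    }
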
+pub(crate) enum ActiveJobTokenServer {
+    Inherited(inherited_jobserver::ActiveJobServer<'static>),
+    InProcess(&'static inprocess_jobserver::JobServer),
+}
+
+impl ActiveJobTokenServer {
+    pub(crate) fn new() -> Result<Self, Error> {
+        match JobTokenServer::new() {
+            JobTokenServer::Inherited(inherited_jobserver) => {
+                inherited_jobserver.enter_active().map(Self::Inherited)
+            }
+            JobTokenServer::InProcess(inprocess_jobserver) => {
+                Ok(Self::InProcess(inprocess_jobserver))
+            }
+        }
+    }
+
+    pub(crate) async fn acquire(&self) -> Result<JobToken, Error> {
+        match &self {
+            Self::Inherited(jobserver) => jobserver.acquire().await,
+            Self::InProcess(jobserver) => Ok(jobserver.acquire().await),
+        }
+    }
+}
+
+mod inherited_jobserver {
+    use super::JobToken;
+
+    use crate::{parallel::async_executor::YieldOnce, Error, ErrorKind};
+
+    use std::{
+        io, mem,
+        sync::{mpsc, Mutex, MutexGuard, PoisonError},
+    };
+
+    pub(super) struct JobServer {
+        /// Implicit token for this process, which is obtained and will be
+        /// released in the parent. Since `JobToken`s only give back what they
+        /// got, there should be at most one global implicit token in the wild.
+        ///
+        /// Since Rust does not execute any `Drop` for global variables,
+        /// we can't just put it back into the jobserver and then re-acquire it
+        /// at the end of the process.
+        ///
+        /// We use a `Mutex` to avoid a race between acquire and release.
+        /// If an `AtomicBool` were used instead, the following interleaving
+        /// would be possible:
+        /// - `release_token_raw`: tries to set `global_implicit_token` to
+        ///   `true`, but it is already `true`, so it goes on to release the
+        ///   token to the jobserver
+        /// - `acquire`: takes the global implicit token and sets
+        ///   `global_implicit_token` to `false`
+        /// - `release_token_raw`: now writes the token back into the
+        ///   jobserver, while `global_implicit_token` is `false`
+        ///
+        /// If the program exits at this point, cc has effectively increased
+        /// parallelism by one, which is incorrect; hence the `Mutex`.
+        global_implicit_token: Mutex<bool>,
+        inner: jobserver::Client,
+    }
+
+    impl JobServer {
+        pub(super) unsafe fn from_env() -> Option<Self> {
+            jobserver::Client::from_env().map(|inner| Self {
+                inner,
+                global_implicit_token: Mutex::new(true),
+            })
+        }
+
+        fn get_global_implicit_token(&self) -> MutexGuard<'_, bool> {
+            self.global_implicit_token
+                .lock()
+                .unwrap_or_else(PoisonError::into_inner)
+        }
+
+        /// All tokens except for the global implicit token are put back into
+        /// the jobserver immediately and cannot be cached, since Rust does
+        /// not call `Drop::drop` on global variables.
+        pub(super) fn release_token_raw(&self) {
+            let mut global_implicit_token = self.get_global_implicit_token();
+
+            if *global_implicit_token {
+                // There's already a global implicit token, so this token must
+                // be released back into the jobserver.
+                //
+                // `release_raw` should not block.
+                let _ = self.inner.release_raw();
+            } else {
+                *global_implicit_token = true;
+            }
+        }
+
+        pub(super) fn enter_active(&self) -> Result<ActiveJobServer<'_>, Error> {
+            ActiveJobServer::new(self)
+        }
+    }
+
+    pub(crate) struct ActiveJobServer<'a> {
+        jobserver: &'a JobServer,
+        helper_thread: jobserver::HelperThread,
+        /// When `rx` is dropped, all the tokens stored within it are dropped.
+        rx: mpsc::Receiver<io::Result<jobserver::Acquired>>,
+    }
+
+    impl<'a> ActiveJobServer<'a> {
+        fn new(jobserver: &'a JobServer) -> Result<Self, Error> {
+            let (tx, rx) = mpsc::channel();
+
+            Ok(Self {
+                rx,
+                helper_thread: jobserver.inner.clone().into_helper_thread(move |res| {
+                    let _ = tx.send(res);
+                })?,
+                jobserver,
+            })
+        }
+
+        pub(super) async fn acquire(&self) -> Result<JobToken, Error> {
+            let mut has_requested_token = false;
+
+            loop {
+                // Fast path
+                if mem::replace(&mut *self.jobserver.get_global_implicit_token(), false) {
+                    break Ok(JobToken::new());
+                }
+
+                // Cold path: no global implicit token, obtain one
+                match self.rx.try_recv() {
+                    Ok(res) => {
+                        let acquired = res?;
+                        acquired.drop_without_releasing();
+                        break Ok(JobToken::new());
+                    }
+                    Err(mpsc::TryRecvError::Disconnected) => {
+                        break Err(Error::new(
+                            ErrorKind::JobserverHelpThreadError,
+                            "jobserver helper thread has returned before ActiveJobServer is dropped",
+                        ))
+                    }
+                    Err(mpsc::TryRecvError::Empty) => {
+                        if !has_requested_token {
+                            self.helper_thread.request_token();
+                            has_requested_token = true;
+                        }
+                        YieldOnce::default().await
+                    }
+                }
+            }
+        }
+    }
+}
+
+mod inprocess_jobserver {
+    use super::JobToken;
+
+    use crate::parallel::async_executor::YieldOnce;
+
+    use std::{
+        env::var,
+        sync::atomic::{
+            AtomicU32,
+            Ordering::{AcqRel, Acquire},
+        },
+    };
+
+    pub(crate) struct JobServer(AtomicU32);
+
+    impl JobServer {
+        pub(super) fn new() -> Self {
+            // Use `NUM_JOBS` if set (it's configured by Cargo) and otherwise
+            // just fall back to a semi-reasonable number.
+            //
+            // Note that we could use `num_cpus` here, but it's an extra
+            // dependency that would almost never be used, so it's generally
+            // not worth it.
+            let mut parallelism = 4;
+            // TODO: Use std::thread::available_parallelism as an upper bound
+            // when MSRV is bumped.
+            if let Ok(amt) = var("NUM_JOBS") {
+                if let Ok(amt) = amt.parse() {
+                    parallelism = amt;
+                }
+            }
+
+            Self(AtomicU32::new(parallelism))
+        }
+
+        pub(super) async fn acquire(&self) -> JobToken {
+            loop {
+                let res = self
+                    .0
+                    .fetch_update(AcqRel, Acquire, |tokens| tokens.checked_sub(1));
+
+                if res.is_ok() {
+                    break JobToken::new();
+                }
+
+                YieldOnce::default().await
+            }
+        }
+
+        pub(super) fn release_token_raw(&self) {
+            self.0.fetch_add(1, AcqRel);
+        }
+    }
+}
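
The in-process fallback is just an atomic counter of available tokens, and its core operation is worth seeing in isolation: `checked_sub` returns `None` at zero, so `fetch_update` fails instead of underflowing and the caller yields and retries. A self-contained sketch of those semantics:

    use std::sync::atomic::{
        AtomicU32,
        Ordering::{AcqRel, Acquire},
    };

    let tokens = AtomicU32::new(1);
    // 1 -> 0: a token was available, acquisition succeeds.
    assert!(tokens.fetch_update(AcqRel, Acquire, |t| t.checked_sub(1)).is_ok());
    // 0: no token left, `checked_sub` yields `None` and the update fails.
    assert!(tokens.fetch_update(AcqRel, Acquire, |t| t.checked_sub(1)).is_err());
    // Mirror of `release_token_raw`: put the token back.
    tokens.fetch_add(1, AcqRel);
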
diff --git a/third_party/rust/cc/src/parallel/mod.rs b/third_party/rust/cc/src/parallel/mod.rs
new file mode 100644
index 0000000000..d69146dc59
--- /dev/null
+++ b/third_party/rust/cc/src/parallel/mod.rs
@@ -0,0 +1,20 @@
+pub(crate) mod async_executor;
+pub(crate) mod job_token;
+pub(crate) mod stderr;
+
+/// Remove all elements from `vec` for which `f(element)` returns `false`.
+///
+/// TODO: Remove this once the MSRV is bumped to v1.61
+pub(crate) fn retain_unordered_mut<T, F>(vec: &mut Vec<T>, mut f: F)
+where
+    F: FnMut(&mut T) -> bool,
+{
+    let mut i = 0;
+    while i < vec.len() {
+        if f(&mut vec[i]) {
+            i += 1;
+        } else {
+            vec.swap_remove(i);
+        }
+    }
+}
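
A usage sketch of the helper above; unlike `Vec::retain`, surviving elements can be reordered because removal uses `swap_remove`:

    let mut v = vec![1, 2, 3, 4, 5];
    retain_unordered_mut(&mut v, |x| *x % 2 == 1); // keep odd numbers
    // `swap_remove` moved tail elements into the holes: v is now [1, 5, 3].
    v.sort();
    assert_eq!(v, [1, 3, 5]);
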
diff --git a/third_party/rust/cc/src/parallel/stderr.rs b/third_party/rust/cc/src/parallel/stderr.rs
new file mode 100644
index 0000000000..47fa085dba
--- /dev/null
+++ b/third_party/rust/cc/src/parallel/stderr.rs
@@ -0,0 +1,90 @@
+/// Helper functions for [ChildStderr].
+use std::{convert::TryInto, process::ChildStderr};
+
+use crate::{Error, ErrorKind};
+
+#[cfg(all(not(unix), not(windows)))]
+compile_error!("Only unix and windows support non-blocking pipes! For other OSes, disable the parallel feature.");
+
+#[cfg(unix)]
+fn get_flags(fd: std::os::unix::io::RawFd) -> Result<std::os::raw::c_int, Error> {
+    let flags = unsafe { libc::fcntl(fd, libc::F_GETFL, 0) };
+    if flags == -1 {
+        Err(Error::new(
+            ErrorKind::IOError,
+            format!(
+                "Failed to get flags for pipe {}: {}",
+                fd,
+                std::io::Error::last_os_error()
+            ),
+        ))
+    } else {
+        Ok(flags)
+    }
+}
+
+#[cfg(unix)]
+fn set_flags(fd: std::os::unix::io::RawFd, flags: std::os::raw::c_int) -> Result<(), Error> {
+    if unsafe { libc::fcntl(fd, libc::F_SETFL, flags) } == -1 {
+        Err(Error::new(
+            ErrorKind::IOError,
+            format!(
+                "Failed to set flags for pipe {}: {}",
+                fd,
+                std::io::Error::last_os_error()
+            ),
+        ))
+    } else {
+        Ok(())
+    }
+}
+
+#[cfg(unix)]
+pub fn set_non_blocking(pipe: &impl std::os::unix::io::AsRawFd) -> Result<(), Error> {
+    // On Unix, switch the pipe to non-blocking mode.
+    // On Windows, we have a different way to be non-blocking.
+    let fd = pipe.as_raw_fd();
+
+    let flags = get_flags(fd)?;
+    set_flags(fd, flags | libc::O_NONBLOCK)
+}
+
+pub fn bytes_available(stderr: &mut ChildStderr) -> Result<usize, Error> {
+    let mut bytes_available = 0;
+    #[cfg(windows)]
+    {
+        use crate::windows::windows_sys::PeekNamedPipe;
+        use std::os::windows::io::AsRawHandle;
+        use std::ptr::null_mut;
+        if unsafe {
+            PeekNamedPipe(
+                stderr.as_raw_handle(),
+                null_mut(),
+                0,
+                null_mut(),
+                &mut bytes_available,
+                null_mut(),
+            )
+        } == 0
+        {
+            return Err(Error::new(
+                ErrorKind::IOError,
+                format!(
+                    "PeekNamedPipe failed with {}",
+                    std::io::Error::last_os_error()
+                ),
+            ));
+        }
+    }
+    #[cfg(unix)]
+    {
+        use std::os::unix::io::AsRawFd;
+        if unsafe { libc::ioctl(stderr.as_raw_fd(), libc::FIONREAD, &mut bytes_available) } != 0 {
+            return Err(Error::new(
+                ErrorKind::IOError,
+                format!("ioctl failed with {}", std::io::Error::last_os_error()),
+            ));
+        }
+    }
+    Ok(bytes_available.try_into().unwrap())
+}
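
These helpers are building blocks for polling a child's stderr from the cooperative executor. A sketch (Unix flavour) of how they might combine; `drain_stderr` and its error mapping are illustrative and assume the crate-internal `Error`/`ErrorKind` types, not code from this patch:

    use std::io::Read;
    use std::process::Child;

    // Hypothetical polling helper: mark the pipe non-blocking, then only
    // read when `bytes_available` says a read cannot block.
    fn drain_stderr(child: &mut Child) -> Result<Vec<u8>, Error> {
        let stderr = child.stderr.as_mut().expect("spawned with Stdio::piped()");
        set_non_blocking(stderr)?;

        let mut buf = Vec::new();
        loop {
            // How many bytes are buffered in the pipe right now.
            let n = bytes_available(stderr)?;
            if n == 0 {
                break; // nothing buffered; the caller can yield and retry
            }
            let mut chunk = vec![0; n];
            stderr
                .read_exact(&mut chunk)
                .map_err(|e| Error::new(ErrorKind::IOError, format!("read failed: {}", e)))?;
            buf.extend_from_slice(&chunk);
        }
        Ok(buf)
    }
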