diff options
Diffstat (limited to 'third_party/rust/cc/src/parallel/job_token.rs')
-rw-r--r-- | third_party/rust/cc/src/parallel/job_token.rs | 255 |
1 files changed, 255 insertions, 0 deletions
diff --git a/third_party/rust/cc/src/parallel/job_token.rs b/third_party/rust/cc/src/parallel/job_token.rs new file mode 100644 index 0000000000..4fec982f85 --- /dev/null +++ b/third_party/rust/cc/src/parallel/job_token.rs @@ -0,0 +1,255 @@ +use std::{marker::PhantomData, mem::MaybeUninit, sync::Once}; + +use crate::Error; + +pub(crate) struct JobToken(PhantomData<()>); + +impl JobToken { + fn new() -> Self { + Self(PhantomData) + } +} + +impl Drop for JobToken { + fn drop(&mut self) { + match JobTokenServer::new() { + JobTokenServer::Inherited(jobserver) => jobserver.release_token_raw(), + JobTokenServer::InProcess(jobserver) => jobserver.release_token_raw(), + } + } +} + +enum JobTokenServer { + Inherited(inherited_jobserver::JobServer), + InProcess(inprocess_jobserver::JobServer), +} + +impl JobTokenServer { + /// This function returns a static reference to the jobserver because + /// - creating a jobserver from env is a bit fd-unsafe (e.g. the fd might + /// be closed by other jobserver users in the process) and better do it + /// at the start of the program. + /// - in case a jobserver cannot be created from env (e.g. it's not + /// present), we will create a global in-process only jobserver + /// that has to be static so that it will be shared by all cc + /// compilation. + fn new() -> &'static Self { + static INIT: Once = Once::new(); + static mut JOBSERVER: MaybeUninit<JobTokenServer> = MaybeUninit::uninit(); + + unsafe { + INIT.call_once(|| { + let server = inherited_jobserver::JobServer::from_env() + .map(Self::Inherited) + .unwrap_or_else(|| Self::InProcess(inprocess_jobserver::JobServer::new())); + JOBSERVER = MaybeUninit::new(server); + }); + // TODO: Poor man's assume_init_ref, as that'd require a MSRV of 1.55. + &*JOBSERVER.as_ptr() + } + } +} + +pub(crate) enum ActiveJobTokenServer { + Inherited(inherited_jobserver::ActiveJobServer<'static>), + InProcess(&'static inprocess_jobserver::JobServer), +} + +impl ActiveJobTokenServer { + pub(crate) fn new() -> Result<Self, Error> { + match JobTokenServer::new() { + JobTokenServer::Inherited(inherited_jobserver) => { + inherited_jobserver.enter_active().map(Self::Inherited) + } + JobTokenServer::InProcess(inprocess_jobserver) => { + Ok(Self::InProcess(inprocess_jobserver)) + } + } + } + + pub(crate) async fn acquire(&self) -> Result<JobToken, Error> { + match &self { + Self::Inherited(jobserver) => jobserver.acquire().await, + Self::InProcess(jobserver) => Ok(jobserver.acquire().await), + } + } +} + +mod inherited_jobserver { + use super::JobToken; + + use crate::{parallel::async_executor::YieldOnce, Error, ErrorKind}; + + use std::{ + io, mem, + sync::{mpsc, Mutex, MutexGuard, PoisonError}, + }; + + pub(super) struct JobServer { + /// Implicit token for this process which is obtained and will be + /// released in parent. Since JobTokens only give back what they got, + /// there should be at most one global implicit token in the wild. + /// + /// Since Rust does not execute any `Drop` for global variables, + /// we can't just put it back to jobserver and then re-acquire it at + /// the end of the process. + /// + /// Use `Mutex` to avoid race between acquire and release. + /// If an `AtomicBool` is used, then it's possible for: + /// - `release_token_raw`: Tries to set `global_implicit_token` to true, but it is already + /// set to `true`, continue to release it to jobserver + /// - `acquire` takes the global implicit token, set `global_implicit_token` to false + /// - `release_token_raw` now writes the token back into the jobserver, while + /// `global_implicit_token` is `false` + /// + /// If the program exits here, then cc effectively increases parallelism by one, which is + /// incorrect, hence we use a `Mutex` here. + global_implicit_token: Mutex<bool>, + inner: jobserver::Client, + } + + impl JobServer { + pub(super) unsafe fn from_env() -> Option<Self> { + jobserver::Client::from_env().map(|inner| Self { + inner, + global_implicit_token: Mutex::new(true), + }) + } + + fn get_global_implicit_token(&self) -> MutexGuard<'_, bool> { + self.global_implicit_token + .lock() + .unwrap_or_else(PoisonError::into_inner) + } + + /// All tokens except for the global implicit token will be put back into the jobserver + /// immediately and they cannot be cached, since Rust does not call `Drop::drop` on + /// global variables. + pub(super) fn release_token_raw(&self) { + let mut global_implicit_token = self.get_global_implicit_token(); + + if *global_implicit_token { + // There's already a global implicit token, so this token must + // be released back into jobserver. + // + // `release_raw` should not block + let _ = self.inner.release_raw(); + } else { + *global_implicit_token = true; + } + } + + pub(super) fn enter_active(&self) -> Result<ActiveJobServer<'_>, Error> { + ActiveJobServer::new(self) + } + } + + pub(crate) struct ActiveJobServer<'a> { + jobserver: &'a JobServer, + helper_thread: jobserver::HelperThread, + /// When rx is dropped, all the token stored within it will be dropped. + rx: mpsc::Receiver<io::Result<jobserver::Acquired>>, + } + + impl<'a> ActiveJobServer<'a> { + fn new(jobserver: &'a JobServer) -> Result<Self, Error> { + let (tx, rx) = mpsc::channel(); + + Ok(Self { + rx, + helper_thread: jobserver.inner.clone().into_helper_thread(move |res| { + let _ = tx.send(res); + })?, + jobserver, + }) + } + + pub(super) async fn acquire(&self) -> Result<JobToken, Error> { + let mut has_requested_token = false; + + loop { + // Fast path + if mem::replace(&mut *self.jobserver.get_global_implicit_token(), false) { + break Ok(JobToken::new()); + } + + // Cold path, no global implicit token, obtain one + match self.rx.try_recv() { + Ok(res) => { + let acquired = res?; + acquired.drop_without_releasing(); + break Ok(JobToken::new()); + } + Err(mpsc::TryRecvError::Disconnected) => { + break Err(Error::new( + ErrorKind::JobserverHelpThreadError, + "jobserver help thread has returned before ActiveJobServer is dropped", + )) + } + Err(mpsc::TryRecvError::Empty) => { + if !has_requested_token { + self.helper_thread.request_token(); + has_requested_token = true; + } + YieldOnce::default().await + } + } + } + } + } +} + +mod inprocess_jobserver { + use super::JobToken; + + use crate::parallel::async_executor::YieldOnce; + + use std::{ + env::var, + sync::atomic::{ + AtomicU32, + Ordering::{AcqRel, Acquire}, + }, + }; + + pub(crate) struct JobServer(AtomicU32); + + impl JobServer { + pub(super) fn new() -> Self { + // Use `NUM_JOBS` if set (it's configured by Cargo) and otherwise + // just fall back to a semi-reasonable number. + // + // Note that we could use `num_cpus` here but it's an extra + // dependency that will almost never be used, so + // it's generally not too worth it. + let mut parallelism = 4; + // TODO: Use std::thread::available_parallelism as an upper bound + // when MSRV is bumped. + if let Ok(amt) = var("NUM_JOBS") { + if let Ok(amt) = amt.parse() { + parallelism = amt; + } + } + + Self(AtomicU32::new(parallelism)) + } + + pub(super) async fn acquire(&self) -> JobToken { + loop { + let res = self + .0 + .fetch_update(AcqRel, Acquire, |tokens| tokens.checked_sub(1)); + + if res.is_ok() { + break JobToken::new(); + } + + YieldOnce::default().await + } + } + + pub(super) fn release_token_raw(&self) { + self.0.fetch_add(1, AcqRel); + } + } +} |