summaryrefslogtreecommitdiffstats
path: root/third_party/rust/cc/src/parallel/job_token.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/cc/src/parallel/job_token.rs')
-rw-r--r--third_party/rust/cc/src/parallel/job_token.rs255
1 file changed, 255 insertions, 0 deletions
diff --git a/third_party/rust/cc/src/parallel/job_token.rs b/third_party/rust/cc/src/parallel/job_token.rs
new file mode 100644
index 0000000000..4fec982f85
--- /dev/null
+++ b/third_party/rust/cc/src/parallel/job_token.rs
@@ -0,0 +1,255 @@
+use std::{marker::PhantomData, mem::MaybeUninit, sync::Once};
+
+use crate::Error;
+
/// A RAII token granting permission to run one parallel compilation job.
///
/// Tokens are only handed out by [`ActiveJobTokenServer::acquire`]; dropping
/// a token returns the slot to the process-global jobserver (see the `Drop`
/// impl below). The `PhantomData` keeps the struct zero-sized while making
/// construction go through `JobToken::new`.
pub(crate) struct JobToken(PhantomData<()>);
+
impl JobToken {
    /// Private constructor: a token may only be created by the jobserver
    /// implementations in this file once a parallelism slot has actually
    /// been acquired.
    fn new() -> Self {
        Self(PhantomData)
    }
}
+
impl Drop for JobToken {
    fn drop(&mut self) {
        // Hand the slot back to whichever process-global jobserver is in
        // use, so other pending jobs can make progress.
        match JobTokenServer::new() {
            JobTokenServer::Inherited(jobserver) => jobserver.release_token_raw(),
            JobTokenServer::InProcess(jobserver) => jobserver.release_token_raw(),
        }
    }
}
+
/// The process-global jobserver backing all [`JobToken`]s: either one
/// inherited from the environment (via `jobserver::Client::from_env`, e.g.
/// when invoked from a build system that provides one), or an in-process
/// fallback counter when none is present.
enum JobTokenServer {
    Inherited(inherited_jobserver::JobServer),
    InProcess(inprocess_jobserver::JobServer),
}
+
impl JobTokenServer {
    /// This function returns a static reference to the jobserver because
    /// - creating a jobserver from env is a bit fd-unsafe (e.g. the fd might
    ///   be closed by other jobserver users in the process) and better do it
    ///   at the start of the program.
    /// - in case a jobserver cannot be created from env (e.g. it's not
    ///   present), we will create a global in-process only jobserver
    ///   that has to be static so that it will be shared by all cc
    ///   compilation.
    fn new() -> &'static Self {
        // `Once` + `static mut MaybeUninit` is a hand-rolled lazy static;
        // presumably chosen over `OnceLock`/`lazy_static` to keep MSRV low
        // and dependencies minimal (cf. the 1.55 TODO below).
        static INIT: Once = Once::new();
        static mut JOBSERVER: MaybeUninit<JobTokenServer> = MaybeUninit::uninit();

        unsafe {
            // SAFETY: `JOBSERVER` is written exactly once, inside
            // `call_once`, before any read; `Once` synchronizes the write
            // with all subsequent callers, after which the static is only
            // ever read, so handing out `&'static` references is sound.
            INIT.call_once(|| {
                let server = inherited_jobserver::JobServer::from_env()
                    .map(Self::Inherited)
                    .unwrap_or_else(|| Self::InProcess(inprocess_jobserver::JobServer::new()));
                JOBSERVER = MaybeUninit::new(server);
            });
            // TODO: Poor man's assume_init_ref, as that'd require a MSRV of 1.55.
            &*JOBSERVER.as_ptr()
        }
    }
}
+
/// A jobserver that is ready to hand out [`JobToken`]s.
///
/// For the inherited variant, "active" means a helper thread has been
/// spawned to perform blocking token requests; the in-process variant needs
/// no activation and is used by reference.
pub(crate) enum ActiveJobTokenServer {
    Inherited(inherited_jobserver::ActiveJobServer<'static>),
    InProcess(&'static inprocess_jobserver::JobServer),
}
+
+impl ActiveJobTokenServer {
+ pub(crate) fn new() -> Result<Self, Error> {
+ match JobTokenServer::new() {
+ JobTokenServer::Inherited(inherited_jobserver) => {
+ inherited_jobserver.enter_active().map(Self::Inherited)
+ }
+ JobTokenServer::InProcess(inprocess_jobserver) => {
+ Ok(Self::InProcess(inprocess_jobserver))
+ }
+ }
+ }
+
+ pub(crate) async fn acquire(&self) -> Result<JobToken, Error> {
+ match &self {
+ Self::Inherited(jobserver) => jobserver.acquire().await,
+ Self::InProcess(jobserver) => Ok(jobserver.acquire().await),
+ }
+ }
+}
+
mod inherited_jobserver {
    use super::JobToken;

    use crate::{parallel::async_executor::YieldOnce, Error, ErrorKind};

    use std::{
        io, mem,
        sync::{mpsc, Mutex, MutexGuard, PoisonError},
    };

    /// Wrapper around a `jobserver::Client` inherited from the environment,
    /// plus bookkeeping for the one implicit token every process owns.
    pub(super) struct JobServer {
        /// Implicit token for this process which is obtained and will be
        /// released in parent. Since JobTokens only give back what they got,
        /// there should be at most one global implicit token in the wild.
        ///
        /// Since Rust does not execute any `Drop` for global variables,
        /// we can't just put it back to jobserver and then re-acquire it at
        /// the end of the process.
        ///
        /// Use `Mutex` to avoid race between acquire and release.
        /// If an `AtomicBool` is used, then it's possible for:
        /// - `release_token_raw`: Tries to set `global_implicit_token` to true, but it is already
        ///   set to `true`, continue to release it to jobserver
        /// - `acquire` takes the global implicit token, set `global_implicit_token` to false
        /// - `release_token_raw` now writes the token back into the jobserver, while
        ///   `global_implicit_token` is `false`
        ///
        /// If the program exits here, then cc effectively increases parallelism by one, which is
        /// incorrect, hence we use a `Mutex` here.
        global_implicit_token: Mutex<bool>,
        /// The jobserver client inherited from the environment.
        inner: jobserver::Client,
    }

    impl JobServer {
        /// Creates a `JobServer` from the jobserver described in the
        /// environment, if one is present.
        ///
        /// # Safety
        ///
        /// Inherits the contract of `jobserver::Client::from_env`: the
        /// inherited jobserver fds must still be valid and unclosed, so this
        /// should be called before anything else in the process could touch
        /// them — see the rationale on `JobTokenServer::new`.
        pub(super) unsafe fn from_env() -> Option<Self> {
            jobserver::Client::from_env().map(|inner| Self {
                inner,
                // The process starts with its implicit token available.
                global_implicit_token: Mutex::new(true),
            })
        }

        /// Locks and returns the implicit-token flag, ignoring poisoning:
        /// the protected data is a plain `bool`, so a panic while the lock
        /// was held cannot have left it logically inconsistent.
        fn get_global_implicit_token(&self) -> MutexGuard<'_, bool> {
            self.global_implicit_token
                .lock()
                .unwrap_or_else(PoisonError::into_inner)
        }

        /// All tokens except for the global implicit token will be put back into the jobserver
        /// immediately and they cannot be cached, since Rust does not call `Drop::drop` on
        /// global variables.
        pub(super) fn release_token_raw(&self) {
            let mut global_implicit_token = self.get_global_implicit_token();

            if *global_implicit_token {
                // There's already a global implicit token, so this token must
                // be released back into jobserver.
                //
                // `release_raw` should not block
                let _ = self.inner.release_raw();
            } else {
                // Re-arm the implicit token instead of a round-trip through
                // the jobserver pipe.
                *global_implicit_token = true;
            }
        }

        /// Spawns the helper thread and returns a handle through which
        /// tokens can be acquired asynchronously.
        pub(super) fn enter_active(&self) -> Result<ActiveJobServer<'_>, Error> {
            ActiveJobServer::new(self)
        }
    }

    /// An activated jobserver: owns the helper thread that performs the
    /// (potentially blocking) token requests on our behalf and forwards
    /// results over a channel.
    pub(crate) struct ActiveJobServer<'a> {
        jobserver: &'a JobServer,
        helper_thread: jobserver::HelperThread,
        /// When rx is dropped, all the token stored within it will be dropped.
        rx: mpsc::Receiver<io::Result<jobserver::Acquired>>,
    }

    impl<'a> ActiveJobServer<'a> {
        fn new(jobserver: &'a JobServer) -> Result<Self, Error> {
            let (tx, rx) = mpsc::channel();

            Ok(Self {
                rx,
                // The helper thread sends every token it acquires into the
                // channel; `acquire` below drains it.
                helper_thread: jobserver.inner.clone().into_helper_thread(move |res| {
                    let _ = tx.send(res);
                })?,
                jobserver,
            })
        }

        /// Acquires a [`JobToken`], cooperatively yielding to the executor
        /// while no token is available.
        pub(super) async fn acquire(&self) -> Result<JobToken, Error> {
            let mut has_requested_token = false;

            loop {
                // Fast path: claim the process-wide implicit token if free.
                if mem::replace(&mut *self.jobserver.get_global_implicit_token(), false) {
                    break Ok(JobToken::new());
                }

                // Cold path, no global implicit token, obtain one
                match self.rx.try_recv() {
                    Ok(res) => {
                        let acquired = res?;
                        // Ownership of the slot transfers to the JobToken;
                        // its Drop (not `Acquired`'s) releases it later.
                        acquired.drop_without_releasing();
                        break Ok(JobToken::new());
                    }
                    Err(mpsc::TryRecvError::Disconnected) => {
                        break Err(Error::new(
                            ErrorKind::JobserverHelpThreadError,
                            "jobserver help thread has returned before ActiveJobServer is dropped",
                        ))
                    }
                    Err(mpsc::TryRecvError::Empty) => {
                        // Ask the helper thread for one token at most once
                        // per acquire, then yield and poll again next wakeup.
                        if !has_requested_token {
                            self.helper_thread.request_token();
                            has_requested_token = true;
                        }
                        YieldOnce::default().await
                    }
                }
            }
        }
    }
}
+
+mod inprocess_jobserver {
+ use super::JobToken;
+
+ use crate::parallel::async_executor::YieldOnce;
+
+ use std::{
+ env::var,
+ sync::atomic::{
+ AtomicU32,
+ Ordering::{AcqRel, Acquire},
+ },
+ };
+
+ pub(crate) struct JobServer(AtomicU32);
+
+ impl JobServer {
+ pub(super) fn new() -> Self {
+ // Use `NUM_JOBS` if set (it's configured by Cargo) and otherwise
+ // just fall back to a semi-reasonable number.
+ //
+ // Note that we could use `num_cpus` here but it's an extra
+ // dependency that will almost never be used, so
+ // it's generally not too worth it.
+ let mut parallelism = 4;
+ // TODO: Use std::thread::available_parallelism as an upper bound
+ // when MSRV is bumped.
+ if let Ok(amt) = var("NUM_JOBS") {
+ if let Ok(amt) = amt.parse() {
+ parallelism = amt;
+ }
+ }
+
+ Self(AtomicU32::new(parallelism))
+ }
+
+ pub(super) async fn acquire(&self) -> JobToken {
+ loop {
+ let res = self
+ .0
+ .fetch_update(AcqRel, Acquire, |tokens| tokens.checked_sub(1));
+
+ if res.is_ok() {
+ break JobToken::new();
+ }
+
+ YieldOnce::default().await
+ }
+ }
+
+ pub(super) fn release_token_raw(&self) {
+ self.0.fetch_add(1, AcqRel);
+ }
+ }
+}