summaryrefslogtreecommitdiffstats
path: root/vendor/rayon-core/src/sleep
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 12:02:58 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 12:02:58 +0000
commit698f8c2f01ea549d77d7dc3338a12e04c11057b9 (patch)
tree173a775858bd501c378080a10dca74132f05bc50 /vendor/rayon-core/src/sleep
parentInitial commit. (diff)
downloadrustc-698f8c2f01ea549d77d7dc3338a12e04c11057b9.tar.xz
rustc-698f8c2f01ea549d77d7dc3338a12e04c11057b9.zip
Adding upstream version 1.64.0+dfsg1.upstream/1.64.0+dfsg1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/rayon-core/src/sleep')
-rw-r--r--vendor/rayon-core/src/sleep/README.md219
-rw-r--r--vendor/rayon-core/src/sleep/counters.rs275
-rw-r--r--vendor/rayon-core/src/sleep/mod.rs394
3 files changed, 888 insertions, 0 deletions
diff --git a/vendor/rayon-core/src/sleep/README.md b/vendor/rayon-core/src/sleep/README.md
new file mode 100644
index 000000000..c62c3975d
--- /dev/null
+++ b/vendor/rayon-core/src/sleep/README.md
@@ -0,0 +1,219 @@
+# Introduction: the sleep module
+
+The code in this module governs when worker threads should go to
+sleep. The system used in this code was introduced in [Rayon RFC #5].
+There is also a [video walkthrough] available. Both of those may be
+valuable resources to understanding the code, though naturally they
+will also grow stale over time. The comments in this file are
+extracted from the RFC and meant to be kept up to date.
+
+[Rayon RFC #5]: https://github.com/rayon-rs/rfcs/pull/5
+[video walkthrough]: https://youtu.be/HvmQsE5M4cY
+
+# The `Sleep` struct
+
+The `Sleep` struct is embedded into each registry. It performs several functions:
+
+* It tracks when workers are awake or asleep.
+* It decides how long a worker should look for work before it goes to sleep,
+ via a callback that is invoked periodically from the worker's search loop.
+* It is notified when latches are set, jobs are published, or other
+ events occur, and it will go and wake the appropriate threads if
+ they are sleeping.
+
+# Thread states
+
+There are three main thread states:
+
+* An **active** thread is one that is actively executing a job.
+* An **idle** thread is one that is searching for work to do. It will be
+ trying to steal work or pop work from the global injector queue.
+* A **sleeping** thread is one that is blocked on a condition variable,
+ waiting to be awoken.
+
+We sometimes refer to the final two states collectively as **inactive**.
+Threads begin as idle but transition to idle and finally sleeping when
+they're unable to find work to do.
+
+## Sleepy threads
+
+There is one other special state worth mentioning. During the idle state,
+threads can get **sleepy**. A sleepy thread is still idle, in that it is still
+searching for work, but it is *about* to go to sleep after it does one more
+search (or some other number, potentially). When a thread enters the sleepy
+state, it signals (via the **jobs event counter**, described below) that it is
+about to go to sleep. If new work is published, this will lead to the counter
+being adjusted. When the thread actually goes to sleep, it will (hopefully, but
+not guaranteed) see that the counter has changed and elect not to sleep, but
+instead to search again. See the section on the **jobs event counter** for more
+details.
+
+# The counters
+
+One of the key structs in the sleep module is `AtomicCounters`, found in
+`counters.rs`. It packs three counters into one atomically managed value:
+
+* Two **thread counters**, which track the number of threads in a particular state.
+* The **jobs event counter**, which is used to signal when new work is available.
+ It (sort of) tracks the number of jobs posted, but not quite, and it can rollover.
+
+## Thread counters
+
+There are two thread counters, one that tracks **inactive** threads and one that
+tracks **sleeping** threads. From this, one can deduce the number of threads
+that are idle by subtracting sleeping threads from inactive threads. We track
+the counters in this way because it permits simpler atomic operations. One can
+increment the number of sleeping threads (and thus decrease the number of idle
+threads) simply by doing one atomic increment, for example. Similarly, one can
+decrease the number of sleeping threads (and increase the number of idle
+threads) through one atomic decrement.
+
+These counters are adjusted as follows:
+
+* When a thread enters the idle state: increment the inactive thread counter.
+* When a thread enters the sleeping state: increment the sleeping thread counter.
+* When a thread awakens a sleeping thread: decrement the sleeping thread counter.
+ * Subtle point: the thread that *awakens* the sleeping thread decrements the
+ counter, not the thread that is *sleeping*. This is because there is a delay
+ between siganling a thread to wake and the thread actually waking:
+ decrementing the counter when awakening the thread means that other threads
+ that may be posting work will see the up-to-date value that much faster.
+* When a thread finds work, exiting the idle state: decrement the inactive
+ thread counter.
+
+## Jobs event counter
+
+The final counter is the **jobs event counter**. The role of this counter is to
+help sleepy threads detect when new work is posted in a lightweight fashion. In
+its simplest form, we would simply have a counter that gets incremented each
+time a new job is posted. This way, when a thread gets sleepy, it could read the
+counter, and then compare to see if the value has changed before it actually
+goes to sleep. But this [turns out to be too expensive] in practice, so we use a
+somewhat more complex scheme.
+
+[turns out to be too expensive]: https://github.com/rayon-rs/rayon/pull/746#issuecomment-624802747
+
+The idea is that the counter toggles between two states, depending on whether
+its value is even or odd (or, equivalently, on the value of its low bit):
+
+* Even -- If the low bit is zero, then it means that there has been no new work
+ since the last thread got sleepy.
+* Odd -- If the low bit is one, then it means that new work was posted since
+ the last thread got sleepy.
+
+### New work is posted
+
+When new work is posted, we check the value of the counter: if it is even,
+then we increment it by one, so that it becomes odd.
+
+### Worker thread gets sleepy
+
+When a worker thread gets sleepy, it will read the value of the counter. If the
+counter is odd, it will increment the counter so that it is even. Either way, it
+remembers the final value of the counter. The final value will be used later,
+when the thread is going to sleep. If at that time the counter has not changed,
+then we can assume no new jobs have been posted (though note the remote
+possibility of rollover, discussed in detail below).
+
+# Protocol for a worker thread to post work
+
+The full protocol for a thread to post work is as follows
+
+* If the work is posted into the injection queue, then execute a seq-cst fence (see below).
+* Load the counters, incrementing the JEC if it is even so that it is odd.
+* Check if there are idle threads available to handle this new job. If not,
+ and there are sleeping threads, then wake one or more threads.
+
+# Protocol for a worker thread to fall asleep
+
+The full protocol for a thread to fall asleep is as follows:
+
+* After completing all its jobs, the worker goes idle and begins to
+ search for work. As it searches, it counts "rounds". In each round,
+ it searches all other work threads' queues, plus the 'injector queue' for
+ work injected from the outside. If work is found in this search, the thread
+ becomes active again and hence restarts this protocol from the top.
+* After a certain number of rounds, the thread "gets sleepy" and executes `get_sleepy`
+ above, remembering the `final_value` of the JEC. It does one more search for work.
+* If no work is found, the thread atomically:
+ * Checks the JEC to see that it has not changed from `final_value`.
+ * If it has, then the thread goes back to searchin for work. We reset to
+ just before we got sleepy, so that we will do one more search
+ before attending to sleep again (rather than searching for many rounds).
+ * Increments the number of sleeping threads by 1.
+* The thread then executes a seq-cst fence operation (see below).
+* The thread then does one final check for injected jobs (see below). If any
+ are available, it returns to the 'pre-sleepy' state as if the JEC had changed.
+* The thread waits to be signaled. Once signaled, it returns to the idle state.
+
+# The jobs event counter and deadlock
+
+As described in the section on the JEC, the main concern around going to sleep
+is avoiding a race condition wherein:
+
+* Thread A looks for work, finds none.
+* Thread B posts work but sees no sleeping threads.
+* Thread A goes to sleep.
+
+The JEC protocol largely prevents this, but due to rollover, this prevention is
+not complete. It is possible -- if unlikely -- that enough activity occurs for
+Thread A to observe the same JEC value that it saw when getting sleepy. If the
+new work being published came from *inside* the thread-pool, then this race
+condition isn't too harmful. It means that we have fewer workers processing the
+work then we should, but we won't deadlock. This seems like an acceptable risk
+given that this is unlikely in practice.
+
+However, if the work was posted as an *external* job, that is a problem. In that
+case, it's possible that all of our workers could go to sleep, and the external
+job would never get processed. To prevent that, the sleeping protocol includes
+one final check to see if the injector queue is empty before fully falling
+asleep. Note that this final check occurs **after** the number of sleeping
+threads has been incremented. We are not concerned therefore with races against
+injections that occur after that increment, only before.
+
+Unfortunately, there is one rather subtle point concerning this final check:
+we wish to avoid the possibility that:
+
+* work is pushed into the injection queue by an outside thread X,
+* the sleepy thread S sees the JEC but it has rolled over and is equal
+* the sleepy thread S reads the injection queue but does not see the work posted by X.
+
+This is possible because the C++ memory model typically offers guarantees of the
+form "if you see the access A, then you must see those other accesses" -- but it
+doesn't guarantee that you will see the access A (i.e., if you think of
+processors with independent caches, you may be operating on very out of date
+cache state).
+
+## Using seq-cst fences to prevent deadlock
+
+To overcome this problem, we have inserted two sequentially consistent fence
+operations into the protocols above:
+
+* One fence occurs after work is posted into the injection queue, but before the
+ counters are read (including the number of sleeping threads).
+ * Note that no fence is needed for work posted to internal queues, since it is ok
+ to overlook work in that case.
+* One fence occurs after the number of sleeping threads is incremented, but
+ before the injection queue is read.
+
+### Proof sketch
+
+What follows is a "proof sketch" that the protocol is deadlock free. We model
+two relevant bits of memory, the job injector queue J and the atomic counters C.
+
+Consider the actions of the injecting thread:
+
+* PushJob: Job is injected, which can be modeled as an atomic write to J with release semantics.
+* PushFence: A sequentially consistent fence is executed.
+* ReadSleepers: The counters C are read (they may also be incremented, but we just consider the read that comes first).
+
+Meanwhile, the sleepy thread does the following:
+
+* IncSleepers: The number of sleeping threads is incremented, which is atomic exchange to C.
+* SleepFence: A sequentially consistent fence is executed.
+* ReadJob: We look to see if the queue is empty, which is a read of J with acquire semantics.
+
+Either PushFence or SleepFence must come first:
+
+* If PushFence comes first, then PushJob must be visible to ReadJob.
+* If SleepFence comes first, then IncSleepers is visible to ReadSleepers. \ No newline at end of file
diff --git a/vendor/rayon-core/src/sleep/counters.rs b/vendor/rayon-core/src/sleep/counters.rs
new file mode 100644
index 000000000..29d3b18e2
--- /dev/null
+++ b/vendor/rayon-core/src/sleep/counters.rs
@@ -0,0 +1,275 @@
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+pub(super) struct AtomicCounters {
+ /// Packs together a number of counters. The counters are ordered as
+ /// follows, from least to most significant bits (here, we assuming
+ /// that [`THREADS_BITS`] is equal to 10):
+ ///
+ /// * Bits 0..10: Stores the number of **sleeping threads**
+ /// * Bits 10..20: Stores the number of **inactive threads**
+ /// * Bits 20..: Stores the **job event counter** (JEC)
+ ///
+ /// This uses 10 bits ([`THREADS_BITS`]) to encode the number of threads. Note
+ /// that the total number of bits (and hence the number of bits used for the
+ /// JEC) will depend on whether we are using a 32- or 64-bit architecture.
+ value: AtomicUsize,
+}
+
+#[derive(Copy, Clone)]
+pub(super) struct Counters {
+ word: usize,
+}
+
+/// A value read from the **Jobs Event Counter**.
+/// See the [`README.md`](README.md) for more
+/// coverage of how the jobs event counter works.
+#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)]
+pub(super) struct JobsEventCounter(usize);
+
+impl JobsEventCounter {
+ pub(super) const DUMMY: JobsEventCounter = JobsEventCounter(std::usize::MAX);
+
+ #[inline]
+ pub(super) fn as_usize(self) -> usize {
+ self.0
+ }
+
+ /// The JEC "is sleepy" if the last thread to increment it was in the
+ /// process of becoming sleepy. This is indicated by its value being *even*.
+ /// When new jobs are posted, they check if the JEC is sleepy, and if so
+ /// they incremented it.
+ #[inline]
+ pub(super) fn is_sleepy(self) -> bool {
+ (self.as_usize() & 1) == 0
+ }
+
+ /// The JEC "is active" if the last thread to increment it was posting new
+ /// work. This is indicated by its value being *odd*. When threads get
+ /// sleepy, they will check if the JEC is active, and increment it.
+ #[inline]
+ pub(super) fn is_active(self) -> bool {
+ !self.is_sleepy()
+ }
+}
+
+/// Number of bits used for the thread counters.
+#[cfg(target_pointer_width = "64")]
+const THREADS_BITS: usize = 16;
+
+#[cfg(target_pointer_width = "32")]
+const THREADS_BITS: usize = 8;
+
+/// Bits to shift to select the sleeping threads
+/// (used with `select_bits`).
+const SLEEPING_SHIFT: usize = 0 * THREADS_BITS;
+
+/// Bits to shift to select the inactive threads
+/// (used with `select_bits`).
+const INACTIVE_SHIFT: usize = 1 * THREADS_BITS;
+
+/// Bits to shift to select the JEC
+/// (use JOBS_BITS).
+const JEC_SHIFT: usize = 2 * THREADS_BITS;
+
+/// Max value for the thread counters.
+pub(crate) const THREADS_MAX: usize = (1 << THREADS_BITS) - 1;
+
+/// Constant that can be added to add one sleeping thread.
+const ONE_SLEEPING: usize = 1;
+
+/// Constant that can be added to add one inactive thread.
+/// An inactive thread is either idle, sleepy, or sleeping.
+const ONE_INACTIVE: usize = 1 << INACTIVE_SHIFT;
+
+/// Constant that can be added to add one to the JEC.
+const ONE_JEC: usize = 1 << JEC_SHIFT;
+
+impl AtomicCounters {
+ #[inline]
+ pub(super) fn new() -> AtomicCounters {
+ AtomicCounters {
+ value: AtomicUsize::new(0),
+ }
+ }
+
+ /// Load and return the current value of the various counters.
+ /// This value can then be given to other method which will
+ /// attempt to update the counters via compare-and-swap.
+ #[inline]
+ pub(super) fn load(&self, ordering: Ordering) -> Counters {
+ Counters::new(self.value.load(ordering))
+ }
+
+ #[inline]
+ fn try_exchange(&self, old_value: Counters, new_value: Counters, ordering: Ordering) -> bool {
+ self.value
+ .compare_exchange(old_value.word, new_value.word, ordering, Ordering::Relaxed)
+ .is_ok()
+ }
+
+ /// Adds an inactive thread. This cannot fail.
+ ///
+ /// This should be invoked when a thread enters its idle loop looking
+ /// for work. It is decremented when work is found. Note that it is
+ /// not decremented if the thread transitions from idle to sleepy or sleeping;
+ /// so the number of inactive threads is always greater-than-or-equal
+ /// to the number of sleeping threads.
+ #[inline]
+ pub(super) fn add_inactive_thread(&self) {
+ self.value.fetch_add(ONE_INACTIVE, Ordering::SeqCst);
+ }
+
+ /// Increments the jobs event counter if `increment_when`, when applied to
+ /// the current value, is true. Used to toggle the JEC from even (sleepy) to
+ /// odd (active) or vice versa. Returns the final value of the counters, for
+ /// which `increment_when` is guaranteed to return false.
+ pub(super) fn increment_jobs_event_counter_if(
+ &self,
+ increment_when: impl Fn(JobsEventCounter) -> bool,
+ ) -> Counters {
+ loop {
+ let old_value = self.load(Ordering::SeqCst);
+ if increment_when(old_value.jobs_counter()) {
+ let new_value = old_value.increment_jobs_counter();
+ if self.try_exchange(old_value, new_value, Ordering::SeqCst) {
+ return new_value;
+ }
+ } else {
+ return old_value;
+ }
+ }
+ }
+
+ /// Subtracts an inactive thread. This cannot fail. It is invoked
+ /// when a thread finds work and hence becomes active. It returns the
+ /// number of sleeping threads to wake up (if any).
+ ///
+ /// See `add_inactive_thread`.
+ #[inline]
+ pub(super) fn sub_inactive_thread(&self) -> usize {
+ let old_value = Counters::new(self.value.fetch_sub(ONE_INACTIVE, Ordering::SeqCst));
+ debug_assert!(
+ old_value.inactive_threads() > 0,
+ "sub_inactive_thread: old_value {:?} has no inactive threads",
+ old_value,
+ );
+ debug_assert!(
+ old_value.sleeping_threads() <= old_value.inactive_threads(),
+ "sub_inactive_thread: old_value {:?} had {} sleeping threads and {} inactive threads",
+ old_value,
+ old_value.sleeping_threads(),
+ old_value.inactive_threads(),
+ );
+
+ // Current heuristic: whenever an inactive thread goes away, if
+ // there are any sleeping threads, wake 'em up.
+ let sleeping_threads = old_value.sleeping_threads();
+ std::cmp::min(sleeping_threads, 2)
+ }
+
+ /// Subtracts a sleeping thread. This cannot fail, but it is only
+ /// safe to do if you you know the number of sleeping threads is
+ /// non-zero (i.e., because you have just awoken a sleeping
+ /// thread).
+ #[inline]
+ pub(super) fn sub_sleeping_thread(&self) {
+ let old_value = Counters::new(self.value.fetch_sub(ONE_SLEEPING, Ordering::SeqCst));
+ debug_assert!(
+ old_value.sleeping_threads() > 0,
+ "sub_sleeping_thread: old_value {:?} had no sleeping threads",
+ old_value,
+ );
+ debug_assert!(
+ old_value.sleeping_threads() <= old_value.inactive_threads(),
+ "sub_sleeping_thread: old_value {:?} had {} sleeping threads and {} inactive threads",
+ old_value,
+ old_value.sleeping_threads(),
+ old_value.inactive_threads(),
+ );
+ }
+
+ #[inline]
+ pub(super) fn try_add_sleeping_thread(&self, old_value: Counters) -> bool {
+ debug_assert!(
+ old_value.inactive_threads() > 0,
+ "try_add_sleeping_thread: old_value {:?} has no inactive threads",
+ old_value,
+ );
+ debug_assert!(
+ old_value.sleeping_threads() < THREADS_MAX,
+ "try_add_sleeping_thread: old_value {:?} has too many sleeping threads",
+ old_value,
+ );
+
+ let mut new_value = old_value;
+ new_value.word += ONE_SLEEPING;
+
+ self.try_exchange(old_value, new_value, Ordering::SeqCst)
+ }
+}
+
+#[inline]
+fn select_thread(word: usize, shift: usize) -> usize {
+ ((word >> shift) as usize) & THREADS_MAX
+}
+
+#[inline]
+fn select_jec(word: usize) -> usize {
+ (word >> JEC_SHIFT) as usize
+}
+
+impl Counters {
+ #[inline]
+ fn new(word: usize) -> Counters {
+ Counters { word }
+ }
+
+ #[inline]
+ fn increment_jobs_counter(self) -> Counters {
+ // We can freely add to JEC because it occupies the most significant bits.
+ // Thus it doesn't overflow into the other counters, just wraps itself.
+ Counters {
+ word: self.word.wrapping_add(ONE_JEC),
+ }
+ }
+
+ #[inline]
+ pub(super) fn jobs_counter(self) -> JobsEventCounter {
+ JobsEventCounter(select_jec(self.word))
+ }
+
+ /// The number of threads that are not actively
+ /// executing work. They may be idle, sleepy, or asleep.
+ #[inline]
+ pub(super) fn inactive_threads(self) -> usize {
+ select_thread(self.word, INACTIVE_SHIFT)
+ }
+
+ #[inline]
+ pub(super) fn awake_but_idle_threads(self) -> usize {
+ debug_assert!(
+ self.sleeping_threads() <= self.inactive_threads(),
+ "sleeping threads: {} > raw idle threads {}",
+ self.sleeping_threads(),
+ self.inactive_threads()
+ );
+ self.inactive_threads() - self.sleeping_threads()
+ }
+
+ #[inline]
+ pub(super) fn sleeping_threads(self) -> usize {
+ select_thread(self.word, SLEEPING_SHIFT)
+ }
+}
+
+impl std::fmt::Debug for Counters {
+ fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let word = format!("{:016x}", self.word);
+ fmt.debug_struct("Counters")
+ .field("word", &word)
+ .field("jobs", &self.jobs_counter().0)
+ .field("inactive", &self.inactive_threads())
+ .field("sleeping", &self.sleeping_threads())
+ .finish()
+ }
+}
diff --git a/vendor/rayon-core/src/sleep/mod.rs b/vendor/rayon-core/src/sleep/mod.rs
new file mode 100644
index 000000000..2c4ac7c28
--- /dev/null
+++ b/vendor/rayon-core/src/sleep/mod.rs
@@ -0,0 +1,394 @@
+//! Code that decides when workers should go to sleep. See README.md
+//! for an overview.
+
+use crate::latch::CoreLatch;
+use crate::log::Event::*;
+use crate::log::Logger;
+use crossbeam_utils::CachePadded;
+use std::sync::atomic::Ordering;
+use std::sync::{Condvar, Mutex};
+use std::thread;
+use std::usize;
+
+mod counters;
+pub(crate) use self::counters::THREADS_MAX;
+use self::counters::{AtomicCounters, JobsEventCounter};
+
+/// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping
+/// of workers. It has callbacks that are invoked periodically at significant events,
+/// such as when workers are looping and looking for work, when latches are set, or when
+/// jobs are published, and it either blocks threads or wakes them in response to these
+/// events. See the [`README.md`] in this module for more details.
+///
+/// [`README.md`] README.md
+pub(super) struct Sleep {
+ logger: Logger,
+
+ /// One "sleep state" per worker. Used to track if a worker is sleeping and to have
+ /// them block.
+ worker_sleep_states: Vec<CachePadded<WorkerSleepState>>,
+
+ counters: AtomicCounters,
+}
+
+/// An instance of this struct is created when a thread becomes idle.
+/// It is consumed when the thread finds work, and passed by `&mut`
+/// reference for operations that preserve the idle state. (In other
+/// words, producing one of these structs is evidence the thread is
+/// idle.) It tracks state such as how long the thread has been idle.
+pub(super) struct IdleState {
+ /// What is worker index of the idle thread?
+ worker_index: usize,
+
+ /// How many rounds have we been circling without sleeping?
+ rounds: u32,
+
+ /// Once we become sleepy, what was the sleepy counter value?
+ /// Set to `INVALID_SLEEPY_COUNTER` otherwise.
+ jobs_counter: JobsEventCounter,
+}
+
+/// The "sleep state" for an individual worker.
+#[derive(Default)]
+struct WorkerSleepState {
+ /// Set to true when the worker goes to sleep; set to false when
+ /// the worker is notified or when it wakes.
+ is_blocked: Mutex<bool>,
+
+ condvar: Condvar,
+}
+
+const ROUNDS_UNTIL_SLEEPY: u32 = 32;
+const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1;
+
+impl Sleep {
+ pub(super) fn new(logger: Logger, n_threads: usize) -> Sleep {
+ assert!(n_threads <= THREADS_MAX);
+ Sleep {
+ logger,
+ worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(),
+ counters: AtomicCounters::new(),
+ }
+ }
+
+ #[inline]
+ pub(super) fn start_looking(&self, worker_index: usize, latch: &CoreLatch) -> IdleState {
+ self.logger.log(|| ThreadIdle {
+ worker: worker_index,
+ latch_addr: latch.addr(),
+ });
+
+ self.counters.add_inactive_thread();
+
+ IdleState {
+ worker_index,
+ rounds: 0,
+ jobs_counter: JobsEventCounter::DUMMY,
+ }
+ }
+
+ #[inline]
+ pub(super) fn work_found(&self, idle_state: IdleState) {
+ self.logger.log(|| ThreadFoundWork {
+ worker: idle_state.worker_index,
+ yields: idle_state.rounds,
+ });
+
+ // If we were the last idle thread and other threads are still sleeping,
+ // then we should wake up another thread.
+ let threads_to_wake = self.counters.sub_inactive_thread();
+ self.wake_any_threads(threads_to_wake as u32);
+ }
+
+ #[inline]
+ pub(super) fn no_work_found(
+ &self,
+ idle_state: &mut IdleState,
+ latch: &CoreLatch,
+ has_injected_jobs: impl FnOnce() -> bool,
+ ) {
+ if idle_state.rounds < ROUNDS_UNTIL_SLEEPY {
+ thread::yield_now();
+ idle_state.rounds += 1;
+ } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY {
+ idle_state.jobs_counter = self.announce_sleepy(idle_state.worker_index);
+ idle_state.rounds += 1;
+ thread::yield_now();
+ } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING {
+ idle_state.rounds += 1;
+ thread::yield_now();
+ } else {
+ debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING);
+ self.sleep(idle_state, latch, has_injected_jobs);
+ }
+ }
+
+ #[cold]
+ fn announce_sleepy(&self, worker_index: usize) -> JobsEventCounter {
+ let counters = self
+ .counters
+ .increment_jobs_event_counter_if(JobsEventCounter::is_active);
+ let jobs_counter = counters.jobs_counter();
+ self.logger.log(|| ThreadSleepy {
+ worker: worker_index,
+ jobs_counter: jobs_counter.as_usize(),
+ });
+ jobs_counter
+ }
+
+ #[cold]
+ fn sleep(
+ &self,
+ idle_state: &mut IdleState,
+ latch: &CoreLatch,
+ has_injected_jobs: impl FnOnce() -> bool,
+ ) {
+ let worker_index = idle_state.worker_index;
+
+ if !latch.get_sleepy() {
+ self.logger.log(|| ThreadSleepInterruptedByLatch {
+ worker: worker_index,
+ latch_addr: latch.addr(),
+ });
+
+ return;
+ }
+
+ let sleep_state = &self.worker_sleep_states[worker_index];
+ let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
+ debug_assert!(!*is_blocked);
+
+ // Our latch was signalled. We should wake back up fully as we
+ // wil have some stuff to do.
+ if !latch.fall_asleep() {
+ self.logger.log(|| ThreadSleepInterruptedByLatch {
+ worker: worker_index,
+ latch_addr: latch.addr(),
+ });
+
+ idle_state.wake_fully();
+ return;
+ }
+
+ loop {
+ let counters = self.counters.load(Ordering::SeqCst);
+
+ // Check if the JEC has changed since we got sleepy.
+ debug_assert!(idle_state.jobs_counter.is_sleepy());
+ if counters.jobs_counter() != idle_state.jobs_counter {
+ // JEC has changed, so a new job was posted, but for some reason
+ // we didn't see it. We should return to just before the SLEEPY
+ // state so we can do another search and (if we fail to find
+ // work) go back to sleep.
+ self.logger.log(|| ThreadSleepInterruptedByJob {
+ worker: worker_index,
+ });
+
+ idle_state.wake_partly();
+ latch.wake_up();
+ return;
+ }
+
+ // Otherwise, let's move from IDLE to SLEEPING.
+ if self.counters.try_add_sleeping_thread(counters) {
+ break;
+ }
+ }
+
+ // Successfully registered as asleep.
+
+ self.logger.log(|| ThreadSleeping {
+ worker: worker_index,
+ latch_addr: latch.addr(),
+ });
+
+ // We have one last check for injected jobs to do. This protects against
+ // deadlock in the very unlikely event that
+ //
+ // - an external job is being injected while we are sleepy
+ // - that job triggers the rollover over the JEC such that we don't see it
+ // - we are the last active worker thread
+ std::sync::atomic::fence(Ordering::SeqCst);
+ if has_injected_jobs() {
+ // If we see an externally injected job, then we have to 'wake
+ // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by
+ // the one that wakes us.)
+ self.counters.sub_sleeping_thread();
+ } else {
+ // If we don't see an injected job (the normal case), then flag
+ // ourselves as asleep and wait till we are notified.
+ //
+ // (Note that `is_blocked` is held under a mutex and the mutex was
+ // acquired *before* we incremented the "sleepy counter". This means
+ // that whomever is coming to wake us will have to wait until we
+ // release the mutex in the call to `wait`, so they will see this
+ // boolean as true.)
+ *is_blocked = true;
+ while *is_blocked {
+ is_blocked = sleep_state.condvar.wait(is_blocked).unwrap();
+ }
+ }
+
+ // Update other state:
+ idle_state.wake_fully();
+ latch.wake_up();
+
+ self.logger.log(|| ThreadAwoken {
+ worker: worker_index,
+ latch_addr: latch.addr(),
+ });
+ }
+
+ /// Notify the given thread that it should wake up (if it is
+ /// sleeping). When this method is invoked, we typically know the
+ /// thread is asleep, though in rare cases it could have been
+ /// awoken by (e.g.) new work having been posted.
+ pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
+ self.wake_specific_thread(target_worker_index);
+ }
+
+ /// Signals that `num_jobs` new jobs were injected into the thread
+ /// pool from outside. This function will ensure that there are
+ /// threads available to process them, waking threads from sleep
+ /// if necessary.
+ ///
+ /// # Parameters
+ ///
+ /// - `source_worker_index` -- index of the thread that did the
+ /// push, or `usize::MAX` if this came from outside the thread
+ /// pool -- it is used only for logging.
+ /// - `num_jobs` -- lower bound on number of jobs available for stealing.
+ /// We'll try to get at least one thread per job.
+ #[inline]
+ pub(super) fn new_injected_jobs(
+ &self,
+ source_worker_index: usize,
+ num_jobs: u32,
+ queue_was_empty: bool,
+ ) {
+ // This fence is needed to guarantee that threads
+ // as they are about to fall asleep, observe any
+ // new jobs that may have been injected.
+ std::sync::atomic::fence(Ordering::SeqCst);
+
+ self.new_jobs(source_worker_index, num_jobs, queue_was_empty)
+ }
+
+ /// Signals that `num_jobs` new jobs were pushed onto a thread's
+ /// local deque. This function will try to ensure that there are
+ /// threads available to process them, waking threads from sleep
+ /// if necessary. However, this is not guaranteed: under certain
+ /// race conditions, the function may fail to wake any new
+ /// threads; in that case the existing thread should eventually
+ /// pop the job.
+ ///
+ /// # Parameters
+ ///
+ /// - `source_worker_index` -- index of the thread that did the
+ /// push, or `usize::MAX` if this came from outside the thread
+ /// pool -- it is used only for logging.
+ /// - `num_jobs` -- lower bound on number of jobs available for stealing.
+ /// We'll try to get at least one thread per job.
+ #[inline]
+ pub(super) fn new_internal_jobs(
+ &self,
+ source_worker_index: usize,
+ num_jobs: u32,
+ queue_was_empty: bool,
+ ) {
+ self.new_jobs(source_worker_index, num_jobs, queue_was_empty)
+ }
+
+ /// Common helper for `new_injected_jobs` and `new_internal_jobs`.
+ #[inline]
+ fn new_jobs(&self, source_worker_index: usize, num_jobs: u32, queue_was_empty: bool) {
+ // Read the counters and -- if sleepy workers have announced themselves
+ // -- announce that there is now work available. The final value of `counters`
+ // with which we exit the loop thus corresponds to a state when
+ let counters = self
+ .counters
+ .increment_jobs_event_counter_if(JobsEventCounter::is_sleepy);
+ let num_awake_but_idle = counters.awake_but_idle_threads();
+ let num_sleepers = counters.sleeping_threads();
+
+ self.logger.log(|| JobThreadCounts {
+ worker: source_worker_index,
+ num_idle: num_awake_but_idle as u16,
+ num_sleepers: num_sleepers as u16,
+ });
+
+ if num_sleepers == 0 {
+ // nobody to wake
+ return;
+ }
+
+ // Promote from u16 to u32 so we can interoperate with
+ // num_jobs more easily.
+ let num_awake_but_idle = num_awake_but_idle as u32;
+ let num_sleepers = num_sleepers as u32;
+
+ // If the queue is non-empty, then we always wake up a worker
+ // -- clearly the existing idle jobs aren't enough. Otherwise,
+ // check to see if we have enough idle workers.
+ if !queue_was_empty {
+ let num_to_wake = std::cmp::min(num_jobs, num_sleepers);
+ self.wake_any_threads(num_to_wake);
+ } else if num_awake_but_idle < num_jobs {
+ let num_to_wake = std::cmp::min(num_jobs - num_awake_but_idle, num_sleepers);
+ self.wake_any_threads(num_to_wake);
+ }
+ }
+
+ #[cold]
+ fn wake_any_threads(&self, mut num_to_wake: u32) {
+ if num_to_wake > 0 {
+ for i in 0..self.worker_sleep_states.len() {
+ if self.wake_specific_thread(i) {
+ num_to_wake -= 1;
+ if num_to_wake == 0 {
+ return;
+ }
+ }
+ }
+ }
+ }
+
+ fn wake_specific_thread(&self, index: usize) -> bool {
+ let sleep_state = &self.worker_sleep_states[index];
+
+ let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
+ if *is_blocked {
+ *is_blocked = false;
+ sleep_state.condvar.notify_one();
+
+ // When the thread went to sleep, it will have incremented
+ // this value. When we wake it, its our job to decrement
+ // it. We could have the thread do it, but that would
+ // introduce a delay between when the thread was
+ // *notified* and when this counter was decremented. That
+ // might mislead people with new work into thinking that
+ // there are sleeping threads that they should try to
+ // wake, when in fact there is nothing left for them to
+ // do.
+ self.counters.sub_sleeping_thread();
+
+ self.logger.log(|| ThreadNotify { worker: index });
+
+ true
+ } else {
+ false
+ }
+ }
+}
+
+impl IdleState {
+ fn wake_fully(&mut self) {
+ self.rounds = 0;
+ self.jobs_counter = JobsEventCounter::DUMMY;
+ }
+
+ fn wake_partly(&mut self) {
+ self.rounds = ROUNDS_UNTIL_SLEEPY;
+ self.jobs_counter = JobsEventCounter::DUMMY;
+ }
+}