| field | value | date |
|---|---|---|
| author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
| committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 09:22:09 +0000 |
| commit | 43a97878ce14b72f0981164f87f2e35e14151312 (patch) | |
| tree | 620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/tokio-threadpool/src/worker/stack.rs | |
| parent | Initial commit. (diff) | |
Adding upstream version 110.0.1. (tag: upstream/110.0.1)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio-threadpool/src/worker/stack.rs')
-rw-r--r-- | third_party/rust/tokio-threadpool/src/worker/stack.rs | 260 |
1 file changed, 260 insertions(+), 0 deletions(-)
diff --git a/third_party/rust/tokio-threadpool/src/worker/stack.rs b/third_party/rust/tokio-threadpool/src/worker/stack.rs
new file mode 100644
index 0000000000..d02c277fed
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/worker/stack.rs
@@ -0,0 +1,260 @@
+use config::MAX_WORKERS;
+use worker;
+
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed};
+use std::{fmt, usize};
+
+/// Lock-free stack of sleeping workers.
+///
+/// This is implemented as a Treiber stack and references to nodes are
+/// `usize` values, indexing the entry in the `[worker::Entry]` array stored by
+/// `Pool`. Each `Entry` instance maintains a `pushed` bit in its state. This
+/// bit tracks if the entry is already pushed onto the stack or not. A single
+/// entry can only be stored on the stack a single time.
+///
+/// Using indexes instead of pointers allows a much greater amount of data
+/// to be used for the ABA guard (see the correctness section of the
+/// Wikipedia page).
+///
+/// Treiber stack: https://en.wikipedia.org/wiki/Treiber_Stack
+#[derive(Debug)]
+pub(crate) struct Stack {
+    state: AtomicUsize,
+}
+
+/// State related to the stack of sleeping workers.
+///
+/// - Parked head   16 bits
+/// - Sequence      remaining
+///
+/// The parked head value has a couple of special values:
+///
+/// - EMPTY: No sleepers
+/// - TERMINATED: Don't spawn more threads
+#[derive(Eq, PartialEq, Clone, Copy)]
+pub struct State(usize);
+
+/// Extracts the head of the worker stack from the scheduler state
+///
+/// The 16 relates to the value of MAX_WORKERS
+const STACK_MASK: usize = ((1 << 16) - 1);
+
+/// Used to mark the stack as empty
+pub(crate) const EMPTY: usize = MAX_WORKERS;
+
+/// Used to mark the stack as terminated
+pub(crate) const TERMINATED: usize = EMPTY + 1;
+
+/// How many bits the Treiber ABA guard is offset by
+const ABA_GUARD_SHIFT: usize = 16;
+
+#[cfg(target_pointer_width = "64")]
+const ABA_GUARD_MASK: usize = (1 << (64 - ABA_GUARD_SHIFT)) - 1;
+
+#[cfg(target_pointer_width = "32")]
+const ABA_GUARD_MASK: usize = (1 << (32 - ABA_GUARD_SHIFT)) - 1;
+
+// ===== impl Stack =====
+
+impl Stack {
+    /// Create a new `Stack` representing the empty state.
+    pub fn new() -> Stack {
+        let state = AtomicUsize::new(State::new().into());
+        Stack { state }
+    }
+
+    /// Push a worker onto the stack.
+    ///
+    /// # Return
+    ///
+    /// Returns `Ok` on success.
+    ///
+    /// Returns `Err` if the pool has transitioned to the `TERMINATED` state.
+    /// When terminated, pushing new entries is no longer permitted.
+    pub fn push(&self, entries: &[worker::Entry], idx: usize) -> Result<(), ()> {
+        let mut state: State = self.state.load(Acquire).into();
+
+        debug_assert!(worker::State::from(entries[idx].state.load(Relaxed)).is_pushed());
+
+        loop {
+            let mut next = state;
+
+            let head = state.head();
+
+            if head == TERMINATED {
+                // The pool is terminated, cannot push the sleeper.
+                return Err(());
+            }
+
+            entries[idx].set_next_sleeper(head);
+            next.set_head(idx);
+
+            let actual = self
+                .state
+                .compare_and_swap(state.into(), next.into(), AcqRel)
+                .into();
+
+            if state == actual {
+                return Ok(());
+            }
+
+            state = actual;
+        }
+    }
+
+    /// Pop a worker off the stack.
+    ///
+    /// If `terminate` is set and the stack is empty when this function is
+    /// called, the state of the stack is transitioned to "terminated". At this
+    /// point, no further workers can be pushed onto the stack.
+    ///
+    /// # Return
+    ///
+    /// Returns the index of the popped worker and the worker's observed state.
+    ///
+    /// Returns `None` if the stack is empty.
+    pub fn pop(
+        &self,
+        entries: &[worker::Entry],
+        max_lifecycle: worker::Lifecycle,
+        terminate: bool,
+    ) -> Option<(usize, worker::State)> {
+        // Figure out the empty value
+        let terminal = match terminate {
+            true => TERMINATED,
+            false => EMPTY,
+        };
+
+        // If terminating, the max lifecycle *must* be `Signaled`, which is the
+        // highest lifecycle. By passing the greatest possible lifecycle value,
+        // no entries are skipped by this function.
+        //
+        // TODO: It would be better to terminate in a separate function that
+        // atomically takes all values and transitions to a terminated state.
+        debug_assert!(!terminate || max_lifecycle == worker::Lifecycle::Signaled);
+
+        let mut state: State = self.state.load(Acquire).into();
+
+        loop {
+            let head = state.head();
+
+            if head == EMPTY {
+                let mut next = state;
+                next.set_head(terminal);
+
+                if next == state {
+                    debug_assert!(terminal == EMPTY);
+                    return None;
+                }
+
+                let actual = self
+                    .state
+                    .compare_and_swap(state.into(), next.into(), AcqRel)
+                    .into();
+
+                if actual != state {
+                    state = actual;
+                    continue;
+                }
+
+                return None;
+            } else if head == TERMINATED {
+                return None;
+            }
+
+            debug_assert!(head < MAX_WORKERS);
+
+            let mut next = state;
+
+            let next_head = entries[head].next_sleeper();
+
+            // TERMINATED can never be set as the "next pointer" on a worker.
+            debug_assert!(next_head != TERMINATED);
+
+            if next_head == EMPTY {
+                next.set_head(terminal);
+            } else {
+                next.set_head(next_head);
+            }
+
+            let actual = self
+                .state
+                .compare_and_swap(state.into(), next.into(), AcqRel)
+                .into();
+
+            if actual == state {
+                // Release ordering is needed to ensure that unsetting the
+                // `pushed` flag happens after popping the sleeper from the
+                // stack.
+                //
+                // Acquire ordering is required to acquire any memory associated
+                // with transitioning the worker's lifecycle.
+                let state = entries[head].fetch_unset_pushed(AcqRel);
+
+                if state.lifecycle() >= max_lifecycle {
+                    // If the worker has already been notified, then it is
+                    // warming up to do more work. In this case, try to pop
+                    // another thread that might be in a relaxed state.
+                    continue;
+                }
+
+                return Some((head, state));
+            }
+
+            state = actual;
+        }
+    }
+}
+
+// ===== impl State =====
+
+impl State {
+    #[inline]
+    fn new() -> State {
+        State(EMPTY)
+    }
+
+    #[inline]
+    fn head(&self) -> usize {
+        self.0 & STACK_MASK
+    }
+
+    #[inline]
+    fn set_head(&mut self, val: usize) {
+        // The ABA guard protects against the ABA problem w/ Treiber stacks
+        let aba_guard = ((self.0 >> ABA_GUARD_SHIFT) + 1) & ABA_GUARD_MASK;
+
+        self.0 = (aba_guard << ABA_GUARD_SHIFT) | val;
+    }
+}
+
+impl From<usize> for State {
+    fn from(src: usize) -> Self {
+        State(src)
+    }
+}
+
+impl From<State> for usize {
+    fn from(src: State) -> Self {
+        src.0
+    }
+}
+
+impl fmt::Debug for State {
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+        let head = self.head();
+
+        let mut fmt = fmt.debug_struct("stack::State");
+
+        if head < MAX_WORKERS {
+            fmt.field("head", &head);
+        } else if head == EMPTY {
+            fmt.field("head", &"EMPTY");
+        } else if head == TERMINATED {
+            fmt.field("head", &"TERMINATED");
+        }
+
+        fmt.finish()
+    }
+}
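For readers skimming the diff: the `State` word packs two fields, the parked head index in the low 16 bits (`STACK_MASK`) and an ABA guard in every remaining bit, which `set_head` bumps on each update. A minimal, self-contained sketch of that encoding, using `usize::BITS` in place of the `target_pointer_width` cfgs (the `main` driver is purely illustrative):

```rust
const STACK_MASK: usize = (1 << 16) - 1;
const ABA_GUARD_SHIFT: usize = 16;
const ABA_GUARD_MASK: usize = (1 << (usize::BITS as usize - ABA_GUARD_SHIFT)) - 1;

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct State(usize);

impl State {
    fn head(self) -> usize {
        self.0 & STACK_MASK
    }

    fn set_head(&mut self, val: usize) {
        // Bump the guard on every head update so a CAS can distinguish
        // "same head index" from "same state word".
        let aba_guard = ((self.0 >> ABA_GUARD_SHIFT) + 1) & ABA_GUARD_MASK;
        self.0 = (aba_guard << ABA_GUARD_SHIFT) | val;
    }
}

fn main() {
    let mut s = State(0);
    s.set_head(7);
    let first = s.0;
    s.set_head(7); // pop + re-push of the same index...
    assert_eq!(s.head(), 7); // ...leaves the head unchanged,
    assert_ne!(s.0, first); // but the packed word still differs.
    println!("first = {:#x}, second = {:#x}", first, s.0);
}
```

This is exactly the property that defeats the ABA problem: a competing `compare_and_swap` that saw the old word fails even when the head index has cycled back to the same value.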
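`AtomicUsize::compare_and_swap`, used throughout the diff, has been deprecated since Rust 1.50 in favor of `compare_exchange`, which returns a `Result` carrying the observed value. Below is a hedged sketch of the same push/pop CAS loops against that API, over a deliberately simplified model: `Entry` here is a hypothetical stand-in for `worker::Entry`, the sentinel values are invented for the sketch, and the ABA guard, `pushed` bit, and lifecycle checks are omitted for brevity.

```rust
use std::sync::atomic::{
    AtomicUsize,
    Ordering::{AcqRel, Acquire, Relaxed},
};

const EMPTY: usize = usize::MAX - 1; // sketch-only sentinels
const TERMINATED: usize = usize::MAX;

struct Entry {
    next_sleeper: AtomicUsize,
}

struct Stack {
    state: AtomicUsize, // head index only; no ABA guard in this sketch
}

impl Stack {
    fn new() -> Stack {
        Stack { state: AtomicUsize::new(EMPTY) }
    }

    fn push(&self, entries: &[Entry], idx: usize) -> Result<(), ()> {
        let mut head = self.state.load(Acquire);
        loop {
            if head == TERMINATED {
                return Err(()); // terminated pools refuse new sleepers
            }
            // Link the new node to the current head before publishing it.
            entries[idx].next_sleeper.store(head, Relaxed);
            match self.state.compare_exchange(head, idx, AcqRel, Acquire) {
                Ok(_) => return Ok(()),
                // On failure we get the observed value back; retry with it.
                Err(actual) => head = actual,
            }
        }
    }

    fn pop(&self, entries: &[Entry], terminate: bool) -> Option<usize> {
        let terminal = if terminate { TERMINATED } else { EMPTY };
        let mut head = self.state.load(Acquire);
        loop {
            if head == EMPTY {
                if terminal == EMPTY {
                    return None;
                }
                // Empty + terminate: park the TERMINATED sentinel in the
                // head slot so all future pushes fail.
                match self.state.compare_exchange(head, terminal, AcqRel, Acquire) {
                    Ok(_) => return None,
                    Err(actual) => head = actual,
                }
                continue;
            } else if head == TERMINATED {
                return None;
            }
            let next = entries[head].next_sleeper.load(Relaxed);
            // Mirror the original: a popped last element installs the
            // terminal value rather than plain EMPTY.
            let new_head = if next == EMPTY { terminal } else { next };
            match self.state.compare_exchange(head, new_head, AcqRel, Acquire) {
                Ok(_) => return Some(head),
                Err(actual) => head = actual,
            }
        }
    }
}

fn main() {
    let entries: Vec<Entry> = (0..2)
        .map(|_| Entry { next_sleeper: AtomicUsize::new(EMPTY) })
        .collect();
    let stack = Stack::new();
    stack.push(&entries, 0).unwrap();
    stack.push(&entries, 1).unwrap();
    assert_eq!(stack.pop(&entries, false), Some(1)); // LIFO order
    assert_eq!(stack.pop(&entries, true), Some(0));
    assert_eq!(stack.pop(&entries, true), None); // empty -> TERMINATED
    assert!(stack.push(&entries, 0).is_err());
}
```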
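The pop path's comment on `fetch_unset_pushed(AcqRel)` is easy to skim past. Here is a tiny, hypothetical stand-in for that helper (the real `worker::Entry` packs its state differently; the bit position is invented for illustration) showing the read-modify-write shape and why it wants both halves of `AcqRel`:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical `pushed` bit; NOT the real `worker::State` layout.
const PUSHED: usize = 1 << 0;

// Clears the pushed bit and returns the previously observed state word.
// Release half: per the diff's comment, the bit-clear must be ordered
// after the pop that unlinked the entry from the stack. Acquire half:
// writes made by the thread that last transitioned the worker's
// lifecycle become visible to the popping thread.
fn fetch_unset_pushed(state: &AtomicUsize) -> usize {
    state.fetch_and(!PUSHED, Ordering::AcqRel)
}

fn main() {
    let state = AtomicUsize::new(PUSHED);
    let prev = fetch_unset_pushed(&state);
    assert_ne!(prev & PUSHED, 0); // bit was set before the RMW
    assert_eq!(state.load(Ordering::Relaxed) & PUSHED, 0); // now clear
}
```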