summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-threadpool/src/worker/stack.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-threadpool/src/worker/stack.rs')
-rw-r--r--third_party/rust/tokio-threadpool/src/worker/stack.rs260
1 files changed, 260 insertions, 0 deletions
diff --git a/third_party/rust/tokio-threadpool/src/worker/stack.rs b/third_party/rust/tokio-threadpool/src/worker/stack.rs
new file mode 100644
index 0000000000..d02c277fed
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/worker/stack.rs
@@ -0,0 +1,260 @@
+use config::MAX_WORKERS;
+use worker;
+
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed};
+use std::{fmt, usize};
+
+/// Lock-free stack of sleeping workers.
+///
+/// This is implemented as a Treiber stack and references to nodes are
+/// `usize` values, indexing the entry in the `[worker::Entry]` array stored by
+/// `Pool`. Each `Entry` instance maintains a `pushed` bit in its state. This
+/// bit tracks if the entry is already pushed onto the stack or not. A single
+/// entry can only be stored on the stack a single time.
+///
+/// By using indexes instead of pointers, that allows a much greater amount of
+/// data to be used for the ABA guard (see correctness section of wikipedia
+/// page).
+///
+/// Treiber stack: https://en.wikipedia.org/wiki/Treiber_Stack
+#[derive(Debug)]
+pub(crate) struct Stack {
+ state: AtomicUsize,
+}
+
+/// State related to the stack of sleeping workers.
+///
+/// - Parked head 16 bits
+/// - Sequence remaining
+///
+/// The parked head value has a couple of special values:
+///
+/// - EMPTY: No sleepers
+/// - TERMINATED: Don't spawn more threads
+#[derive(Eq, PartialEq, Clone, Copy)]
+pub struct State(usize);
+
+/// Extracts the head of the worker stack from the scheduler state
+///
+/// The 16 relates to the value of MAX_WORKERS
+const STACK_MASK: usize = ((1 << 16) - 1);
+
+/// Used to mark the stack as empty
+pub(crate) const EMPTY: usize = MAX_WORKERS;
+
+/// Used to mark the stack as terminated
+pub(crate) const TERMINATED: usize = EMPTY + 1;
+
+/// How many bits the Treiber ABA guard is offset by
+const ABA_GUARD_SHIFT: usize = 16;
+
+#[cfg(target_pointer_width = "64")]
+const ABA_GUARD_MASK: usize = (1 << (64 - ABA_GUARD_SHIFT)) - 1;
+
+#[cfg(target_pointer_width = "32")]
+const ABA_GUARD_MASK: usize = (1 << (32 - ABA_GUARD_SHIFT)) - 1;
+
+// ===== impl Stack =====
+
+impl Stack {
+ /// Create a new `Stack` representing the empty state.
+ pub fn new() -> Stack {
+ let state = AtomicUsize::new(State::new().into());
+ Stack { state }
+ }
+
+ /// Push a worker onto the stack
+ ///
+ /// # Return
+ ///
+ /// Returns `Ok` on success.
+ ///
+ /// Returns `Err` if the pool has transitioned to the `TERMINATED` state.
+ /// When terminated, pushing new entries is no longer permitted.
+ pub fn push(&self, entries: &[worker::Entry], idx: usize) -> Result<(), ()> {
+ let mut state: State = self.state.load(Acquire).into();
+
+ debug_assert!(worker::State::from(entries[idx].state.load(Relaxed)).is_pushed());
+
+ loop {
+ let mut next = state;
+
+ let head = state.head();
+
+ if head == TERMINATED {
+ // The pool is terminated, cannot push the sleeper.
+ return Err(());
+ }
+
+ entries[idx].set_next_sleeper(head);
+ next.set_head(idx);
+
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if state == actual {
+ return Ok(());
+ }
+
+ state = actual;
+ }
+ }
+
+ /// Pop a worker off the stack.
+ ///
+ /// If `terminate` is set and the stack is empty when this function is
+ /// called, the state of the stack is transitioned to "terminated". At this
+ /// point, no further workers can be pushed onto the stack.
+ ///
+ /// # Return
+ ///
+ /// Returns the index of the popped worker and the worker's observed state.
+ ///
+ /// `None` if the stack is empty.
+ pub fn pop(
+ &self,
+ entries: &[worker::Entry],
+ max_lifecycle: worker::Lifecycle,
+ terminate: bool,
+ ) -> Option<(usize, worker::State)> {
+ // Figure out the empty value
+ let terminal = match terminate {
+ true => TERMINATED,
+ false => EMPTY,
+ };
+
+ // If terminating, the max lifecycle *must* be `Signaled`, which is the
+ // highest lifecycle. By passing the greatest possible lifecycle value,
+ // no entries are skipped by this function.
+ //
+ // TODO: It would be better to terminate in a separate function that
+ // atomically takes all values and transitions to a terminated state.
+ debug_assert!(!terminate || max_lifecycle == worker::Lifecycle::Signaled);
+
+ let mut state: State = self.state.load(Acquire).into();
+
+ loop {
+ let head = state.head();
+
+ if head == EMPTY {
+ let mut next = state;
+ next.set_head(terminal);
+
+ if next == state {
+ debug_assert!(terminal == EMPTY);
+ return None;
+ }
+
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if actual != state {
+ state = actual;
+ continue;
+ }
+
+ return None;
+ } else if head == TERMINATED {
+ return None;
+ }
+
+ debug_assert!(head < MAX_WORKERS);
+
+ let mut next = state;
+
+ let next_head = entries[head].next_sleeper();
+
+ // TERMINATED can never be set as the "next pointer" on a worker.
+ debug_assert!(next_head != TERMINATED);
+
+ if next_head == EMPTY {
+ next.set_head(terminal);
+ } else {
+ next.set_head(next_head);
+ }
+
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if actual == state {
+ // Release ordering is needed to ensure that unsetting the
+ // `pushed` flag happens after popping the sleeper from the
+ // stack.
+ //
+ // Acquire ordering is required to acquire any memory associated
+ // with transitioning the worker's lifecycle.
+ let state = entries[head].fetch_unset_pushed(AcqRel);
+
+ if state.lifecycle() >= max_lifecycle {
+ // If the worker has already been notified, then it is
+ // warming up to do more work. In this case, try to pop
+ // another thread that might be in a relaxed state.
+ continue;
+ }
+
+ return Some((head, state));
+ }
+
+ state = actual;
+ }
+ }
+}
+
+// ===== impl State =====
+
+impl State {
+ #[inline]
+ fn new() -> State {
+ State(EMPTY)
+ }
+
+ #[inline]
+ fn head(&self) -> usize {
+ self.0 & STACK_MASK
+ }
+
+ #[inline]
+ fn set_head(&mut self, val: usize) {
+ // The ABA guard protects against the ABA problem w/ Treiber stacks
+ let aba_guard = ((self.0 >> ABA_GUARD_SHIFT) + 1) & ABA_GUARD_MASK;
+
+ self.0 = (aba_guard << ABA_GUARD_SHIFT) | val;
+ }
+}
+
+impl From<usize> for State {
+ fn from(src: usize) -> Self {
+ State(src)
+ }
+}
+
+impl From<State> for usize {
+ fn from(src: State) -> Self {
+ src.0
+ }
+}
+
+impl fmt::Debug for State {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ let head = self.head();
+
+ let mut fmt = fmt.debug_struct("stack::State");
+
+ if head < MAX_WORKERS {
+ fmt.field("head", &head);
+ } else if head == EMPTY {
+ fmt.field("head", &"EMPTY");
+ } else if head == TERMINATED {
+ fmt.field("head", &"TERMINATED");
+ }
+
+ fmt.finish()
+ }
+}