summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio-threadpool/src/pool/backup_stack.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio-threadpool/src/pool/backup_stack.rs')
-rw-r--r--third_party/rust/tokio-threadpool/src/pool/backup_stack.rs191
1 files changed, 191 insertions, 0 deletions
diff --git a/third_party/rust/tokio-threadpool/src/pool/backup_stack.rs b/third_party/rust/tokio-threadpool/src/pool/backup_stack.rs
new file mode 100644
index 0000000000..b9a46d08ef
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/pool/backup_stack.rs
@@ -0,0 +1,191 @@
+use pool::{Backup, BackupId};
+
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::{AcqRel, Acquire};
+
+#[derive(Debug)]
+pub(crate) struct BackupStack {
+ state: AtomicUsize,
+}
+
+#[derive(Debug, Eq, PartialEq, Clone, Copy)]
+struct State(usize);
+
+pub(crate) const MAX_BACKUP: usize = 1 << 15;
+
+/// Extracts the head of the backup stack from the state
+const STACK_MASK: usize = ((1 << 16) - 1);
+
+/// Used to mark the stack as empty
+pub(crate) const EMPTY: BackupId = BackupId(MAX_BACKUP);
+
+/// Used to mark the stack as terminated
+pub(crate) const TERMINATED: BackupId = BackupId(EMPTY.0 + 1);
+
+/// How many bits the Treiber ABA guard is offset by
+const ABA_GUARD_SHIFT: usize = 16;
+
+#[cfg(target_pointer_width = "64")]
+const ABA_GUARD_MASK: usize = (1 << (64 - ABA_GUARD_SHIFT)) - 1;
+
+#[cfg(target_pointer_width = "32")]
+const ABA_GUARD_MASK: usize = (1 << (32 - ABA_GUARD_SHIFT)) - 1;
+
+// ===== impl BackupStack =====
+
+impl BackupStack {
+ pub fn new() -> BackupStack {
+ let state = AtomicUsize::new(State::new().into());
+ BackupStack { state }
+ }
+
+ /// Push a backup thread onto the stack
+ ///
+ /// # Return
+ ///
+ /// Returns `Ok` on success.
+ ///
+ /// Returns `Err` if the pool has transitioned to the `TERMINATED` state.
+ /// When terminated, pushing new entries is no longer permitted.
+ pub fn push(&self, entries: &[Backup], id: BackupId) -> Result<(), ()> {
+ let mut state: State = self.state.load(Acquire).into();
+
+ entries[id.0].set_pushed(AcqRel);
+
+ loop {
+ let mut next = state;
+
+ let head = state.head();
+
+ if head == TERMINATED {
+ // The pool is terminated, cannot push the sleeper.
+ return Err(());
+ }
+
+ entries[id.0].set_next_sleeper(head);
+ next.set_head(id);
+
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if state == actual {
+ return Ok(());
+ }
+
+ state = actual;
+ }
+ }
+
+ /// Pop a backup thread off the stack.
+ ///
+ /// If `terminate` is set and the stack is empty when this function is
+ /// called, the state of the stack is transitioned to "terminated". At this
+ /// point, no further entries can be pushed onto the stack.
+ ///
+ /// # Return
+ ///
+ /// * Returns the index of the popped worker and the worker's observed
+ /// state.
+ ///
+ /// * `Ok(None)` if the stack is empty.
+ /// * `Err(_)` is returned if the pool has been shutdown.
+ pub fn pop(&self, entries: &[Backup], terminate: bool) -> Result<Option<BackupId>, ()> {
+ // Figure out the empty value
+ let terminal = match terminate {
+ true => TERMINATED,
+ false => EMPTY,
+ };
+
+ let mut state: State = self.state.load(Acquire).into();
+
+ loop {
+ let head = state.head();
+
+ if head == EMPTY {
+ let mut next = state;
+ next.set_head(terminal);
+
+ if next == state {
+ debug_assert!(terminal == EMPTY);
+ return Ok(None);
+ }
+
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if actual != state {
+ state = actual;
+ continue;
+ }
+
+ return Ok(None);
+ } else if head == TERMINATED {
+ return Err(());
+ }
+
+ debug_assert!(head.0 < MAX_BACKUP);
+
+ let mut next = state;
+
+ let next_head = entries[head.0].next_sleeper();
+
+ // TERMINATED can never be set as the "next pointer" on a worker.
+ debug_assert!(next_head != TERMINATED);
+
+ if next_head == EMPTY {
+ next.set_head(terminal);
+ } else {
+ next.set_head(next_head);
+ }
+
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if actual == state {
+ debug_assert!(entries[head.0].is_pushed());
+ return Ok(Some(head));
+ }
+
+ state = actual;
+ }
+ }
+}
+
+// ===== impl State =====
+
+impl State {
+ fn new() -> State {
+ State(EMPTY.0)
+ }
+
+ fn head(&self) -> BackupId {
+ BackupId(self.0 & STACK_MASK)
+ }
+
+ fn set_head(&mut self, val: BackupId) {
+ let val = val.0;
+
+ // The ABA guard protects against the ABA problem w/ Treiber stacks
+ let aba_guard = ((self.0 >> ABA_GUARD_SHIFT) + 1) & ABA_GUARD_MASK;
+
+ self.0 = (aba_guard << ABA_GUARD_SHIFT) | val;
+ }
+}
+
+impl From<usize> for State {
+ fn from(src: usize) -> Self {
+ State(src)
+ }
+}
+
+impl From<State> for usize {
+ fn from(src: State) -> Self {
+ src.0
+ }
+}