author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-07 09:22:09 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-07 09:22:09 +0000
commit     43a97878ce14b72f0981164f87f2e35e14151312 (patch)
tree       620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/tokio-threadpool/src/pool
parent     Initial commit. (diff)
Adding upstream version 110.0.1. (upstream/110.0.1, upstream)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio-threadpool/src/pool')
-rw-r--r--  third_party/rust/tokio-threadpool/src/pool/backup.rs        308
-rw-r--r--  third_party/rust/tokio-threadpool/src/pool/backup_stack.rs  191
-rw-r--r--  third_party/rust/tokio-threadpool/src/pool/mod.rs           475
-rw-r--r--  third_party/rust/tokio-threadpool/src/pool/state.rs         132
4 files changed, 1106 insertions(+), 0 deletions(-)
diff --git a/third_party/rust/tokio-threadpool/src/pool/backup.rs b/third_party/rust/tokio-threadpool/src/pool/backup.rs
new file mode 100644
index 0000000000..e94e95d6f2
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/pool/backup.rs
@@ -0,0 +1,308 @@
+use park::DefaultPark;
+use worker::WorkerId;
+
+use std::cell::UnsafeCell;
+use std::fmt;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed};
+use std::time::{Duration, Instant};
+
+/// State associated with a thread in the thread pool.
+///
+/// The pool manages a number of threads. Some of those threads are considered
+/// "primary" threads and process the work queue. When a task being run on a
+/// primary thread enters a blocking context, the responsibility of processing
+/// the work queue must be handed off to another thread. This is done by first
+/// checking for idle threads on the backup stack. If one is found, the worker
+/// token (`WorkerId`) is handed off to that running thread. If none are found,
+/// a new thread is spawned.
+///
+/// This state manages the exchange. A thread that is idle, not assigned to a
+/// work queue, sits around for a specified amount of time. When the worker
+/// token is handed off, it is first stored in `handoff`. The backup thread is
+/// then signaled. At this point, the backup thread wakes up from sleep and
+/// reads `handoff`. At that point, it has been promoted to a primary thread and
+/// will begin processing inbound work on the work queue.
+///
+/// The name `Backup` isn't really great for what the type does, but I have not
+/// come up with a better name... Maybe it should just be named `Thread`.
+#[derive(Debug)]
+pub(crate) struct Backup {
+ /// Worker ID that is being handed to this thread.
+ handoff: UnsafeCell<Option<WorkerId>>,
+
+ /// Thread state.
+ ///
+ /// This tracks:
+ ///
+ /// * Is queued flag
+ /// * If the pool is shutting down.
+ /// * If the thread is running
+ state: AtomicUsize,
+
+ /// Next entry in the Treiber stack.
+ next_sleeper: UnsafeCell<BackupId>,
+
+ /// Used to put the thread to sleep
+ park: DefaultPark,
+}
+
+#[derive(Debug, Eq, PartialEq, Copy, Clone)]
+pub(crate) struct BackupId(pub(crate) usize);
+
+#[derive(Debug)]
+pub(crate) enum Handoff {
+ Worker(WorkerId),
+ Idle,
+ Terminated,
+}
+
+/// Tracks thread state.
+#[derive(Clone, Copy, Eq, PartialEq)]
+struct State(usize);
+
+/// Set when the worker is pushed onto the scheduler's stack of sleeping
+/// threads.
+///
+/// This flag also serves as a "notification" bit. If another thread is
+/// attempting to hand off a worker to the backup thread, then the pushed bit
+/// will not be set when the thread tries to shut down.
+pub const PUSHED: usize = 0b001;
+
+/// Set when the thread is running
+pub const RUNNING: usize = 0b010;
+
+/// Set when the thread pool has terminated
+pub const TERMINATED: usize = 0b100;
+
+// ===== impl Backup =====
+
+impl Backup {
+ pub fn new() -> Backup {
+ Backup {
+ handoff: UnsafeCell::new(None),
+ state: AtomicUsize::new(State::new().into()),
+ next_sleeper: UnsafeCell::new(BackupId(0)),
+ park: DefaultPark::new(),
+ }
+ }
+
+ /// Called when the thread is starting
+ pub fn start(&self, worker_id: &WorkerId) {
+ debug_assert!({
+ let state: State = self.state.load(Relaxed).into();
+
+ debug_assert!(!state.is_pushed());
+ debug_assert!(state.is_running());
+ debug_assert!(!state.is_terminated());
+
+ true
+ });
+
+ // The handoff value is equal to `worker_id`
+ debug_assert_eq!(unsafe { (*self.handoff.get()).as_ref() }, Some(worker_id));
+
+ unsafe {
+ *self.handoff.get() = None;
+ }
+ }
+
+ pub fn is_running(&self) -> bool {
+ let state: State = self.state.load(Relaxed).into();
+ state.is_running()
+ }
+
+ /// Hands off the worker to a thread.
+ ///
+ /// Returns `true` if the thread needs to be spawned.
+ pub fn worker_handoff(&self, worker_id: WorkerId) -> bool {
+ unsafe {
+ // The backup worker should not already have been handed a worker.
+ debug_assert!((*self.handoff.get()).is_none());
+
+ // Set the handoff
+ *self.handoff.get() = Some(worker_id);
+ }
+
+ // This *probably* could just be `Release`, but `AcqRel` is used to stay
+ // conservative about the required memory ordering.
+ let prev = State::worker_handoff(&self.state);
+ debug_assert!(prev.is_pushed());
+
+ if prev.is_running() {
+ // Wakeup the backup thread
+ self.park.notify();
+ false
+ } else {
+ true
+ }
+ }
+
+ /// Terminate the worker
+ pub fn signal_stop(&self) {
+ let prev: State = self.state.fetch_xor(TERMINATED | PUSHED, AcqRel).into();
+
+ debug_assert!(!prev.is_terminated());
+ debug_assert!(prev.is_pushed());
+
+ if prev.is_running() {
+ self.park.notify();
+ }
+ }
+
+ /// Release the worker
+ pub fn release(&self) {
+ let prev: State = self.state.fetch_xor(RUNNING, AcqRel).into();
+
+ debug_assert!(prev.is_running());
+ }
+
+ /// Wait for a worker handoff
+ pub fn wait_for_handoff(&self, timeout: Option<Duration>) -> Handoff {
+ let sleep_until = timeout.map(|dur| Instant::now() + dur);
+ let mut state: State = self.state.load(Acquire).into();
+
+ // Run in a loop since there can be spurious wakeups
+ loop {
+ if !state.is_pushed() {
+ if state.is_terminated() {
+ return Handoff::Terminated;
+ }
+
+ let worker_id = unsafe { (*self.handoff.get()).take().expect("no worker handoff") };
+ return Handoff::Worker(worker_id);
+ }
+
+ match sleep_until {
+ None => {
+ self.park.park_sync(None);
+ state = self.state.load(Acquire).into();
+ }
+ Some(when) => {
+ let now = Instant::now();
+
+ if now < when {
+ self.park.park_sync(Some(when - now));
+ state = self.state.load(Acquire).into();
+ } else {
+ debug_assert!(state.is_running());
+
+ // Transition out of running
+ let mut next = state;
+ next.unset_running();
+
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if actual == state {
+ debug_assert!(!next.is_running());
+ return Handoff::Idle;
+ }
+
+ state = actual;
+ }
+ }
+ }
+ }
+ }
+
+ pub fn is_pushed(&self) -> bool {
+ let state: State = self.state.load(Relaxed).into();
+ state.is_pushed()
+ }
+
+ pub fn set_pushed(&self, ordering: Ordering) {
+ let prev: State = self.state.fetch_or(PUSHED, ordering).into();
+ debug_assert!(!prev.is_pushed());
+ }
+
+ #[inline]
+ pub fn next_sleeper(&self) -> BackupId {
+ unsafe { *self.next_sleeper.get() }
+ }
+
+ #[inline]
+ pub fn set_next_sleeper(&self, val: BackupId) {
+ unsafe {
+ *self.next_sleeper.get() = val;
+ }
+ }
+}
+
+// ===== impl State =====
+
+impl State {
+ /// Returns a new, default, thread `State`
+ pub fn new() -> State {
+ State(0)
+ }
+
+ /// Returns true if the thread entry is pushed in the sleeper stack
+ pub fn is_pushed(&self) -> bool {
+ self.0 & PUSHED == PUSHED
+ }
+
+ fn unset_pushed(&mut self) {
+ self.0 &= !PUSHED;
+ }
+
+ pub fn is_running(&self) -> bool {
+ self.0 & RUNNING == RUNNING
+ }
+
+ pub fn set_running(&mut self) {
+ self.0 |= RUNNING;
+ }
+
+ pub fn unset_running(&mut self) {
+ self.0 &= !RUNNING;
+ }
+
+ pub fn is_terminated(&self) -> bool {
+ self.0 & TERMINATED == TERMINATED
+ }
+
+ fn worker_handoff(state: &AtomicUsize) -> State {
+ let mut curr: State = state.load(Acquire).into();
+
+ loop {
+ let mut next = curr;
+ next.set_running();
+ next.unset_pushed();
+
+ let actual = state
+ .compare_and_swap(curr.into(), next.into(), AcqRel)
+ .into();
+
+ if actual == curr {
+ return curr;
+ }
+
+ curr = actual;
+ }
+ }
+}
+
+impl From<usize> for State {
+ fn from(src: usize) -> State {
+ State(src)
+ }
+}
+
+impl From<State> for usize {
+ fn from(src: State) -> usize {
+ src.0
+ }
+}
+
+impl fmt::Debug for State {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("backup::State")
+ .field("is_pushed", &self.is_pushed())
+ .field("is_running", &self.is_running())
+ .field("is_terminated", &self.is_terminated())
+ .finish()
+ }
+}
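A standalone sketch of the flag-word pattern backup.rs relies on: three bit flags packed into a single AtomicUsize and updated through a compare-and-swap retry loop. This is illustrative only (it is not part of the patch) and uses the current compare_exchange API rather than the older compare_and_swap seen above.

use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{AcqRel, Acquire};

const PUSHED: usize = 0b001;     // entry sits on the sleeper stack
const RUNNING: usize = 0b010;    // a thread currently owns the entry
const TERMINATED: usize = 0b100; // the pool is shutting down

// Atomically move an entry from "pushed" to "running" and return the
// previously observed word (the same shape as `State::worker_handoff`).
fn worker_handoff(state: &AtomicUsize) -> usize {
    let mut curr = state.load(Acquire);
    loop {
        let next = (curr | RUNNING) & !PUSHED;
        match state.compare_exchange(curr, next, AcqRel, Acquire) {
            Ok(_) => return curr,
            Err(actual) => curr = actual,
        }
    }
}

fn main() {
    let state = AtomicUsize::new(PUSHED);
    let prev = worker_handoff(&state);
    assert_eq!(prev & PUSHED, PUSHED);
    assert_eq!(state.load(Acquire), RUNNING);
    assert_eq!(state.load(Acquire) & TERMINATED, 0);
}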
diff --git a/third_party/rust/tokio-threadpool/src/pool/backup_stack.rs b/third_party/rust/tokio-threadpool/src/pool/backup_stack.rs
new file mode 100644
index 0000000000..b9a46d08ef
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/pool/backup_stack.rs
@@ -0,0 +1,191 @@
+use pool::{Backup, BackupId};
+
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::{AcqRel, Acquire};
+
+#[derive(Debug)]
+pub(crate) struct BackupStack {
+ state: AtomicUsize,
+}
+
+#[derive(Debug, Eq, PartialEq, Clone, Copy)]
+struct State(usize);
+
+pub(crate) const MAX_BACKUP: usize = 1 << 15;
+
+/// Extracts the head of the backup stack from the state
+const STACK_MASK: usize = ((1 << 16) - 1);
+
+/// Used to mark the stack as empty
+pub(crate) const EMPTY: BackupId = BackupId(MAX_BACKUP);
+
+/// Used to mark the stack as terminated
+pub(crate) const TERMINATED: BackupId = BackupId(EMPTY.0 + 1);
+
+/// How many bits the Treiber ABA guard is offset by
+const ABA_GUARD_SHIFT: usize = 16;
+
+#[cfg(target_pointer_width = "64")]
+const ABA_GUARD_MASK: usize = (1 << (64 - ABA_GUARD_SHIFT)) - 1;
+
+#[cfg(target_pointer_width = "32")]
+const ABA_GUARD_MASK: usize = (1 << (32 - ABA_GUARD_SHIFT)) - 1;
+
+// ===== impl BackupStack =====
+
+impl BackupStack {
+ pub fn new() -> BackupStack {
+ let state = AtomicUsize::new(State::new().into());
+ BackupStack { state }
+ }
+
+ /// Push a backup thread onto the stack
+ ///
+ /// # Return
+ ///
+ /// Returns `Ok` on success.
+ ///
+ /// Returns `Err` if the pool has transitioned to the `TERMINATED` state.
+ /// When terminated, pushing new entries is no longer permitted.
+ pub fn push(&self, entries: &[Backup], id: BackupId) -> Result<(), ()> {
+ let mut state: State = self.state.load(Acquire).into();
+
+ entries[id.0].set_pushed(AcqRel);
+
+ loop {
+ let mut next = state;
+
+ let head = state.head();
+
+ if head == TERMINATED {
+ // The pool is terminated, cannot push the sleeper.
+ return Err(());
+ }
+
+ entries[id.0].set_next_sleeper(head);
+ next.set_head(id);
+
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if state == actual {
+ return Ok(());
+ }
+
+ state = actual;
+ }
+ }
+
+ /// Pop a backup thread off the stack.
+ ///
+ /// If `terminate` is set and the stack is empty when this function is
+ /// called, the state of the stack is transitioned to "terminated". At this
+ /// point, no further entries can be pushed onto the stack.
+ ///
+ /// # Return
+ ///
+ /// * `Ok(Some(backup_id))` with the ID of the popped backup entry.
+ ///
+ /// * `Ok(None)` if the stack is empty.
+ /// * `Err(_)` if the pool has been shut down.
+ pub fn pop(&self, entries: &[Backup], terminate: bool) -> Result<Option<BackupId>, ()> {
+ // Figure out the empty value
+ let terminal = match terminate {
+ true => TERMINATED,
+ false => EMPTY,
+ };
+
+ let mut state: State = self.state.load(Acquire).into();
+
+ loop {
+ let head = state.head();
+
+ if head == EMPTY {
+ let mut next = state;
+ next.set_head(terminal);
+
+ if next == state {
+ debug_assert!(terminal == EMPTY);
+ return Ok(None);
+ }
+
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if actual != state {
+ state = actual;
+ continue;
+ }
+
+ return Ok(None);
+ } else if head == TERMINATED {
+ return Err(());
+ }
+
+ debug_assert!(head.0 < MAX_BACKUP);
+
+ let mut next = state;
+
+ let next_head = entries[head.0].next_sleeper();
+
+ // TERMINATED can never be set as the "next pointer" on a worker.
+ debug_assert!(next_head != TERMINATED);
+
+ if next_head == EMPTY {
+ next.set_head(terminal);
+ } else {
+ next.set_head(next_head);
+ }
+
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if actual == state {
+ debug_assert!(entries[head.0].is_pushed());
+ return Ok(Some(head));
+ }
+
+ state = actual;
+ }
+ }
+}
+
+// ===== impl State =====
+
+impl State {
+ fn new() -> State {
+ State(EMPTY.0)
+ }
+
+ fn head(&self) -> BackupId {
+ BackupId(self.0 & STACK_MASK)
+ }
+
+ fn set_head(&mut self, val: BackupId) {
+ let val = val.0;
+
+ // The ABA guard protects against the ABA problem w/ Treiber stacks
+ let aba_guard = ((self.0 >> ABA_GUARD_SHIFT) + 1) & ABA_GUARD_MASK;
+
+ self.0 = (aba_guard << ABA_GUARD_SHIFT) | val;
+ }
+}
+
+impl From<usize> for State {
+ fn from(src: usize) -> Self {
+ State(src)
+ }
+}
+
+impl From<State> for usize {
+ fn from(src: State) -> Self {
+ src.0
+ }
+}
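The head/ABA-guard packing used by backup_stack.rs can be exercised on its own: the low 16 bits hold the head index, the remaining bits hold a counter that is bumped on every update, so a stale compare-and-swap fails even when the same head value reappears. The sketch below is illustrative, not part of the patch (it uses usize::BITS in place of the target_pointer_width cfgs above).

const STACK_MASK: usize = (1 << 16) - 1;
const ABA_GUARD_SHIFT: usize = 16;
const ABA_GUARD_MASK: usize = (1 << (usize::BITS as usize - ABA_GUARD_SHIFT)) - 1;

#[derive(Clone, Copy, PartialEq, Debug)]
struct State(usize);

impl State {
    fn head(self) -> usize {
        self.0 & STACK_MASK
    }

    fn set_head(&mut self, head: usize) {
        // Bump the guard on every head change so that an A -> B -> A
        // sequence still produces a different word.
        let guard = ((self.0 >> ABA_GUARD_SHIFT) + 1) & ABA_GUARD_MASK;
        self.0 = (guard << ABA_GUARD_SHIFT) | head;
    }
}

fn main() {
    let mut s = State(7);
    s.set_head(3);
    s.set_head(7); // head is back to its starting value...
    assert_eq!(s.head(), 7);
    assert_ne!(s, State(7)); // ...but the word differs, so a stale CAS would fail
}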
diff --git a/third_party/rust/tokio-threadpool/src/pool/mod.rs b/third_party/rust/tokio-threadpool/src/pool/mod.rs
new file mode 100644
index 0000000000..0a42359b3c
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/pool/mod.rs
@@ -0,0 +1,475 @@
+mod backup;
+mod backup_stack;
+mod state;
+
+pub(crate) use self::backup::{Backup, BackupId};
+pub(crate) use self::backup_stack::MAX_BACKUP;
+pub(crate) use self::state::{Lifecycle, State, MAX_FUTURES};
+
+use self::backup::Handoff;
+use self::backup_stack::BackupStack;
+
+use config::Config;
+use shutdown::ShutdownTrigger;
+use task::{Blocking, Task};
+use worker::{self, Worker, WorkerId};
+
+use futures::Poll;
+
+use std::cell::Cell;
+use std::collections::hash_map::RandomState;
+use std::hash::{BuildHasher, Hash, Hasher};
+use std::num::Wrapping;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::{AcqRel, Acquire};
+use std::sync::{Arc, Weak};
+use std::thread;
+
+use crossbeam_deque::Injector;
+use crossbeam_utils::CachePadded;
+
+#[derive(Debug)]
+pub(crate) struct Pool {
+ // Tracks the state of the thread pool (running, shutting down, ...).
+ //
+ // While workers check this field as a hint to detect shutdown, it is
+ // **not** used as a primary point of coordination for workers. The sleep
+ // stack is used as the primary point of coordination for workers.
+ //
+ // The value of this atomic is deserialized into a `pool::State` instance.
+ // See comments for that type.
+ pub state: CachePadded<AtomicUsize>,
+
+ // Stack tracking sleeping workers.
+ sleep_stack: CachePadded<worker::Stack>,
+
+ // Worker state
+ //
+ // A worker is a thread that is processing the work queue and polling
+ // futures.
+ //
+ // The number of workers will *usually* be small.
+ pub workers: Arc<[worker::Entry]>,
+
+ // The global MPMC queue of tasks.
+ //
+ // Spawned tasks are pushed into this queue. Although worker threads have their own dedicated
+ // task queues, they periodically steal tasks from this global queue, too.
+ pub queue: Arc<Injector<Arc<Task>>>,
+
+ // Completes the shutdown process when the `ThreadPool` and all `Worker`s get dropped.
+ //
+ // When spawning a new `Worker`, this weak reference is upgraded and handed out to the new
+ // thread.
+ pub trigger: Weak<ShutdownTrigger>,
+
+ // Backup thread state
+ //
+ // In order to efficiently support `blocking`, a pool of backup threads is
+ // needed. These backup threads are ready to take over a worker if the
+ // future being processed requires blocking.
+ backup: Box<[Backup]>,
+
+ // Stack of sleeping backup threads
+ pub backup_stack: BackupStack,
+
+ // State regarding coordinating blocking sections and tracking tasks that
+ // are pending blocking capacity.
+ blocking: Blocking,
+
+ // Configuration
+ pub config: Config,
+}
+
+impl Pool {
+ /// Create a new `Pool`
+ pub fn new(
+ workers: Arc<[worker::Entry]>,
+ trigger: Weak<ShutdownTrigger>,
+ max_blocking: usize,
+ config: Config,
+ queue: Arc<Injector<Arc<Task>>>,
+ ) -> Pool {
+ let pool_size = workers.len();
+ let total_size = max_blocking + pool_size;
+
+ // Create the set of backup entries
+ //
+ // This is `max_blocking + pool_size` because the core thread pool running the
+ // workers is spawned from backup as well.
+ let backup = (0..total_size)
+ .map(|_| Backup::new())
+ .collect::<Vec<_>>()
+ .into_boxed_slice();
+
+ let backup_stack = BackupStack::new();
+
+ for i in (0..backup.len()).rev() {
+ backup_stack.push(&backup, BackupId(i)).unwrap();
+ }
+
+ // Initialize the blocking state
+ let blocking = Blocking::new(max_blocking);
+
+ let ret = Pool {
+ state: CachePadded::new(AtomicUsize::new(State::new().into())),
+ sleep_stack: CachePadded::new(worker::Stack::new()),
+ workers,
+ queue,
+ trigger,
+ backup,
+ backup_stack,
+ blocking,
+ config,
+ };
+
+ // Now, we prime the sleeper stack
+ for i in 0..pool_size {
+ ret.sleep_stack.push(&ret.workers, i).unwrap();
+ }
+
+ ret
+ }
+
+ /// Start shutting down the pool. This means that no new futures will be
+ /// accepted.
+ pub fn shutdown(&self, now: bool, purge_queue: bool) {
+ let mut state: State = self.state.load(Acquire).into();
+
+ trace!("shutdown; state={:?}", state);
+
+ // For now, this must be true
+ debug_assert!(!purge_queue || now);
+
+ // Start by setting the shutdown flag
+ loop {
+ let mut next = state;
+
+ let num_futures = next.num_futures();
+
+ if next.lifecycle() == Lifecycle::ShutdownNow {
+ // Already transitioned to shutting down state
+
+ if !purge_queue || num_futures == 0 {
+ // Nothing more to do
+ return;
+ }
+
+ // The queue must be purged
+ debug_assert!(purge_queue);
+ next.clear_num_futures();
+ } else {
+ next.set_lifecycle(if now || num_futures == 0 {
+ // If already idle, always transition to shutdown now.
+ Lifecycle::ShutdownNow
+ } else {
+ Lifecycle::ShutdownOnIdle
+ });
+
+ if purge_queue {
+ next.clear_num_futures();
+ }
+ }
+
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if state == actual {
+ state = next;
+ break;
+ }
+
+ state = actual;
+ }
+
+ trace!(" -> transitioned to shutdown");
+
+ // Only transition to terminate if there are no futures currently on the
+ // pool
+ if state.num_futures() != 0 {
+ return;
+ }
+
+ self.terminate_sleeping_workers();
+ }
+
+ /// Called by `Worker` as it tries to enter a sleeping state. Before it
+ /// sleeps, it must push itself onto the sleep stack. This enables other
+ /// threads to see it when signaling work.
+ pub fn push_sleeper(&self, idx: usize) -> Result<(), ()> {
+ self.sleep_stack.push(&self.workers, idx)
+ }
+
+ pub fn terminate_sleeping_workers(&self) {
+ use worker::Lifecycle::Signaled;
+
+ trace!(" -> shutting down workers");
+ // Wakeup all sleeping workers. They will wake up, see the state
+ // transition, and terminate.
+ while let Some((idx, worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, true) {
+ self.workers[idx].signal_stop(worker_state);
+ }
+
+ // Now terminate any backup threads
+ //
+ // The call to `pop` must be successful because shutting down the pool
+ // is coordinated and at this point, this is the only thread that will
+ // attempt to transition the backup stack to "terminated".
+ while let Ok(Some(backup_id)) = self.backup_stack.pop(&self.backup, true) {
+ self.backup[backup_id.0].signal_stop();
+ }
+ }
+
+ pub fn poll_blocking_capacity(&self, task: &Arc<Task>) -> Poll<(), ::BlockingError> {
+ self.blocking.poll_blocking_capacity(task)
+ }
+
+ /// Submit a task to the scheduler.
+ ///
+ /// Called from either inside or outside of the scheduler. If currently on
+ /// the scheduler, then a fast path is taken.
+ pub fn submit(&self, task: Arc<Task>, pool: &Arc<Pool>) {
+ debug_assert_eq!(*self, **pool);
+
+ Worker::with_current(|worker| {
+ if let Some(worker) = worker {
+ // If the worker is in blocking mode, then even though the
+ // thread-local variable is set, the current thread does not
+ // have ownership of that worker entry. This is because the
+ // worker entry has already been handed off to another thread.
+ //
+ // The second check handles the case where the current thread is
+ // part of a different threadpool than the one being submitted
+ // to.
+ if !worker.is_blocking() && *self == *worker.pool {
+ let idx = worker.id.0;
+
+ trace!(" -> submit internal; idx={}", idx);
+
+ worker.pool.workers[idx].submit_internal(task);
+ worker.pool.signal_work(pool);
+ return;
+ }
+ }
+
+ self.submit_external(task, pool);
+ });
+ }
+
+ /// Submit a task to the scheduler from off worker
+ ///
+ /// Called from outside of the scheduler, this function is how new tasks
+ /// enter the system.
+ pub fn submit_external(&self, task: Arc<Task>, pool: &Arc<Pool>) {
+ debug_assert_eq!(*self, **pool);
+
+ trace!(" -> submit external");
+
+ self.queue.push(task);
+ self.signal_work(pool);
+ }
+
+ pub fn release_backup(&self, backup_id: BackupId) -> Result<(), ()> {
+ // First update the state, this cannot fail because the caller must have
+ // exclusive access to the backup token.
+ self.backup[backup_id.0].release();
+
+ // Push the backup entry back on the stack
+ self.backup_stack.push(&self.backup, backup_id)
+ }
+
+ pub fn notify_blocking_task(&self, pool: &Arc<Pool>) {
+ debug_assert_eq!(*self, **pool);
+ self.blocking.notify_task(&pool);
+ }
+
+ /// Provision a thread to run a worker
+ pub fn spawn_thread(&self, id: WorkerId, pool: &Arc<Pool>) {
+ debug_assert_eq!(*self, **pool);
+
+ let backup_id = match self.backup_stack.pop(&self.backup, false) {
+ Ok(Some(backup_id)) => backup_id,
+ Ok(None) => panic!("no thread available"),
+ Err(_) => {
+ debug!("failed to spawn worker thread due to the thread pool shutting down");
+ return;
+ }
+ };
+
+ let need_spawn = self.backup[backup_id.0].worker_handoff(id.clone());
+
+ if !need_spawn {
+ return;
+ }
+
+ let trigger = match self.trigger.upgrade() {
+ None => {
+ // The pool is shutting down.
+ return;
+ }
+ Some(t) => t,
+ };
+
+ let mut th = thread::Builder::new();
+
+ if let Some(ref prefix) = pool.config.name_prefix {
+ th = th.name(format!("{}{}", prefix, backup_id.0));
+ }
+
+ if let Some(stack) = pool.config.stack_size {
+ th = th.stack_size(stack);
+ }
+
+ let pool = pool.clone();
+
+ let res = th.spawn(move || {
+ if let Some(ref f) = pool.config.after_start {
+ f();
+ }
+
+ let mut worker_id = id;
+
+ pool.backup[backup_id.0].start(&worker_id);
+
+ loop {
+ // The backup token should be in the running state.
+ debug_assert!(pool.backup[backup_id.0].is_running());
+
+ // TODO: Avoid always cloning
+ let worker = Worker::new(worker_id, backup_id, pool.clone(), trigger.clone());
+
+ // Run the worker. If the worker transitioned to a "blocking"
+ // state, then `is_blocking` will be true.
+ if !worker.do_run() {
+ // The worker shutdown, so exit the thread.
+ break;
+ }
+
+ debug_assert!(!pool.backup[backup_id.0].is_pushed());
+
+ // Push the thread back onto the backup stack. This makes it
+ // available for future handoffs.
+ //
+ // This **must** happen before notifying the task.
+ let res = pool.backup_stack.push(&pool.backup, backup_id);
+
+ if res.is_err() {
+ // The pool is being shutdown.
+ break;
+ }
+
+ // The task switched the current thread to blocking mode.
+ // Now that the blocking section has completed, any tasks waiting for
+ // blocking capacity can be notified.
+ pool.notify_blocking_task(&pool);
+
+ debug_assert!(pool.backup[backup_id.0].is_running());
+
+ // Wait for a handoff
+ let handoff = pool.backup[backup_id.0].wait_for_handoff(pool.config.keep_alive);
+
+ match handoff {
+ Handoff::Worker(id) => {
+ debug_assert!(pool.backup[backup_id.0].is_running());
+ worker_id = id;
+ }
+ Handoff::Idle | Handoff::Terminated => {
+ break;
+ }
+ }
+ }
+
+ if let Some(ref f) = pool.config.before_stop {
+ f();
+ }
+ });
+
+ if let Err(e) = res {
+ error!("failed to spawn worker thread; err={:?}", e);
+ panic!("failed to spawn worker thread: {:?}", e);
+ }
+ }
+
+ /// If there are any other workers currently relaxing, signal them that work
+ /// is available so that they can try to find more work to process.
+ pub fn signal_work(&self, pool: &Arc<Pool>) {
+ debug_assert_eq!(*self, **pool);
+
+ use worker::Lifecycle::Signaled;
+
+ if let Some((idx, worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, false) {
+ let entry = &self.workers[idx];
+
+ debug_assert!(
+ worker_state.lifecycle() != Signaled,
+ "actual={:?}",
+ worker_state.lifecycle(),
+ );
+
+ trace!("signal_work -- notify; idx={}", idx);
+
+ if !entry.notify(worker_state) {
+ trace!("signal_work -- spawn; idx={}", idx);
+ self.spawn_thread(WorkerId(idx), pool);
+ }
+ }
+ }
+
+ /// Generates a random number
+ ///
+ /// Uses a thread-local random number generator based on XorShift.
+ pub fn rand_usize(&self) -> usize {
+ thread_local! {
+ static RNG: Cell<Wrapping<u32>> = Cell::new(Wrapping(prng_seed()));
+ }
+
+ RNG.with(|rng| {
+ // This is the 32-bit variant of Xorshift.
+ // https://en.wikipedia.org/wiki/Xorshift
+ let mut x = rng.get();
+ x ^= x << 13;
+ x ^= x >> 17;
+ x ^= x << 5;
+ rng.set(x);
+ x.0 as usize
+ })
+ }
+}
+
+impl PartialEq for Pool {
+ fn eq(&self, other: &Pool) -> bool {
+ self as *const _ == other as *const _
+ }
+}
+
+unsafe impl Send for Pool {}
+unsafe impl Sync for Pool {}
+
+// Return a thread-specific, 32-bit, non-zero seed value suitable for a 32-bit
+// PRNG. This uses one libstd RandomState for a default hasher and hashes on
+// the current thread ID to obtain an unpredictable, collision resistant seed.
+fn prng_seed() -> u32 {
+ // This obtains a small number of random bytes from the host system (for
+ // example, on unix via getrandom(2)) in order to seed an unpredictable and
+ // HashDoS resistant 64-bit hash function (currently: `SipHasher13` with
+ // 128-bit state). We only need one of these, to make the seeds for all
+ // process threads different via hashed IDs, collision resistant, and
+ // unpredictable.
+ lazy_static! {
+ static ref RND_STATE: RandomState = RandomState::new();
+ }
+
+ // Hash the current thread ID to produce a u32 value
+ let mut hasher = RND_STATE.build_hasher();
+ thread::current().id().hash(&mut hasher);
+ let hash: u64 = hasher.finish();
+ let seed = (hash as u32) ^ ((hash >> 32) as u32);
+
+ // Ensure non-zero seed (Xorshift yields only zero's for that seed)
+ if seed == 0 {
+ 0x9b4e_6d25 // misc bits, could be any non-zero
+ } else {
+ seed
+ }
+}
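rand_usize and prng_seed above amount to a per-thread 32-bit Xorshift generator whose seed is derived by hashing the thread ID with a process-wide RandomState. A minimal standalone version, without the lazy_static!/thread_local! plumbing (illustrative, not part of the patch):

use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hash, Hasher};
use std::num::Wrapping;
use std::thread;

// Derive a non-zero 32-bit seed from the hashed thread ID.
fn seed(state: &RandomState) -> u32 {
    let mut hasher = state.build_hasher();
    thread::current().id().hash(&mut hasher);
    let hash = hasher.finish();
    let seed = (hash as u32) ^ ((hash >> 32) as u32);
    if seed == 0 { 0x9b4e_6d25 } else { seed }
}

// One step of 32-bit Xorshift (shift constants 13, 17, 5).
fn next(x: &mut Wrapping<u32>) -> u32 {
    *x ^= *x << 13;
    *x ^= *x >> 17;
    *x ^= *x << 5;
    x.0
}

fn main() {
    let state = RandomState::new();
    let mut rng = Wrapping(seed(&state));
    // Xorshift never reaches zero from a non-zero seed.
    for _ in 0..1_000 {
        assert_ne!(next(&mut rng), 0);
    }
}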
diff --git a/third_party/rust/tokio-threadpool/src/pool/state.rs b/third_party/rust/tokio-threadpool/src/pool/state.rs
new file mode 100644
index 0000000000..5ecb514e5c
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/pool/state.rs
@@ -0,0 +1,132 @@
+use std::{fmt, usize};
+
+/// ThreadPool state.
+///
+/// The two least significant bits are the shutdown flags. (0 for active, 1 for
+/// shutdown on idle, 2 for shutting down). The remaining bits represent the
+/// number of futures that still need to complete.
+#[derive(Eq, PartialEq, Clone, Copy)]
+pub(crate) struct State(usize);
+
+#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Clone, Copy)]
+#[repr(usize)]
+pub(crate) enum Lifecycle {
+ /// The thread pool is currently running
+ Running = 0,
+
+ /// The thread pool should shutdown once it reaches an idle state.
+ ShutdownOnIdle = 1,
+
+ /// The thread pool should start the process of shutting down.
+ ShutdownNow = 2,
+}
+
+/// Mask used to extract the lifecycle from the state
+const LIFECYCLE_MASK: usize = 0b11;
+const NUM_FUTURES_MASK: usize = !LIFECYCLE_MASK;
+const NUM_FUTURES_OFFSET: usize = 2;
+
+/// Max number of futures the pool can handle.
+pub(crate) const MAX_FUTURES: usize = usize::MAX >> NUM_FUTURES_OFFSET;
+
+// ===== impl State =====
+
+impl State {
+ #[inline]
+ pub fn new() -> State {
+ State(0)
+ }
+
+ /// Returns the number of futures still pending completion.
+ pub fn num_futures(&self) -> usize {
+ self.0 >> NUM_FUTURES_OFFSET
+ }
+
+ /// Increment the number of futures pending completion.
+ ///
+ /// In debug builds, asserts that the count stays below `MAX_FUTURES` and
+ /// that the pool is not already shutting down.
+ pub fn inc_num_futures(&mut self) {
+ debug_assert!(self.num_futures() < MAX_FUTURES);
+ debug_assert!(self.lifecycle() < Lifecycle::ShutdownNow);
+
+ self.0 += 1 << NUM_FUTURES_OFFSET;
+ }
+
+ /// Decrement the number of futures pending completion.
+ pub fn dec_num_futures(&mut self) {
+ let num_futures = self.num_futures();
+
+ if num_futures == 0 {
+ // Already zero
+ return;
+ }
+
+ self.0 -= 1 << NUM_FUTURES_OFFSET;
+
+ if self.lifecycle() == Lifecycle::ShutdownOnIdle && num_futures == 1 {
+ self.set_lifecycle(Lifecycle::ShutdownNow);
+ }
+ }
+
+ /// Set the number of futures pending completion to zero
+ pub fn clear_num_futures(&mut self) {
+ self.0 = self.0 & LIFECYCLE_MASK;
+ }
+
+ pub fn lifecycle(&self) -> Lifecycle {
+ (self.0 & LIFECYCLE_MASK).into()
+ }
+
+ pub fn set_lifecycle(&mut self, val: Lifecycle) {
+ self.0 = (self.0 & NUM_FUTURES_MASK) | (val as usize);
+ }
+
+ pub fn is_terminated(&self) -> bool {
+ self.lifecycle() == Lifecycle::ShutdownNow && self.num_futures() == 0
+ }
+}
+
+impl From<usize> for State {
+ fn from(src: usize) -> Self {
+ State(src)
+ }
+}
+
+impl From<State> for usize {
+ fn from(src: State) -> Self {
+ src.0
+ }
+}
+
+impl fmt::Debug for State {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("pool::State")
+ .field("lifecycle", &self.lifecycle())
+ .field("num_futures", &self.num_futures())
+ .finish()
+ }
+}
+
+// ===== impl Lifecycle =====
+
+impl From<usize> for Lifecycle {
+ fn from(src: usize) -> Lifecycle {
+ use self::Lifecycle::*;
+
+ debug_assert!(
+ src == Running as usize
+ || src == ShutdownOnIdle as usize
+ || src == ShutdownNow as usize
+ );
+
+ unsafe { ::std::mem::transmute(src) }
+ }
+}
+
+impl From<Lifecycle> for usize {
+ fn from(src: Lifecycle) -> usize {
+ let v = src as usize;
+ debug_assert!(v & LIFECYCLE_MASK == v);
+ v
+ }
+}
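state.rs keeps the pool lifecycle in the two low bits of the word and the number of live futures in the remaining bits. A compact standalone sketch of that packing and of the shutdown-on-idle transition (illustrative, not part of the patch):

const LIFECYCLE_MASK: usize = 0b11;
const NUM_FUTURES_OFFSET: usize = 2;

const RUNNING: usize = 0;
const SHUTDOWN_ON_IDLE: usize = 1;
const SHUTDOWN_NOW: usize = 2;

struct State(usize);

impl State {
    fn lifecycle(&self) -> usize {
        self.0 & LIFECYCLE_MASK
    }

    fn num_futures(&self) -> usize {
        self.0 >> NUM_FUTURES_OFFSET
    }

    fn set_lifecycle(&mut self, val: usize) {
        self.0 = (self.0 & !LIFECYCLE_MASK) | val;
    }

    fn inc_num_futures(&mut self) {
        self.0 += 1 << NUM_FUTURES_OFFSET;
    }

    // When the last future completes while draining, flip straight to
    // "shutdown now" so sleeping workers can be terminated.
    fn dec_num_futures(&mut self) {
        if self.num_futures() == 0 {
            return;
        }
        self.0 -= 1 << NUM_FUTURES_OFFSET;
        if self.lifecycle() == SHUTDOWN_ON_IDLE && self.num_futures() == 0 {
            self.set_lifecycle(SHUTDOWN_NOW);
        }
    }
}

fn main() {
    let mut s = State(RUNNING);
    s.inc_num_futures();
    s.set_lifecycle(SHUTDOWN_ON_IDLE); // shutdown requested while one future is live
    s.dec_num_futures();               // last future finishes
    assert_eq!(s.lifecycle(), SHUTDOWN_NOW);
    assert_eq!(s.num_futures(), 0);
}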