author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-07 09:22:09 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-07 09:22:09 +0000
commit     43a97878ce14b72f0981164f87f2e35e14151312 (patch)
tree       620249daf56c0258faa40cbdcf9cfba06de2a846 /third_party/rust/tokio-threadpool/src/worker
parent     Initial commit. (diff)
Adding upstream version 110.0.1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio-threadpool/src/worker')
-rw-r--r--  third_party/rust/tokio-threadpool/src/worker/entry.rs  330
-rw-r--r--  third_party/rust/tokio-threadpool/src/worker/mod.rs    797
-rw-r--r--  third_party/rust/tokio-threadpool/src/worker/stack.rs  260
-rw-r--r--  third_party/rust/tokio-threadpool/src/worker/state.rs  153
4 files changed, 1540 insertions, 0 deletions
diff --git a/third_party/rust/tokio-threadpool/src/worker/entry.rs b/third_party/rust/tokio-threadpool/src/worker/entry.rs
new file mode 100644
index 0000000000..0dcf5108b8
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/worker/entry.rs
@@ -0,0 +1,330 @@
+use park::{BoxPark, BoxUnpark};
+use task::Task;
+use worker::state::{State, PUSHED_MASK};
+
+use std::cell::UnsafeCell;
+use std::fmt;
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::Arc;
+use std::time::Duration;
+
+use crossbeam_deque::{Steal, Stealer, Worker};
+use crossbeam_queue::SegQueue;
+use crossbeam_utils::CachePadded;
+use slab::Slab;
+
+// TODO: None of the fields should be public
+//
+// It would also be helpful to split up the state across what fields /
+// operations are thread-safe vs. which ones require ownership of the worker.
+pub(crate) struct WorkerEntry {
+ // Worker state. This is mutated when notifying the worker.
+ //
+ // The `usize` value is deserialized to a `worker::State` instance. See
+ // comments on that type.
+ pub state: CachePadded<AtomicUsize>,
+
+ // Next entry in the parked Treiber stack
+ next_sleeper: UnsafeCell<usize>,
+
+ // Worker half of deque
+ pub worker: Worker<Arc<Task>>,
+
+ // Stealer half of deque
+ stealer: Stealer<Arc<Task>>,
+
+ // Thread parker
+ park: UnsafeCell<Option<BoxPark>>,
+
+ // Thread unparker
+ unpark: UnsafeCell<Option<BoxUnpark>>,
+
+ // Tasks that have been first polled by this worker, but not completed yet.
+ running_tasks: UnsafeCell<Slab<Arc<Task>>>,
+
+ // Tasks that have been first polled by this worker, but completed by another worker.
+ remotely_completed_tasks: SegQueue<Arc<Task>>,
+
+ // Set to `true` when `remotely_completed_tasks` has tasks that need to be removed from
+ // `running_tasks`.
+ needs_drain: AtomicBool,
+}
+
+impl WorkerEntry {
+ pub fn new(park: BoxPark, unpark: BoxUnpark) -> Self {
+ let w = Worker::new_fifo();
+ let s = w.stealer();
+
+ WorkerEntry {
+ state: CachePadded::new(AtomicUsize::new(State::default().into())),
+ next_sleeper: UnsafeCell::new(0),
+ worker: w,
+ stealer: s,
+ park: UnsafeCell::new(Some(park)),
+ unpark: UnsafeCell::new(Some(unpark)),
+ running_tasks: UnsafeCell::new(Slab::new()),
+ remotely_completed_tasks: SegQueue::new(),
+ needs_drain: AtomicBool::new(false),
+ }
+ }
+
+ /// Atomically unset the pushed flag.
+ ///
+ /// # Return
+ ///
+ /// The state *before* the pushed flag is unset.
+ ///
+ /// # Ordering
+ ///
+ /// The specified ordering is established on the entry's state variable.
+ pub fn fetch_unset_pushed(&self, ordering: Ordering) -> State {
+ self.state.fetch_and(!PUSHED_MASK, ordering).into()
+ }
+
+ /// Submit a task to this worker while currently on the same thread that is
+ /// running the worker.
+ #[inline]
+ pub fn submit_internal(&self, task: Arc<Task>) {
+ self.push_internal(task);
+ }
+
+ /// Notifies the worker and returns `false` if it needs to be spawned.
+ ///
+ /// # Ordering
+ ///
+ /// The `state` must have been obtained with an `Acquire` ordering.
+ #[inline]
+ pub fn notify(&self, mut state: State) -> bool {
+ use worker::Lifecycle::*;
+
+ loop {
+ let mut next = state;
+ next.notify();
+
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if state == actual {
+ break;
+ }
+
+ state = actual;
+ }
+
+ match state.lifecycle() {
+ Sleeping => {
+ // The worker is currently sleeping, the condition variable must
+ // be signaled
+ self.unpark();
+ true
+ }
+ Shutdown => false,
+ Running | Notified | Signaled => {
+ // In these states, the worker is active and will eventually see
+ // the task that was just submitted.
+ true
+ }
+ }
+ }
+
+ /// Signals to the worker that it should stop.
+ ///
+ /// `state` is the last observed state for the worker. This allows skipping
+ /// the initial load from the state atomic.
+ ///
+ /// If the worker has already shut down, this call does nothing. If the
+ /// worker has already been notified or signaled, it will observe the
+ /// shutdown state on its own, so this call also does nothing. Otherwise
+ /// the worker's lifecycle is transitioned to `Signaled` and its thread is
+ /// unparked.
+ pub fn signal_stop(&self, mut state: State) {
+ use worker::Lifecycle::*;
+
+ // Transition the worker state to signaled
+ loop {
+ let mut next = state;
+
+ match state.lifecycle() {
+ Shutdown => {
+ return;
+ }
+ Running | Sleeping => {}
+ Notified | Signaled => {
+ // These two states imply that the worker is active, thus it
+ // will eventually see the shutdown signal, so we don't need
+ // to do anything.
+ //
+ // The worker is forced to see the shutdown signal
+ // eventually as:
+ //
+ // a) No more work will arrive
+ // b) The shutdown signal is stored as the head of the
+ // sleep stack, which will prevent the worker from going to
+ // sleep again.
+ return;
+ }
+ }
+
+ next.set_lifecycle(Signaled);
+
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if actual == state {
+ break;
+ }
+
+ state = actual;
+ }
+
+ // Wake up the worker
+ self.unpark();
+ }
+
+ /// Pop a task
+ ///
+ /// This **must** only be called by the thread that owns the worker entry.
+ /// This function is not `Sync`.
+ #[inline]
+ pub fn pop_task(&self) -> Option<Arc<Task>> {
+ self.worker.pop()
+ }
+
+ /// Steal tasks
+ ///
+ /// This is called by *other* workers to steal a task for processing. This
+ /// function is `Sync`.
+ ///
+ /// At the same time, this method steals some additional tasks and moves
+ /// them into `dest` in order to balance the work distribution among
+ /// workers.
+ pub fn steal_tasks(&self, dest: &Self) -> Steal<Arc<Task>> {
+ self.stealer.steal_batch_and_pop(&dest.worker)
+ }
+
+ /// Drain (and drop) all tasks that are queued for work.
+ ///
+ /// This is called when the pool is shutting down.
+ pub fn drain_tasks(&self) {
+ while self.worker.pop().is_some() {}
+ }
+
+ /// Parks the worker thread.
+ pub fn park(&self) {
+ if let Some(park) = unsafe { (*self.park.get()).as_mut() } {
+ park.park().unwrap();
+ }
+ }
+
+ /// Parks the worker thread for at most `duration`.
+ pub fn park_timeout(&self, duration: Duration) {
+ if let Some(park) = unsafe { (*self.park.get()).as_mut() } {
+ park.park_timeout(duration).unwrap();
+ }
+ }
+
+ /// Unparks the worker thread.
+ #[inline]
+ pub fn unpark(&self) {
+ if let Some(park) = unsafe { (*self.unpark.get()).as_ref() } {
+ park.unpark();
+ }
+ }
+
+ /// Registers a task in this worker.
+ ///
+ /// Called when the task is being polled for the first time.
+ #[inline]
+ pub fn register_task(&self, task: &Arc<Task>) {
+ let running_tasks = unsafe { &mut *self.running_tasks.get() };
+
+ let key = running_tasks.insert(task.clone());
+ task.reg_index.set(key);
+ }
+
+ /// Unregisters a task from this worker.
+ ///
+ /// Called when the task is completed and was previously registered in this worker.
+ #[inline]
+ pub fn unregister_task(&self, task: Arc<Task>) {
+ let running_tasks = unsafe { &mut *self.running_tasks.get() };
+ running_tasks.remove(task.reg_index.get());
+ self.drain_remotely_completed_tasks();
+ }
+
+ /// Queues a task registered in this worker for deferred unregistration.
+ ///
+ /// Called when the task was first polled by this worker but completed by another worker. The
+ /// task is removed from `running_tasks` the next time this entry drains its completed queue.
+ #[inline]
+ pub fn remotely_complete_task(&self, task: Arc<Task>) {
+ self.remotely_completed_tasks.push(task);
+ self.needs_drain.store(true, Release);
+ }
+
+ /// Drops the remaining incomplete tasks and the parker associated with this worker.
+ ///
+ /// This function is called by the shutdown trigger.
+ pub fn shutdown(&self) {
+ self.drain_remotely_completed_tasks();
+
+ // Abort all incomplete tasks.
+ let running_tasks = unsafe { &mut *self.running_tasks.get() };
+ for (_, task) in running_tasks.iter() {
+ task.abort();
+ }
+ running_tasks.clear();
+
+ unsafe {
+ *self.park.get() = None;
+ *self.unpark.get() = None;
+ }
+ }
+
+ /// Drains the `remotely_completed_tasks` queue and removes tasks from `running_tasks`.
+ #[inline]
+ fn drain_remotely_completed_tasks(&self) {
+ if self.needs_drain.compare_and_swap(true, false, Acquire) {
+ let running_tasks = unsafe { &mut *self.running_tasks.get() };
+
+ while let Ok(task) = self.remotely_completed_tasks.pop() {
+ running_tasks.remove(task.reg_index.get());
+ }
+ }
+ }
+
+ #[inline]
+ pub fn push_internal(&self, task: Arc<Task>) {
+ self.worker.push(task);
+ }
+
+ #[inline]
+ pub fn next_sleeper(&self) -> usize {
+ unsafe { *self.next_sleeper.get() }
+ }
+
+ #[inline]
+ pub fn set_next_sleeper(&self, val: usize) {
+ unsafe {
+ *self.next_sleeper.get() = val;
+ }
+ }
+}
+
+impl fmt::Debug for WorkerEntry {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("WorkerEntry")
+ .field("state", &self.state.load(Relaxed))
+ .field("next_sleeper", &"UnsafeCell<usize>")
+ .field("worker", &self.worker)
+ .field("stealer", &self.stealer)
+ .field("park", &"UnsafeCell<BoxPark>")
+ .field("unpark", &"BoxUnpark")
+ .finish()
+ }
+}
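Note on the pattern above: `notify` and `signal_stop` follow the same shape — load the packed `usize` state, compute the desired next state, and retry a compare-and-swap until it sticks. The following stand-alone sketch shows just that loop; `DemoState` is a hypothetical stand-in for `worker::State`, and `compare_exchange` is used in place of the deprecated `compare_and_swap` that this vendored version calls.

    use std::sync::atomic::{AtomicUsize, Ordering};

    // Hypothetical stand-in for `worker::State`: the real type packs a
    // lifecycle and a pushed bit into the same usize.
    #[derive(Clone, Copy, PartialEq, Debug)]
    struct DemoState(usize);

    impl DemoState {
        fn notify(&mut self) {
            // The real `notify` sets the lifecycle to `Signaled`; here we
            // just set an arbitrary bit so the transition is observable.
            self.0 |= 0b100;
        }
    }

    fn notify(cell: &AtomicUsize) {
        let mut state = DemoState(cell.load(Ordering::Acquire));
        loop {
            let mut next = state;
            next.notify();
            // Retry until the stored value still matches the value `next`
            // was computed from, exactly like the CAS loops in the entry.
            match cell.compare_exchange(state.0, next.0, Ordering::AcqRel, Ordering::Acquire) {
                Ok(_) => break,
                Err(actual) => state = DemoState(actual),
            }
        }
    }

    fn main() {
        let cell = AtomicUsize::new(0);
        notify(&cell);
        assert_eq!(cell.load(Ordering::Acquire) & 0b100, 0b100);
    }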
diff --git a/third_party/rust/tokio-threadpool/src/worker/mod.rs b/third_party/rust/tokio-threadpool/src/worker/mod.rs
new file mode 100644
index 0000000000..d380c5d561
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/worker/mod.rs
@@ -0,0 +1,797 @@
+mod entry;
+mod stack;
+mod state;
+
+pub(crate) use self::entry::WorkerEntry as Entry;
+pub(crate) use self::stack::Stack;
+pub(crate) use self::state::{Lifecycle, State};
+
+use notifier::Notifier;
+use pool::{self, BackupId, Pool};
+use sender::Sender;
+use shutdown::ShutdownTrigger;
+use task::{self, CanBlock, Task};
+
+use tokio_executor;
+
+use futures::{Async, Poll};
+
+use std::cell::Cell;
+use std::marker::PhantomData;
+use std::rc::Rc;
+use std::sync::atomic::Ordering::{AcqRel, Acquire};
+use std::sync::Arc;
+use std::thread;
+use std::time::Duration;
+
+/// Thread worker
+///
+/// This is passed to the [`around_worker`] callback set on [`Builder`]. This
+/// callback is only expected to call [`run`] on it.
+///
+/// [`Builder`]: struct.Builder.html
+/// [`around_worker`]: struct.Builder.html#method.around_worker
+/// [`run`]: struct.Worker.html#method.run
+#[derive(Debug)]
+pub struct Worker {
+ // Shared scheduler data
+ pub(crate) pool: Arc<Pool>,
+
+ // WorkerEntry index
+ pub(crate) id: WorkerId,
+
+ // Backup thread ID assigned to processing this worker.
+ backup_id: BackupId,
+
+ // Set to the task that is currently being polled by the worker. This is
+ // needed so that `blocking` blocks are able to interact with this task.
+ //
+ // This has to be a raw pointer to make it compile, but great care is taken
+ // when this is set.
+ current_task: CurrentTask,
+
+ // Set when the thread is in blocking mode.
+ is_blocking: Cell<bool>,
+
+ // Set when the worker should finalize on drop
+ should_finalize: Cell<bool>,
+
+ // Completes the shutdown process when the `ThreadPool` and all `Worker`s get dropped.
+ trigger: Arc<ShutdownTrigger>,
+
+ // Keep the value on the current thread.
+ _p: PhantomData<Rc<()>>,
+}
+
+/// Tracks the state related to the currently running task.
+#[derive(Debug)]
+struct CurrentTask {
+ /// This has to be a raw pointer to make it compile, but great care is taken
+ /// when this is set.
+ task: Cell<Option<*const Arc<Task>>>,
+
+ /// Tracks the blocking capacity allocation state.
+ can_block: Cell<CanBlock>,
+}
+
+/// Identifies a thread pool worker.
+///
+/// This identifier is unique scoped by the thread pool. It is possible that
+/// different thread pool instances share worker identifier values.
+#[derive(Debug, Clone, Hash, Eq, PartialEq)]
+pub struct WorkerId(pub(crate) usize);
+
+// Pointer to the current worker info
+thread_local!(static CURRENT_WORKER: Cell<*const Worker> = Cell::new(0 as *const _));
+
+impl Worker {
+ pub(crate) fn new(
+ id: WorkerId,
+ backup_id: BackupId,
+ pool: Arc<Pool>,
+ trigger: Arc<ShutdownTrigger>,
+ ) -> Worker {
+ Worker {
+ pool,
+ id,
+ backup_id,
+ current_task: CurrentTask::new(),
+ is_blocking: Cell::new(false),
+ should_finalize: Cell::new(false),
+ trigger,
+ _p: PhantomData,
+ }
+ }
+
+ pub(crate) fn is_blocking(&self) -> bool {
+ self.is_blocking.get()
+ }
+
+ /// Run the worker
+ ///
+ /// Returns `true` if the thread should keep running as a `backup` thread.
+ pub(crate) fn do_run(&self) -> bool {
+ // Create another worker... It's ok, this is just a new type around
+ // `Pool` that is expected to stay on the current thread.
+ CURRENT_WORKER.with(|c| {
+ c.set(self as *const _);
+
+ let pool = self.pool.clone();
+ let mut sender = Sender { pool };
+
+ // Enter an execution context
+ let mut enter = tokio_executor::enter().unwrap();
+
+ tokio_executor::with_default(&mut sender, &mut enter, |enter| {
+ if let Some(ref callback) = self.pool.config.around_worker {
+ callback.call(self, enter);
+ } else {
+ self.run();
+ }
+ });
+ });
+
+ // Can't be in blocking mode and finalization mode
+ debug_assert!(!self.is_blocking.get() || !self.should_finalize.get());
+
+ self.is_blocking.get()
+ }
+
+ pub(crate) fn with_current<F: FnOnce(Option<&Worker>) -> R, R>(f: F) -> R {
+ CURRENT_WORKER.with(move |c| {
+ let ptr = c.get();
+
+ if ptr.is_null() {
+ f(None)
+ } else {
+ f(Some(unsafe { &*ptr }))
+ }
+ })
+ }
+
+ /// Transition the current worker to a blocking worker
+ pub(crate) fn transition_to_blocking(&self) -> Poll<(), ::BlockingError> {
+ use self::CanBlock::*;
+
+ // If we get this far, then `current_task` has been set.
+ let task_ref = self.current_task.get_ref();
+
+ // First step is to acquire blocking capacity for the task.
+ match self.current_task.can_block() {
+ // Capacity to block has already been allocated to this task.
+ Allocated => {}
+
+ // The task has already requested capacity to block, but there is
+ // none yet available.
+ NoCapacity => return Ok(Async::NotReady),
+
+ // The task has yet to ask for capacity
+ CanRequest => {
+ // Atomically attempt to acquire blocking capacity, and if none
+ // is available, register the task to be notified once capacity
+ // becomes available.
+ match self.pool.poll_blocking_capacity(task_ref)? {
+ Async::Ready(()) => {
+ self.current_task.set_can_block(Allocated);
+ }
+ Async::NotReady => {
+ self.current_task.set_can_block(NoCapacity);
+ return Ok(Async::NotReady);
+ }
+ }
+ }
+ }
+
+ // The task has been allocated blocking capacity. At this point, this is
+ // when the current thread transitions from a worker to a backup thread.
+ // To do so requires handing over the worker to another backup thread.
+
+ if self.is_blocking.get() {
+ // The thread is already in blocking mode, so there is nothing else
+ // to do. Return `Ready` and allow the caller to block the thread.
+ return Ok(().into());
+ }
+
+ trace!("transition to blocking state");
+
+ // Transitioning to blocking requires handing over the worker state to
+ // another thread so that the work queue can continue to be processed.
+
+ self.pool.spawn_thread(self.id.clone(), &self.pool);
+
+ // Track that the thread has now fully entered the blocking state.
+ self.is_blocking.set(true);
+
+ Ok(().into())
+ }
+
+ /// Transition from blocking
+ pub(crate) fn transition_from_blocking(&self) {
+ // TODO: Attempt to take ownership of the worker again.
+ }
+
+ /// Returns a reference to the worker's identifier.
+ ///
+ /// This identifier is unique scoped by the thread pool. It is possible that
+ /// different thread pool instances share worker identifier values.
+ pub fn id(&self) -> &WorkerId {
+ &self.id
+ }
+
+ /// Run the worker
+ ///
+ /// This function blocks until the worker is shutting down.
+ pub fn run(&self) {
+ const MAX_SPINS: usize = 3;
+ const LIGHT_SLEEP_INTERVAL: usize = 32;
+
+ // Get the notifier.
+ let notify = Arc::new(Notifier {
+ pool: self.pool.clone(),
+ });
+
+ let mut first = true;
+ let mut spin_cnt = 0;
+ let mut tick = 0;
+
+ while self.check_run_state(first) {
+ first = false;
+
+ // Run the next available task
+ if self.try_run_task(&notify) {
+ if self.is_blocking.get() {
+ // Exit out of the run state
+ return;
+ }
+
+ // Poll the reactor and the global queue every now and then to
+ // ensure no task gets left behind.
+ if tick % LIGHT_SLEEP_INTERVAL == 0 {
+ self.sleep_light();
+ }
+
+ tick = tick.wrapping_add(1);
+ spin_cnt = 0;
+
+ // As long as there is work, keep looping.
+ continue;
+ }
+
+ spin_cnt += 1;
+
+ // Yield the thread several times before it actually goes to sleep.
+ if spin_cnt <= MAX_SPINS {
+ thread::yield_now();
+ continue;
+ }
+
+ tick = 0;
+ spin_cnt = 0;
+
+ // Starting to get sleeeeepy
+ if !self.sleep() {
+ return;
+ }
+
+ // If there still isn't any work to do, shutdown the worker?
+ }
+
+ // The pool is terminating. However, transitioning the pool state to
+ // terminated is the very first step of the finalization process. Other
+ // threads may not see this state and try to spawn a new thread. To
+ // ensure consistency, before the current thread shuts down, it must
+ // return the backup token to the stack.
+ //
+ // The returned result is ignored because `Err` represents the pool
+ // shutting down. We are currently aware of this fact.
+ let _ = self.pool.release_backup(self.backup_id);
+
+ self.should_finalize.set(true);
+ }
+
+ /// Try to run a task
+ ///
+ /// Returns `true` if work was found.
+ #[inline]
+ fn try_run_task(&self, notify: &Arc<Notifier>) -> bool {
+ if self.try_run_owned_task(notify) {
+ return true;
+ }
+
+ self.try_steal_task(notify)
+ }
+
+ /// Checks the worker's current state, updating it as needed.
+ ///
+ /// Returns `true` if the worker should run.
+ #[inline]
+ fn check_run_state(&self, first: bool) -> bool {
+ use self::Lifecycle::*;
+
+ debug_assert!(!self.is_blocking.get());
+
+ let mut state: State = self.entry().state.load(Acquire).into();
+
+ loop {
+ let pool_state: pool::State = self.pool.state.load(Acquire).into();
+
+ if pool_state.is_terminated() {
+ return false;
+ }
+
+ let mut next = state;
+
+ match state.lifecycle() {
+ Running => break,
+ Notified | Signaled => {
+ // transition back to running
+ next.set_lifecycle(Running);
+ }
+ Shutdown | Sleeping => {
+ // The worker should never be in these states when calling
+ // this function.
+ panic!("unexpected worker state; lifecycle={:?}", state.lifecycle());
+ }
+ }
+
+ let actual = self
+ .entry()
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if actual == state {
+ break;
+ }
+
+ state = actual;
+ }
+
+ // `first` is set to true the first time this function is called after
+ // the thread has started.
+ //
+ // This check is to handle the scenario where a worker gets signaled
+ // while it is already happily running. The `is_signaled` state is
+ // intended to wake up a worker that has been previously sleeping, in
+ // effect increasing the number of active workers. If this is the first
+ // time `check_run_state` is called, then being in a signaled state is
+ // normal and the thread was started to handle it. However, if this is
+ // **not** the first time the fn was called, then the number of active
+ // workers has not been increased by the signal, so `signal_work` has to
+ // be called again to try to wake up another worker.
+ //
+ // For example, suppose the thread pool is configured to allow 4 workers.
+ // Worker 1 is processing tasks from its `deque`. Worker 2 receives its
+ // first task. Worker 2 will pick a random worker to signal. It does
+ // this by popping off the sleep stack, but there is no guarantee that
+ // workers on the sleep stack are actually sleeping. It is possible that
+ // Worker 1 gets signaled.
+ //
+ // Without this check, in the above case, no additional workers will get
+ // started, which results in the thread pool permanently being at 2
+ // workers even though it should reach 4.
+ if !first && state.is_signaled() {
+ trace!("Worker::check_run_state; delegate signal");
+ // This worker is not ready to be signaled, so delegate the signal
+ // to another worker.
+ self.pool.signal_work(&self.pool);
+ }
+
+ true
+ }
+
+ /// Runs the next task on this worker's queue.
+ ///
+ /// Returns `true` if work was found.
+ fn try_run_owned_task(&self, notify: &Arc<Notifier>) -> bool {
+ // Poll the internal queue for a task to run
+ match self.entry().pop_task() {
+ Some(task) => {
+ self.run_task(task, notify);
+ true
+ }
+ None => false,
+ }
+ }
+
+ /// Tries to steal a task from another worker.
+ ///
+ /// Returns `true` if work was found
+ fn try_steal_task(&self, notify: &Arc<Notifier>) -> bool {
+ use crossbeam_deque::Steal;
+
+ debug_assert!(!self.is_blocking.get());
+
+ let len = self.pool.workers.len();
+ let mut idx = self.pool.rand_usize() % len;
+ let mut found_work = false;
+ let start = idx;
+
+ loop {
+ if idx < len {
+ match self.pool.workers[idx].steal_tasks(self.entry()) {
+ Steal::Success(task) => {
+ trace!("stole task from another worker");
+
+ self.run_task(task, notify);
+
+ trace!(
+ "try_steal_task -- signal_work; self={}; from={}",
+ self.id.0,
+ idx
+ );
+
+ // Signal other workers that work is available
+ //
+ // TODO: Should this be called here or before
+ // `run_task`?
+ self.pool.signal_work(&self.pool);
+
+ return true;
+ }
+ Steal::Empty => {}
+ Steal::Retry => found_work = true,
+ }
+
+ idx += 1;
+ } else {
+ idx = 0;
+ }
+
+ if idx == start {
+ break;
+ }
+ }
+
+ found_work
+ }
+
+ fn run_task(&self, task: Arc<Task>, notify: &Arc<Notifier>) {
+ use task::Run::*;
+
+ // If this is the first time this task is being polled, register it so that we can keep
+ // track of tasks that are in progress.
+ if task.reg_worker.get().is_none() {
+ task.reg_worker.set(Some(self.id.0 as u32));
+ self.entry().register_task(&task);
+ }
+
+ let run = self.run_task2(&task, notify);
+
+ // TODO: Try to claim back the worker state in case the backup thread
+ // did not start up fast enough. This is a performance optimization.
+
+ match run {
+ Idle => {}
+ Schedule => {
+ if self.is_blocking.get() {
+ // The future has been notified while it was running.
+ // However, the future also entered a blocking section,
+ // which released the worker state from this thread.
+ //
+ // This means that scheduling the future must be done from
+ // a point of view external to the worker set.
+ //
+ // We have to call `submit_external` instead of `submit`
+ // here because `self` is still set as the current worker.
+ self.pool.submit_external(task, &self.pool);
+ } else {
+ self.entry().push_internal(task);
+ }
+ }
+ Complete => {
+ let mut state: pool::State = self.pool.state.load(Acquire).into();
+
+ loop {
+ let mut next = state;
+ next.dec_num_futures();
+
+ let actual = self
+ .pool
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if actual == state {
+ trace!("task complete; state={:?}", next);
+
+ if state.num_futures() == 1 {
+ // If the thread pool has been flagged as shutdown,
+ // start terminating workers. This involves waking
+ // up any sleeping worker so that they can notice
+ // the shutdown state.
+ if next.is_terminated() {
+ self.pool.terminate_sleeping_workers();
+ }
+ }
+
+ // Find which worker polled this task first.
+ let worker = task.reg_worker.get().unwrap() as usize;
+
+ // Unregister the task from the worker it was registered in.
+ if !self.is_blocking.get() && worker == self.id.0 {
+ self.entry().unregister_task(task);
+ } else {
+ self.pool.workers[worker].remotely_complete_task(task);
+ }
+
+ // The worker's run loop will detect the shutdown state
+ // next iteration.
+ return;
+ }
+
+ state = actual;
+ }
+ }
+ }
+ }
+
+ /// Actually run the task. This is where `Worker::current_task` is set.
+ ///
+ /// Great care is needed to ensure that `current_task` is unset in this
+ /// function.
+ fn run_task2(&self, task: &Arc<Task>, notify: &Arc<Notifier>) -> task::Run {
+ struct Guard<'a> {
+ worker: &'a Worker,
+ }
+
+ impl<'a> Drop for Guard<'a> {
+ fn drop(&mut self) {
+ // A task may be notified specifically because blocking capacity
+ // has become available for it. When this happens, that capacity
+ // is automatically allocated to the notified task as it runs.
+ // This capacity is "use it or lose it", so if the thread is not
+ // transitioned to blocking in this call, then another task has
+ // to be notified.
+ //
+ // If the task has consumed its blocking allocation but hasn't
+ // used it, it must be given to some other task instead.
+ if !self.worker.is_blocking.get() {
+ let can_block = self.worker.current_task.can_block();
+ if can_block == CanBlock::Allocated {
+ self.worker.pool.notify_blocking_task(&self.worker.pool);
+ }
+ }
+
+ self.worker.current_task.clear();
+ }
+ }
+
+ // Set `current_task`
+ self.current_task.set(task, CanBlock::CanRequest);
+
+ // Create the guard, this ensures that `current_task` is unset when the
+ // function returns, even if the return is caused by a panic.
+ let _g = Guard { worker: self };
+
+ task.run(notify)
+ }
+
+ /// Put the worker to sleep
+ ///
+ /// Returns `true` if woken up due to new work arriving.
+ fn sleep(&self) -> bool {
+ use self::Lifecycle::*;
+
+ // Putting a worker to sleep is a multipart operation. This is, in part,
+ // due to the fact that a worker can be notified without it being popped
+ // from the sleep stack. Extra care is needed to deal with this.
+
+ trace!("Worker::sleep; worker={:?}", self.id);
+
+ let mut state: State = self.entry().state.load(Acquire).into();
+
+ // The first part of the sleep process is to transition the worker state
+ // to "pushed". Now, it may be that the worker is already pushed on the
+ // sleeper stack, in which case, we don't push again.
+
+ loop {
+ let mut next = state;
+
+ match state.lifecycle() {
+ Running => {
+ // Try setting the pushed state
+ next.set_pushed();
+
+ // Transition the worker state to sleeping
+ next.set_lifecycle(Sleeping);
+ }
+ Notified | Signaled => {
+ // No need to sleep, transition back to running and move on.
+ next.set_lifecycle(Running);
+ }
+ Shutdown | Sleeping => {
+ // The worker cannot transition to sleep when already in a
+ // sleeping state.
+ panic!("unexpected worker state; actual={:?}", state.lifecycle());
+ }
+ }
+
+ let actual = self
+ .entry()
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if actual == state {
+ if state.is_notified() {
+ // The previous state was notified, so we don't need to
+ // sleep.
+ return true;
+ }
+
+ if !state.is_pushed() {
+ debug_assert!(next.is_pushed());
+
+ trace!(" sleeping -- push to stack; idx={}", self.id.0);
+
+ // We obtained permission to push the worker into the
+ // sleeper queue.
+ if let Err(_) = self.pool.push_sleeper(self.id.0) {
+ trace!(" sleeping -- push to stack failed; idx={}", self.id.0);
+ // The push failed due to the pool being terminated.
+ //
+ // This is true because the "work" being woken up for is
+ // shutting down.
+ return true;
+ }
+ }
+
+ break;
+ }
+
+ state = actual;
+ }
+
+ trace!(" -> starting to sleep; idx={}", self.id.0);
+
+ // Do a quick check to see if there are any notifications in the
+ // reactor or new tasks in the global queue. Since this call will
+ // clear the wakeup token, we need to check the state again and
+ // only after that go to sleep.
+ self.sleep_light();
+
+ // The state has been transitioned to sleeping, we can now wait by
+ // calling the parker. This is done in a loop as condvars can wakeup
+ // spuriously.
+ loop {
+ // Reload the state
+ state = self.entry().state.load(Acquire).into();
+
+ // If the worker has been notified, transition back to running.
+ match state.lifecycle() {
+ Sleeping => {
+ // Still sleeping. Park again.
+ }
+ Notified | Signaled => {
+ // Transition back to running
+ loop {
+ let mut next = state;
+ next.set_lifecycle(Running);
+
+ let actual = self
+ .entry()
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if actual == state {
+ return true;
+ }
+
+ state = actual;
+ }
+ }
+ Shutdown | Running => {
+ // To get here, the block above transitioned the state to
+ // `Sleeping`. No other thread can concurrently
+ // transition to `Shutdown` or `Running`.
+ unreachable!();
+ }
+ }
+
+ self.entry().park();
+
+ trace!(" -> wakeup; idx={}", self.id.0);
+ }
+ }
+
+ /// This doesn't actually put the thread to sleep. It calls
+ /// `park.park_timeout` with a duration of 0. This allows the park
+ /// implementation to perform any work that might be done on an interval.
+ ///
+ /// Any tasks stolen from the global queue are pushed onto this worker's queue and other workers are signaled.
+ fn sleep_light(&self) {
+ self.entry().park_timeout(Duration::from_millis(0));
+
+ use crossbeam_deque::Steal;
+ loop {
+ match self.pool.queue.steal_batch(&self.entry().worker) {
+ Steal::Success(()) => {
+ self.pool.signal_work(&self.pool);
+ break;
+ }
+ Steal::Empty => break,
+ Steal::Retry => {}
+ }
+ }
+ }
+
+ fn entry(&self) -> &Entry {
+ debug_assert!(!self.is_blocking.get());
+ &self.pool.workers[self.id.0]
+ }
+}
+
+impl Drop for Worker {
+ fn drop(&mut self) {
+ trace!("shutting down thread; idx={}", self.id.0);
+
+ if self.should_finalize.get() {
+ // Drain the work queue
+ self.entry().drain_tasks();
+ }
+ }
+}
+
+// ===== impl CurrentTask =====
+
+impl CurrentTask {
+ /// Returns a default `CurrentTask` representing no task.
+ fn new() -> CurrentTask {
+ CurrentTask {
+ task: Cell::new(None),
+ can_block: Cell::new(CanBlock::CanRequest),
+ }
+ }
+
+ /// Returns a reference to the task.
+ fn get_ref(&self) -> &Arc<Task> {
+ unsafe { &*self.task.get().unwrap() }
+ }
+
+ fn can_block(&self) -> CanBlock {
+ use self::CanBlock::*;
+
+ match self.can_block.get() {
+ Allocated => Allocated,
+ CanRequest | NoCapacity => {
+ let can_block = self.get_ref().consume_blocking_allocation();
+ self.can_block.set(can_block);
+ can_block
+ }
+ }
+ }
+
+ fn set_can_block(&self, can_block: CanBlock) {
+ self.can_block.set(can_block);
+ }
+
+ fn set(&self, task: &Arc<Task>, can_block: CanBlock) {
+ self.task.set(Some(task as *const _));
+ self.can_block.set(can_block);
+ }
+
+ /// Reset the `CurrentTask` to null state.
+ fn clear(&self) {
+ self.task.set(None);
+ self.can_block.set(CanBlock::CanRequest);
+ }
+}
+
+// ===== impl WorkerId =====
+
+impl WorkerId {
+ /// Returns a `WorkerId` representing the worker entry at index `idx`.
+ pub(crate) fn new(idx: usize) -> WorkerId {
+ WorkerId(idx)
+ }
+
+ /// Returns this identifier represented as an integer.
+ ///
+ /// Worker identifiers in a single thread pool are guaranteed to correspond to integers in the
+ /// range `0..pool_size`.
+ pub fn to_usize(&self) -> usize {
+ self.0
+ }
+}
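The hot loop in `Worker::run` above interleaves task polling with a spin-then-park backoff: yield the thread for a few work-less iterations, then fall through to the sleep protocol. A rough sketch of just that backoff shape, with the pool, queues, and state machine replaced by hypothetical `find_work` and `park` closures:

    use std::thread;

    // `find_work` and `park` are hypothetical stand-ins for `try_run_task`
    // and the entry's parker.
    fn run_loop(mut find_work: impl FnMut() -> bool, mut park: impl FnMut()) {
        const MAX_SPINS: usize = 3;
        let mut spin_cnt = 0;

        loop {
            if find_work() {
                // Work was found: reset the spin counter and keep looping.
                spin_cnt = 0;
                continue;
            }

            spin_cnt += 1;

            // Yield the thread a few times before actually parking it.
            if spin_cnt <= MAX_SPINS {
                thread::yield_now();
                continue;
            }

            spin_cnt = 0;

            // The real worker runs the full sleep protocol here and keeps
            // looping until the pool shuts down; this sketch parks once.
            park();
            return;
        }
    }

    fn main() {
        let mut remaining = 5;
        run_loop(
            || {
                remaining -= 1;
                remaining > 0
            },
            || println!("parking"),
        );
    }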
diff --git a/third_party/rust/tokio-threadpool/src/worker/stack.rs b/third_party/rust/tokio-threadpool/src/worker/stack.rs
new file mode 100644
index 0000000000..d02c277fed
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/worker/stack.rs
@@ -0,0 +1,260 @@
+use config::MAX_WORKERS;
+use worker;
+
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed};
+use std::{fmt, usize};
+
+/// Lock-free stack of sleeping workers.
+///
+/// This is implemented as a Treiber stack and references to nodes are
+/// `usize` values, indexing the entry in the `[worker::Entry]` array stored by
+/// `Pool`. Each `Entry` instance maintains a `pushed` bit in its state. This
+/// bit tracks if the entry is already pushed onto the stack or not. A single
+/// entry can only be stored on the stack a single time.
+///
+/// By using indexes instead of pointers, a much greater amount of data can
+/// be used for the ABA guard (see the correctness section of the Wikipedia
+/// page).
+///
+/// Treiber stack: https://en.wikipedia.org/wiki/Treiber_Stack
+#[derive(Debug)]
+pub(crate) struct Stack {
+ state: AtomicUsize,
+}
+
+/// State related to the stack of sleeping workers.
+///
+/// - Parked head: low 16 bits
+/// - Sequence (ABA guard): remaining bits
+///
+/// The parked head value has a couple of special values:
+///
+/// - EMPTY: No sleepers
+/// - TERMINATED: Don't spawn more threads
+#[derive(Eq, PartialEq, Clone, Copy)]
+pub struct State(usize);
+
+/// Extracts the head of the worker stack from the scheduler state
+///
+/// The low 16 bits can hold any worker index up to `MAX_WORKERS`, plus the `EMPTY` and `TERMINATED` sentinels.
+const STACK_MASK: usize = ((1 << 16) - 1);
+
+/// Used to mark the stack as empty
+pub(crate) const EMPTY: usize = MAX_WORKERS;
+
+/// Used to mark the stack as terminated
+pub(crate) const TERMINATED: usize = EMPTY + 1;
+
+/// How many bits the Treiber ABA guard is offset by
+const ABA_GUARD_SHIFT: usize = 16;
+
+#[cfg(target_pointer_width = "64")]
+const ABA_GUARD_MASK: usize = (1 << (64 - ABA_GUARD_SHIFT)) - 1;
+
+#[cfg(target_pointer_width = "32")]
+const ABA_GUARD_MASK: usize = (1 << (32 - ABA_GUARD_SHIFT)) - 1;
+
+// ===== impl Stack =====
+
+impl Stack {
+ /// Create a new `Stack` representing the empty state.
+ pub fn new() -> Stack {
+ let state = AtomicUsize::new(State::new().into());
+ Stack { state }
+ }
+
+ /// Push a worker onto the stack
+ ///
+ /// # Return
+ ///
+ /// Returns `Ok` on success.
+ ///
+ /// Returns `Err` if the pool has transitioned to the `TERMINATED` state.
+ /// When terminated, pushing new entries is no longer permitted.
+ pub fn push(&self, entries: &[worker::Entry], idx: usize) -> Result<(), ()> {
+ let mut state: State = self.state.load(Acquire).into();
+
+ debug_assert!(worker::State::from(entries[idx].state.load(Relaxed)).is_pushed());
+
+ loop {
+ let mut next = state;
+
+ let head = state.head();
+
+ if head == TERMINATED {
+ // The pool is terminated, cannot push the sleeper.
+ return Err(());
+ }
+
+ entries[idx].set_next_sleeper(head);
+ next.set_head(idx);
+
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if state == actual {
+ return Ok(());
+ }
+
+ state = actual;
+ }
+ }
+
+ /// Pop a worker off the stack.
+ ///
+ /// If `terminate` is set and the stack is empty when this function is
+ /// called, the state of the stack is transitioned to "terminated". At this
+ /// point, no further workers can be pushed onto the stack.
+ ///
+ /// # Return
+ ///
+ /// Returns the index of the popped worker and the worker's observed state.
+ ///
+ /// `None` if the stack is empty.
+ pub fn pop(
+ &self,
+ entries: &[worker::Entry],
+ max_lifecycle: worker::Lifecycle,
+ terminate: bool,
+ ) -> Option<(usize, worker::State)> {
+ // Figure out the empty value
+ let terminal = match terminate {
+ true => TERMINATED,
+ false => EMPTY,
+ };
+
+ // If terminating, the max lifecycle *must* be `Signaled`, which is the
+ // highest lifecycle. By passing the greatest possible lifecycle value,
+ // no entries are skipped by this function.
+ //
+ // TODO: It would be better to terminate in a separate function that
+ // atomically takes all values and transitions to a terminated state.
+ debug_assert!(!terminate || max_lifecycle == worker::Lifecycle::Signaled);
+
+ let mut state: State = self.state.load(Acquire).into();
+
+ loop {
+ let head = state.head();
+
+ if head == EMPTY {
+ let mut next = state;
+ next.set_head(terminal);
+
+ if next == state {
+ debug_assert!(terminal == EMPTY);
+ return None;
+ }
+
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if actual != state {
+ state = actual;
+ continue;
+ }
+
+ return None;
+ } else if head == TERMINATED {
+ return None;
+ }
+
+ debug_assert!(head < MAX_WORKERS);
+
+ let mut next = state;
+
+ let next_head = entries[head].next_sleeper();
+
+ // TERMINATED can never be set as the "next pointer" on a worker.
+ debug_assert!(next_head != TERMINATED);
+
+ if next_head == EMPTY {
+ next.set_head(terminal);
+ } else {
+ next.set_head(next_head);
+ }
+
+ let actual = self
+ .state
+ .compare_and_swap(state.into(), next.into(), AcqRel)
+ .into();
+
+ if actual == state {
+ // Release ordering is needed to ensure that unsetting the
+ // `pushed` flag happens after popping the sleeper from the
+ // stack.
+ //
+ // Acquire ordering is required to acquire any memory associated
+ // with transitioning the worker's lifecycle.
+ let state = entries[head].fetch_unset_pushed(AcqRel);
+
+ if state.lifecycle() >= max_lifecycle {
+ // If the worker has already been notified, then it is
+ // warming up to do more work. In this case, try to pop
+ // another thread that might be in a relaxed state.
+ continue;
+ }
+
+ return Some((head, state));
+ }
+
+ state = actual;
+ }
+ }
+}
+
+// ===== impl State =====
+
+impl State {
+ #[inline]
+ fn new() -> State {
+ State(EMPTY)
+ }
+
+ #[inline]
+ fn head(&self) -> usize {
+ self.0 & STACK_MASK
+ }
+
+ #[inline]
+ fn set_head(&mut self, val: usize) {
+ // The ABA guard protects against the ABA problem w/ Treiber stacks
+ let aba_guard = ((self.0 >> ABA_GUARD_SHIFT) + 1) & ABA_GUARD_MASK;
+
+ self.0 = (aba_guard << ABA_GUARD_SHIFT) | val;
+ }
+}
+
+impl From<usize> for State {
+ fn from(src: usize) -> Self {
+ State(src)
+ }
+}
+
+impl From<State> for usize {
+ fn from(src: State) -> Self {
+ src.0
+ }
+}
+
+impl fmt::Debug for State {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ let head = self.head();
+
+ let mut fmt = fmt.debug_struct("stack::State");
+
+ if head < MAX_WORKERS {
+ fmt.field("head", &head);
+ } else if head == EMPTY {
+ fmt.field("head", &"EMPTY");
+ } else if head == TERMINATED {
+ fmt.field("head", &"TERMINATED");
+ }
+
+ fmt.finish()
+ }
+}
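The `set_head` logic above is what defeats the ABA problem: the sleeper-stack head index lives in the low 16 bits, and every update bumps a sequence counter in the remaining bits. A free-standing sketch of that packing, with the constants copied from the file and a 64-bit `usize` assumed:

    // Constants copied from stack.rs above; 64-bit guard mask assumed.
    const STACK_MASK: usize = (1 << 16) - 1;
    const ABA_GUARD_SHIFT: usize = 16;
    const ABA_GUARD_MASK: usize = (1 << (64 - ABA_GUARD_SHIFT)) - 1;

    fn head(state: usize) -> usize {
        state & STACK_MASK
    }

    fn set_head(state: usize, new_head: usize) -> usize {
        // Every update increments the guard, so a stale snapshot that
        // happens to point at the same head index still fails its CAS.
        let aba_guard = ((state >> ABA_GUARD_SHIFT) + 1) & ABA_GUARD_MASK;
        (aba_guard << ABA_GUARD_SHIFT) | new_head
    }

    fn main() {
        let s0 = 0usize;
        let s1 = set_head(s0, 3); // worker 3 pushed
        let s2 = set_head(s1, 3); // worker 3 pushed again later
        assert_eq!(head(s1), 3);
        assert_eq!(head(s2), 3);
        // Same head index, different raw values: an old copy of s1 cannot
        // be confused with s2.
        assert_ne!(s1, s2);
    }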
diff --git a/third_party/rust/tokio-threadpool/src/worker/state.rs b/third_party/rust/tokio-threadpool/src/worker/state.rs
new file mode 100644
index 0000000000..c388f6c99e
--- /dev/null
+++ b/third_party/rust/tokio-threadpool/src/worker/state.rs
@@ -0,0 +1,153 @@
+use std::fmt;
+
+/// Tracks worker state
+#[derive(Clone, Copy, Eq, PartialEq)]
+pub(crate) struct State(usize);
+
+/// Set when the worker is pushed onto the scheduler's stack of sleeping
+/// threads.
+pub(crate) const PUSHED_MASK: usize = 0b001;
+
+/// Manages the worker lifecycle part of the state
+const LIFECYCLE_MASK: usize = 0b1110;
+const LIFECYCLE_SHIFT: usize = 1;
+
+#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Clone, Copy)]
+#[repr(usize)]
+pub(crate) enum Lifecycle {
+ /// The worker does not currently have an associated thread.
+ Shutdown = 0 << LIFECYCLE_SHIFT,
+
+ /// The worker is doing work
+ Running = 1 << LIFECYCLE_SHIFT,
+
+ /// The worker is currently asleep in the condvar
+ Sleeping = 2 << LIFECYCLE_SHIFT,
+
+ /// The worker has been notified it should process more work.
+ Notified = 3 << LIFECYCLE_SHIFT,
+
+ /// A stronger form of notification. In this case, the worker is expected to
+ /// wake up and try to acquire more work... if it enters this state while
+ /// already busy with other work, it is expected to signal another worker.
+ Signaled = 4 << LIFECYCLE_SHIFT,
+}
+
+impl State {
+ /// Returns true if the worker entry is pushed in the sleeper stack
+ pub fn is_pushed(&self) -> bool {
+ self.0 & PUSHED_MASK == PUSHED_MASK
+ }
+
+ pub fn set_pushed(&mut self) {
+ self.0 |= PUSHED_MASK
+ }
+
+ pub fn is_notified(&self) -> bool {
+ use self::Lifecycle::*;
+
+ match self.lifecycle() {
+ Notified | Signaled => true,
+ _ => false,
+ }
+ }
+
+ pub fn lifecycle(&self) -> Lifecycle {
+ Lifecycle::from(self.0 & LIFECYCLE_MASK)
+ }
+
+ pub fn set_lifecycle(&mut self, val: Lifecycle) {
+ self.0 = (self.0 & !LIFECYCLE_MASK) | (val as usize)
+ }
+
+ pub fn is_signaled(&self) -> bool {
+ self.lifecycle() == Lifecycle::Signaled
+ }
+
+ pub fn notify(&mut self) {
+ use self::Lifecycle::Signaled;
+
+ if self.lifecycle() != Signaled {
+ self.set_lifecycle(Signaled)
+ }
+ }
+}
+
+impl Default for State {
+ fn default() -> State {
+ // All workers start pushed onto the sleep stack
+ State(PUSHED_MASK)
+ }
+}
+
+impl From<usize> for State {
+ fn from(src: usize) -> Self {
+ State(src)
+ }
+}
+
+impl From<State> for usize {
+ fn from(src: State) -> Self {
+ src.0
+ }
+}
+
+impl fmt::Debug for State {
+ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+ fmt.debug_struct("worker::State")
+ .field("lifecycle", &self.lifecycle())
+ .field("is_pushed", &self.is_pushed())
+ .finish()
+ }
+}
+
+// ===== impl Lifecycle =====
+
+impl From<usize> for Lifecycle {
+ fn from(src: usize) -> Lifecycle {
+ use self::Lifecycle::*;
+
+ debug_assert!(
+ src == Shutdown as usize
+ || src == Running as usize
+ || src == Sleeping as usize
+ || src == Notified as usize
+ || src == Signaled as usize
+ );
+
+ unsafe { ::std::mem::transmute(src) }
+ }
+}
+
+impl From<Lifecycle> for usize {
+ fn from(src: Lifecycle) -> usize {
+ let v = src as usize;
+ debug_assert!(v & LIFECYCLE_MASK == v);
+ v
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::Lifecycle::*;
+ use super::*;
+
+ #[test]
+ fn lifecycle_encode() {
+ let lifecycles = &[Shutdown, Running, Sleeping, Notified, Signaled];
+
+ for &lifecycle in lifecycles {
+ let mut v: usize = lifecycle.into();
+ v &= LIFECYCLE_MASK;
+
+ assert_eq!(lifecycle, Lifecycle::from(v));
+ }
+ }
+
+ #[test]
+ fn lifecycle_ord() {
+ assert!(Running >= Shutdown);
+ assert!(Signaled >= Notified);
+ assert!(Signaled >= Sleeping);
+ }
+}
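For reference, the pushed bit and the lifecycle occupy disjoint bit ranges of the same `usize`, so either can be updated without disturbing the other. A small sketch of that encoding, reusing the masks defined above and the `Running` value of `1 << LIFECYCLE_SHIFT` from the enum:

    // Constants copied from state.rs above.
    const PUSHED_MASK: usize = 0b001;
    const LIFECYCLE_MASK: usize = 0b1110;
    const LIFECYCLE_SHIFT: usize = 1;

    fn main() {
        // Default state: pushed onto the sleep stack, lifecycle = Shutdown (0).
        let mut state = PUSHED_MASK;

        // Move the lifecycle to Running (1 << LIFECYCLE_SHIFT) without
        // touching the pushed bit, mirroring `State::set_lifecycle`.
        state = (state & !LIFECYCLE_MASK) | (1 << LIFECYCLE_SHIFT);

        assert_eq!(state & PUSHED_MASK, PUSHED_MASK); // still pushed
        assert_eq!((state & LIFECYCLE_MASK) >> LIFECYCLE_SHIFT, 1); // Running
    }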