author    Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-07 19:33:14 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-07 19:33:14 +0000
commit    36d22d82aa202bb199967e9512281e9a53db42c9 (patch)
tree      105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/tokio/src/runtime/thread_pool
parent    Initial commit. (diff)
Adding upstream version 115.7.0esr. (upstream/115.7.0esr, upstream)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio/src/runtime/thread_pool')
-rw-r--r--   third_party/rust/tokio/src/runtime/thread_pool/idle.rs     226
-rw-r--r--   third_party/rust/tokio/src/runtime/thread_pool/mod.rs      136
-rw-r--r--   third_party/rust/tokio/src/runtime/thread_pool/worker.rs   848
3 files changed, 1210 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/runtime/thread_pool/idle.rs b/third_party/rust/tokio/src/runtime/thread_pool/idle.rs
new file mode 100644
index 0000000000..a57bf6a0b1
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/thread_pool/idle.rs
@@ -0,0 +1,226 @@
+//! Coordinates idling workers
+
+use crate::loom::sync::atomic::AtomicUsize;
+use crate::loom::sync::Mutex;
+
+use std::fmt;
+use std::sync::atomic::Ordering::{self, SeqCst};
+
+pub(super) struct Idle {
+ /// Tracks both the number of searching workers and the number of unparked
+ /// workers.
+ ///
+ /// Used as a fast path to avoid acquiring the lock when it is not needed.
+ state: AtomicUsize,
+
+ /// Sleeping workers
+ sleepers: Mutex<Vec<usize>>,
+
+ /// Total number of workers.
+ num_workers: usize,
+}
+
+const UNPARK_SHIFT: usize = 16;
+const UNPARK_MASK: usize = !SEARCH_MASK;
+const SEARCH_MASK: usize = (1 << UNPARK_SHIFT) - 1;
+
+#[derive(Copy, Clone)]
+struct State(usize);
+
+impl Idle {
+ pub(super) fn new(num_workers: usize) -> Idle {
+ let init = State::new(num_workers);
+
+ Idle {
+ state: AtomicUsize::new(init.into()),
+ sleepers: Mutex::new(Vec::with_capacity(num_workers)),
+ num_workers,
+ }
+ }
+
+ /// If there are no workers actively searching, returns the index of a
+ /// worker currently sleeping.
+ pub(super) fn worker_to_notify(&self) -> Option<usize> {
+ // If at least one worker is spinning, work being notified will
+ // eventually be found. A searching thread will find **some** work and
+ // notify another worker, eventually leading to our work being found.
+ //
+ // For this to happen, this load must happen before the thread
+ // transitions `num_searching` to zero. Acquire / Release does not
+ // provide sufficient guarantees, so this load is done with `SeqCst` and
+ // will pair with the `fetch_sub(1)` when transitioning out of
+ // searching.
+ if !self.notify_should_wakeup() {
+ return None;
+ }
+
+ // Acquire the lock
+ let mut sleepers = self.sleepers.lock();
+
+ // Check again, now that the lock is acquired
+ if !self.notify_should_wakeup() {
+ return None;
+ }
+
+ // A worker should be woken up; atomically increment the number of
+ // searching workers as well as the number of unparked workers.
+ State::unpark_one(&self.state, 1);
+
+ // Get the worker to unpark
+ let ret = sleepers.pop();
+ debug_assert!(ret.is_some());
+
+ ret
+ }
+
+ /// Returns `true` if the worker needs to do a final check for submitted
+ /// work.
+ pub(super) fn transition_worker_to_parked(&self, worker: usize, is_searching: bool) -> bool {
+ // Acquire the lock
+ let mut sleepers = self.sleepers.lock();
+
+ // Decrement the number of unparked threads
+ let ret = State::dec_num_unparked(&self.state, is_searching);
+
+ // Track the sleeping worker
+ sleepers.push(worker);
+
+ ret
+ }
+
+ pub(super) fn transition_worker_to_searching(&self) -> bool {
+ let state = State::load(&self.state, SeqCst);
+ if 2 * state.num_searching() >= self.num_workers {
+ return false;
+ }
+
+ // It is possible for this routine to allow more than 50% of the workers
+ // to search. That is OK. Limiting searchers is only an optimization to
+ // prevent too much contention.
+ State::inc_num_searching(&self.state, SeqCst);
+ true
+ }
+
+ /// A lightweight transition from searching -> running.
+ ///
+ /// Returns `true` if this is the final searching worker. The caller
+ /// **must** notify a new worker.
+ pub(super) fn transition_worker_from_searching(&self) -> bool {
+ State::dec_num_searching(&self.state)
+ }
+
+ /// Unpark a specific worker. This happens if tasks are submitted from
+ /// within the worker's park routine.
+ ///
+ /// Returns `true` if the worker was parked before calling the method.
+ pub(super) fn unpark_worker_by_id(&self, worker_id: usize) -> bool {
+ let mut sleepers = self.sleepers.lock();
+
+ for index in 0..sleepers.len() {
+ if sleepers[index] == worker_id {
+ sleepers.swap_remove(index);
+
+ // Update the state accordingly while the lock is held.
+ State::unpark_one(&self.state, 0);
+
+ return true;
+ }
+ }
+
+ false
+ }
+
+ /// Returns `true` if `worker_id` is contained in the sleep set.
+ pub(super) fn is_parked(&self, worker_id: usize) -> bool {
+ let sleepers = self.sleepers.lock();
+ sleepers.contains(&worker_id)
+ }
+
+ fn notify_should_wakeup(&self) -> bool {
+ let state = State(self.state.fetch_add(0, SeqCst));
+ state.num_searching() == 0 && state.num_unparked() < self.num_workers
+ }
+}
+
+impl State {
+ fn new(num_workers: usize) -> State {
+ // All workers start in the unparked state
+ let ret = State(num_workers << UNPARK_SHIFT);
+ debug_assert_eq!(num_workers, ret.num_unparked());
+ debug_assert_eq!(0, ret.num_searching());
+ ret
+ }
+
+ fn load(cell: &AtomicUsize, ordering: Ordering) -> State {
+ State(cell.load(ordering))
+ }
+
+ fn unpark_one(cell: &AtomicUsize, num_searching: usize) {
+ cell.fetch_add(num_searching | (1 << UNPARK_SHIFT), SeqCst);
+ }
+
+ fn inc_num_searching(cell: &AtomicUsize, ordering: Ordering) {
+ cell.fetch_add(1, ordering);
+ }
+
+ /// Returns `true` if this is the final searching worker
+ fn dec_num_searching(cell: &AtomicUsize) -> bool {
+ let state = State(cell.fetch_sub(1, SeqCst));
+ state.num_searching() == 1
+ }
+
+ /// Track a sleeping worker
+ ///
+ /// Returns `true` if this is the final searching worker.
+ fn dec_num_unparked(cell: &AtomicUsize, is_searching: bool) -> bool {
+ let mut dec = 1 << UNPARK_SHIFT;
+
+ if is_searching {
+ dec += 1;
+ }
+
+ let prev = State(cell.fetch_sub(dec, SeqCst));
+ is_searching && prev.num_searching() == 1
+ }
+
+ /// Number of workers currently searching
+ fn num_searching(self) -> usize {
+ self.0 & SEARCH_MASK
+ }
+
+ /// Number of workers currently unparked
+ fn num_unparked(self) -> usize {
+ (self.0 & UNPARK_MASK) >> UNPARK_SHIFT
+ }
+}
+
+impl From<usize> for State {
+ fn from(src: usize) -> State {
+ State(src)
+ }
+}
+
+impl From<State> for usize {
+ fn from(src: State) -> usize {
+ src.0
+ }
+}
+
+impl fmt::Debug for State {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("worker::State")
+ .field("num_unparked", &self.num_unparked())
+ .field("num_searching", &self.num_searching())
+ .finish()
+ }
+}
+
+#[test]
+fn test_state() {
+ assert_eq!(0, UNPARK_MASK & SEARCH_MASK);
+ assert_eq!(0, !(UNPARK_MASK | SEARCH_MASK));
+
+ let state = State::new(10);
+ assert_eq!(10, state.num_unparked());
+ assert_eq!(0, state.num_searching());
+}
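
Note: `State` above packs two counters into a single usize so both can be read and updated with one atomic operation: the low 16 bits (SEARCH_MASK) count searching workers, and the bits above UNPARK_SHIFT count unparked workers. Below is a minimal standalone sketch of the same packing, reusing the constants from idle.rs; it is illustrative only and not part of tokio's API.

// Standalone sketch of the two-counter packing used by `Idle::state` above.
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};

const UNPARK_SHIFT: usize = 16;
const SEARCH_MASK: usize = (1 << UNPARK_SHIFT) - 1; // low 16 bits: searching workers
const UNPARK_MASK: usize = !SEARCH_MASK;            // high bits: unparked workers

fn num_searching(state: usize) -> usize {
    state & SEARCH_MASK
}

fn num_unparked(state: usize) -> usize {
    (state & UNPARK_MASK) >> UNPARK_SHIFT
}

fn main() {
    // Suppose 5 of 8 workers are currently unparked and none are searching.
    let state = AtomicUsize::new(5 << UNPARK_SHIFT);

    // `unpark_one(cell, 1)`: a single fetch_add bumps both counters at once.
    state.fetch_add(1 | (1 << UNPARK_SHIFT), SeqCst);

    let s = state.load(SeqCst);
    assert_eq!(num_unparked(s), 6);
    assert_eq!(num_searching(s), 1);
}
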
diff --git a/third_party/rust/tokio/src/runtime/thread_pool/mod.rs b/third_party/rust/tokio/src/runtime/thread_pool/mod.rs
new file mode 100644
index 0000000000..d3f46517cb
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/thread_pool/mod.rs
@@ -0,0 +1,136 @@
+//! Threadpool
+
+mod idle;
+use self::idle::Idle;
+
+mod worker;
+pub(crate) use worker::Launch;
+
+pub(crate) use worker::block_in_place;
+
+use crate::loom::sync::Arc;
+use crate::runtime::task::JoinHandle;
+use crate::runtime::{Callback, Parker};
+
+use std::fmt;
+use std::future::Future;
+
+/// Work-stealing based thread pool for executing futures.
+pub(crate) struct ThreadPool {
+ spawner: Spawner,
+}
+
+/// Submits futures to the associated thread pool for execution.
+///
+/// A `Spawner` instance is a handle to a single thread pool that allows the owner
+/// of the handle to spawn futures onto the thread pool.
+///
+/// The `Spawner` handle is *only* used for spawning new futures. It does not
+/// impact the lifecycle of the thread pool in any way. The thread pool may
+/// shut down while there are outstanding `Spawner` instances.
+///
+/// `Spawner` instances are obtained by calling [`ThreadPool::spawner`].
+///
+/// [`ThreadPool::spawner`]: method@ThreadPool::spawner
+#[derive(Clone)]
+pub(crate) struct Spawner {
+ shared: Arc<worker::Shared>,
+}
+
+// ===== impl ThreadPool =====
+
+impl ThreadPool {
+ pub(crate) fn new(
+ size: usize,
+ parker: Parker,
+ before_park: Option<Callback>,
+ after_unpark: Option<Callback>,
+ ) -> (ThreadPool, Launch) {
+ let (shared, launch) = worker::create(size, parker, before_park, after_unpark);
+ let spawner = Spawner { shared };
+ let thread_pool = ThreadPool { spawner };
+
+ (thread_pool, launch)
+ }
+
+ /// Returns reference to `Spawner`.
+ ///
+ /// The `Spawner` handle can be cloned and enables spawning tasks from other
+ /// threads.
+ pub(crate) fn spawner(&self) -> &Spawner {
+ &self.spawner
+ }
+
+ /// Blocks the current thread waiting for the future to complete.
+ ///
+ /// The future will execute on the current thread, but all spawned tasks
+ /// will be executed on the thread pool.
+ pub(crate) fn block_on<F>(&self, future: F) -> F::Output
+ where
+ F: Future,
+ {
+ let mut enter = crate::runtime::enter(true);
+ enter.block_on(future).expect("failed to park thread")
+ }
+}
+
+impl fmt::Debug for ThreadPool {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("ThreadPool").finish()
+ }
+}
+
+impl Drop for ThreadPool {
+ fn drop(&mut self) {
+ self.spawner.shutdown();
+ }
+}
+
+// ==== impl Spawner =====
+
+impl Spawner {
+ /// Spawns a future onto the thread pool
+ pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
+ where
+ F: crate::future::Future + Send + 'static,
+ F::Output: Send + 'static,
+ {
+ worker::Shared::bind_new_task(&self.shared, future)
+ }
+
+ pub(crate) fn shutdown(&mut self) {
+ self.shared.close();
+ }
+}
+
+cfg_metrics! {
+ use crate::runtime::{SchedulerMetrics, WorkerMetrics};
+
+ impl Spawner {
+ pub(crate) fn num_workers(&self) -> usize {
+ self.shared.worker_metrics.len()
+ }
+
+ pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
+ &self.shared.scheduler_metrics
+ }
+
+ pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
+ &self.shared.worker_metrics[worker]
+ }
+
+ pub(crate) fn injection_queue_depth(&self) -> usize {
+ self.shared.injection_queue_depth()
+ }
+
+ pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
+ self.shared.worker_local_queue_depth(worker)
+ }
+ }
+}
+
+impl fmt::Debug for Spawner {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Spawner").finish()
+ }
+}
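
Note: `ThreadPool` and `Spawner` are internal types that back tokio's public multi-thread runtime: `block_on` drives one future on the calling thread while spawned tasks run on the pool. A hedged usage sketch through the public API follows (it assumes the "rt-multi-thread" feature; the internal types above are reached through `tokio::runtime::Runtime`, not used directly).

// Usage sketch via tokio's public API; ThreadPool and Spawner above are internal.
use tokio::runtime::Builder;

fn main() {
    let rt = Builder::new_multi_thread()
        .worker_threads(4) // number of workers created by `worker::create`
        .build()
        .expect("failed to build runtime");

    // `Runtime::block_on` corresponds to `ThreadPool::block_on`: the future runs
    // on the calling thread, while `tokio::spawn` hands tasks to the pool
    // (reaching `Spawner::spawn` / `Shared::bind_new_task` in this version).
    let sum = rt.block_on(async {
        let handles: Vec<_> = (0..8)
            .map(|i| tokio::spawn(async move { i * 2 }))
            .collect();

        let mut total = 0;
        for h in handles {
            total += h.await.expect("task panicked");
        }
        total
    });

    assert_eq!(sum, 56);
}
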
diff --git a/third_party/rust/tokio/src/runtime/thread_pool/worker.rs b/third_party/rust/tokio/src/runtime/thread_pool/worker.rs
new file mode 100644
index 0000000000..7e4989701e
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/thread_pool/worker.rs
@@ -0,0 +1,848 @@
+//! A scheduler is initialized with a fixed number of workers. Each worker is
+//! driven by a thread. Each worker has a "core" which contains data such as the
+//! run queue and other state. When `block_in_place` is called, the worker's
+//! "core" is handed off to a new thread allowing the scheduler to continue to
+//! make progress while the originating thread blocks.
+//!
+//! # Shutdown
+//!
+//! Shutting down the runtime involves the following steps:
+//!
+//! 1. The Shared::close method is called. This closes the inject queue and
+//! OwnedTasks instance and wakes up all worker threads.
+//!
+//! 2. Each worker thread observes the close signal next time it runs
+//! Core::maintenance by checking whether the inject queue is closed.
+//! The Core::is_shutdown flag is set to true.
+//!
+//! 3. The worker thread calls `pre_shutdown` in parallel. Here, the worker
+//! will keep removing tasks from OwnedTasks until it is empty. No new
+//! tasks can be pushed to the OwnedTasks during or after this step as it
+//! was closed in step 1.
+//!
+//! 4. The workers call Shared::shutdown to enter the single-threaded phase of
+//! shutdown. These calls will push their core to Shared::shutdown_cores,
+//! and the last thread to push its core will finish the shutdown procedure.
+//!
+//! 5. The local run queue of each core is emptied, then the inject queue is
+//! emptied.
+//!
+//! At this point, shutdown has completed. It is not possible for any of the
+//! collections to contain any tasks at this point, as each collection was
+//! closed first, then emptied afterwards.
+//!
+//! ## Spawns during shutdown
+//!
+//! When spawning tasks during shutdown, there are two cases:
+//!
+//! * The spawner observes the OwnedTasks being open, and the inject queue is
+//! closed.
+//! * The spawner observes the OwnedTasks being closed and doesn't check the
+//! inject queue.
+//!
+//! The first case can only happen if the OwnedTasks::bind call happens before
+//! or during step 1 of shutdown. In this case, the runtime will clean up the
+//! task in step 3 of shutdown.
+//!
+//! In the latter case, the task was not spawned and the task is immediately
+//! cancelled by the spawner.
+//!
+//! The correctness of shutdown requires both the inject queue and OwnedTasks
+//! collection to have a closed bit. With a close bit on only the inject queue,
+//! spawning could run into a situation where a task is successfully bound long
+//! after the runtime has shut down. With a close bit on only the OwnedTasks,
+//! the first spawning situation could result in the notification being pushed
+//! to the inject queue after step 6 of shutdown, which would leave a task in
+//! the inject queue indefinitely. This would be a ref-count cycle and a memory
+//! leak.
+
+use crate::coop;
+use crate::future::Future;
+use crate::loom::rand::seed;
+use crate::loom::sync::{Arc, Mutex};
+use crate::park::{Park, Unpark};
+use crate::runtime;
+use crate::runtime::enter::EnterContext;
+use crate::runtime::park::{Parker, Unparker};
+use crate::runtime::task::{Inject, JoinHandle, OwnedTasks};
+use crate::runtime::thread_pool::Idle;
+use crate::runtime::{queue, task, Callback, MetricsBatch, SchedulerMetrics, WorkerMetrics};
+use crate::util::atomic_cell::AtomicCell;
+use crate::util::FastRand;
+
+use std::cell::RefCell;
+use std::time::Duration;
+
+/// A scheduler worker
+pub(super) struct Worker {
+ /// Reference to shared state
+ shared: Arc<Shared>,
+
+ /// Index holding this worker's remote state
+ index: usize,
+
+ /// Used to hand-off a worker's core to another thread.
+ core: AtomicCell<Core>,
+}
+
+/// Core data
+struct Core {
+ /// Used to schedule bookkeeping tasks every so often.
+ tick: u8,
+
+ /// When a task is scheduled from a worker, it is stored in this slot. The
+ /// worker will check this slot for a task **before** checking the run
+ /// queue. This effectively results in the **last** scheduled task being run
+ /// next (LIFO). This is an optimization for message passing patterns and
+ /// helps to reduce latency.
+ lifo_slot: Option<Notified>,
+
+ /// The worker-local run queue.
+ run_queue: queue::Local<Arc<Shared>>,
+
+ /// True if the worker is currently searching for more work. Searching
+ /// involves attempting to steal from other workers.
+ is_searching: bool,
+
+ /// True if the scheduler is being shutdown
+ is_shutdown: bool,
+
+ /// Parker
+ ///
+ /// Stored in an `Option` as the parker is added / removed to make the
+ /// borrow checker happy.
+ park: Option<Parker>,
+
+ /// Batching metrics so they can be submitted to RuntimeMetrics.
+ metrics: MetricsBatch,
+
+ /// Fast random number generator.
+ rand: FastRand,
+}
+
+/// State shared across all workers
+pub(super) struct Shared {
+ /// Per-worker remote state. All other workers have access to this, and it
+ /// is how they communicate with each other.
+ remotes: Box<[Remote]>,
+
+ /// Submits work to the scheduler while **not** currently on a worker thread.
+ inject: Inject<Arc<Shared>>,
+
+ /// Coordinates idle workers
+ idle: Idle,
+
+ /// Collection of all active tasks spawned onto this executor.
+ owned: OwnedTasks<Arc<Shared>>,
+
+ /// Cores that have observed the shutdown signal
+ ///
+ /// The core is **not** placed back in the worker to prevent it from being
+ /// stolen by a thread that was spawned as part of `block_in_place`.
+ #[allow(clippy::vec_box)] // we're moving an already-boxed value
+ shutdown_cores: Mutex<Vec<Box<Core>>>,
+
+ /// Callback for a worker parking itself
+ before_park: Option<Callback>,
+ /// Callback for a worker unparking itself
+ after_unpark: Option<Callback>,
+
+ /// Collects metrics from the runtime.
+ pub(super) scheduler_metrics: SchedulerMetrics,
+
+ pub(super) worker_metrics: Box<[WorkerMetrics]>,
+}
+
+/// Used to communicate with a worker from other threads.
+struct Remote {
+ /// Steals tasks from this worker.
+ steal: queue::Steal<Arc<Shared>>,
+
+ /// Unparks the associated worker thread
+ unpark: Unparker,
+}
+
+/// Thread-local context
+struct Context {
+ /// Worker
+ worker: Arc<Worker>,
+
+ /// Core data
+ core: RefCell<Option<Box<Core>>>,
+}
+
+/// Starts the workers
+pub(crate) struct Launch(Vec<Arc<Worker>>);
+
+/// Running a task may consume the core. If the core is still available when
+/// running the task completes, it is returned. Otherwise, the worker will need
+/// to stop processing.
+type RunResult = Result<Box<Core>, ()>;
+
+/// A task handle
+type Task = task::Task<Arc<Shared>>;
+
+/// A notified task handle
+type Notified = task::Notified<Arc<Shared>>;
+
+// Tracks thread-local state
+scoped_thread_local!(static CURRENT: Context);
+
+pub(super) fn create(
+ size: usize,
+ park: Parker,
+ before_park: Option<Callback>,
+ after_unpark: Option<Callback>,
+) -> (Arc<Shared>, Launch) {
+ let mut cores = vec![];
+ let mut remotes = vec![];
+ let mut worker_metrics = vec![];
+
+ // Create the local queues
+ for _ in 0..size {
+ let (steal, run_queue) = queue::local();
+
+ let park = park.clone();
+ let unpark = park.unpark();
+
+ cores.push(Box::new(Core {
+ tick: 0,
+ lifo_slot: None,
+ run_queue,
+ is_searching: false,
+ is_shutdown: false,
+ park: Some(park),
+ metrics: MetricsBatch::new(),
+ rand: FastRand::new(seed()),
+ }));
+
+ remotes.push(Remote { steal, unpark });
+ worker_metrics.push(WorkerMetrics::new());
+ }
+
+ let shared = Arc::new(Shared {
+ remotes: remotes.into_boxed_slice(),
+ inject: Inject::new(),
+ idle: Idle::new(size),
+ owned: OwnedTasks::new(),
+ shutdown_cores: Mutex::new(vec![]),
+ before_park,
+ after_unpark,
+ scheduler_metrics: SchedulerMetrics::new(),
+ worker_metrics: worker_metrics.into_boxed_slice(),
+ });
+
+ let mut launch = Launch(vec![]);
+
+ for (index, core) in cores.drain(..).enumerate() {
+ launch.0.push(Arc::new(Worker {
+ shared: shared.clone(),
+ index,
+ core: AtomicCell::new(Some(core)),
+ }));
+ }
+
+ (shared, launch)
+}
+
+pub(crate) fn block_in_place<F, R>(f: F) -> R
+where
+ F: FnOnce() -> R,
+{
+ // Try to steal the worker core back
+ struct Reset(coop::Budget);
+
+ impl Drop for Reset {
+ fn drop(&mut self) {
+ CURRENT.with(|maybe_cx| {
+ if let Some(cx) = maybe_cx {
+ let core = cx.worker.core.take();
+ let mut cx_core = cx.core.borrow_mut();
+ assert!(cx_core.is_none());
+ *cx_core = core;
+
+ // Reset the task budget as we are re-entering the
+ // runtime.
+ coop::set(self.0);
+ }
+ });
+ }
+ }
+
+ let mut had_entered = false;
+
+ CURRENT.with(|maybe_cx| {
+ match (crate::runtime::enter::context(), maybe_cx.is_some()) {
+ (EnterContext::Entered { .. }, true) => {
+ // We are on a thread pool runtime thread, so we just need to
+ // set up blocking.
+ had_entered = true;
+ }
+ (EnterContext::Entered { allow_blocking }, false) => {
+ // We are on an executor, but _not_ on the thread pool. That is
+ // _only_ okay if we are in a thread pool runtime's block_on
+ // method:
+ if allow_blocking {
+ had_entered = true;
+ return;
+ } else {
+ // This probably means we are on the basic_scheduler or in a
+ // LocalSet, where it is _not_ okay to block.
+ panic!("can call blocking only when running on the multi-threaded runtime");
+ }
+ }
+ (EnterContext::NotEntered, true) => {
+ // This is a nested call to block_in_place (we already exited).
+ // All the necessary setup has already been done.
+ return;
+ }
+ (EnterContext::NotEntered, false) => {
+ // We are outside of the tokio runtime, so blocking is fine.
+ // We can also skip all of the thread pool blocking setup steps.
+ return;
+ }
+ }
+
+ let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");
+
+ // Get the worker core. If none is set, then blocking is fine!
+ let core = match cx.core.borrow_mut().take() {
+ Some(core) => core,
+ None => return,
+ };
+
+ // The parker should be set here
+ assert!(core.park.is_some());
+
+ // In order to block, the core must be sent to another thread for
+ // execution.
+ //
+ // First, move the core back into the worker's shared core slot.
+ cx.worker.core.set(core);
+
+ // Next, clone the worker handle and send it to a new thread for
+ // processing.
+ //
+ // Once the blocking task is done executing, we will attempt to
+ // steal the core back.
+ let worker = cx.worker.clone();
+ runtime::spawn_blocking(move || run(worker));
+ });
+
+ if had_entered {
+ // Unset the current task's budget. Blocking sections are not
+ // constrained by task budgets.
+ let _reset = Reset(coop::stop());
+
+ crate::runtime::enter::exit(f)
+ } else {
+ f()
+ }
+}
+
+/// After how many ticks is the global queue polled. This helps to ensure
+/// fairness.
+///
+/// The number is fairly arbitrary. I believe this value was copied from golang.
+const GLOBAL_POLL_INTERVAL: u8 = 61;
+
+impl Launch {
+ pub(crate) fn launch(mut self) {
+ for worker in self.0.drain(..) {
+ runtime::spawn_blocking(move || run(worker));
+ }
+ }
+}
+
+fn run(worker: Arc<Worker>) {
+ // Acquire a core. If this fails, then another thread is running this
+ // worker and there is nothing further to do.
+ let core = match worker.core.take() {
+ Some(core) => core,
+ None => return,
+ };
+
+ // Set the worker context.
+ let cx = Context {
+ worker,
+ core: RefCell::new(None),
+ };
+
+ let _enter = crate::runtime::enter(true);
+
+ CURRENT.set(&cx, || {
+ // This should always be an error. It only returns a `Result` to support
+ // using `?` to short circuit.
+ assert!(cx.run(core).is_err());
+ });
+}
+
+impl Context {
+ fn run(&self, mut core: Box<Core>) -> RunResult {
+ while !core.is_shutdown {
+ // Increment the tick
+ core.tick();
+
+ // Run maintenance, if needed
+ core = self.maintenance(core);
+
+ // First, check work available to the current worker.
+ if let Some(task) = core.next_task(&self.worker) {
+ core = self.run_task(task, core)?;
+ continue;
+ }
+
+ // There is no more **local** work to process, try to steal work
+ // from other workers.
+ if let Some(task) = core.steal_work(&self.worker) {
+ core = self.run_task(task, core)?;
+ } else {
+ // Wait for work
+ core = self.park(core);
+ }
+ }
+
+ core.pre_shutdown(&self.worker);
+
+ // Signal shutdown
+ self.worker.shared.shutdown(core);
+ Err(())
+ }
+
+ fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
+ let task = self.worker.shared.owned.assert_owner(task);
+
+ // Make sure the worker is not in the **searching** state. This enables
+ // another idle worker to try to steal work.
+ core.transition_from_searching(&self.worker);
+
+ // Make the core available to the runtime context
+ core.metrics.incr_poll_count();
+ *self.core.borrow_mut() = Some(core);
+
+ // Run the task
+ coop::budget(|| {
+ task.run();
+
+ // As long as there is budget remaining and a task exists in the
+ // `lifo_slot`, then keep running.
+ loop {
+ // Check if we still have the core. If not, the core was stolen
+ // by another worker.
+ let mut core = match self.core.borrow_mut().take() {
+ Some(core) => core,
+ None => return Err(()),
+ };
+
+ // Check for a task in the LIFO slot
+ let task = match core.lifo_slot.take() {
+ Some(task) => task,
+ None => return Ok(core),
+ };
+
+ if coop::has_budget_remaining() {
+ // Run the LIFO task, then loop
+ core.metrics.incr_poll_count();
+ *self.core.borrow_mut() = Some(core);
+ let task = self.worker.shared.owned.assert_owner(task);
+ task.run();
+ } else {
+ // Not enough budget left to run the LIFO task, push it to
+ // the back of the queue and return.
+ core.run_queue
+ .push_back(task, self.worker.inject(), &mut core.metrics);
+ return Ok(core);
+ }
+ }
+ })
+ }
+
+ fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
+ if core.tick % GLOBAL_POLL_INTERVAL == 0 {
+ // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
+ // to run without actually putting the thread to sleep.
+ core = self.park_timeout(core, Some(Duration::from_millis(0)));
+
+ // Run regularly scheduled maintenance
+ core.maintenance(&self.worker);
+ }
+
+ core
+ }
+
+ fn park(&self, mut core: Box<Core>) -> Box<Core> {
+ if let Some(f) = &self.worker.shared.before_park {
+ f();
+ }
+
+ if core.transition_to_parked(&self.worker) {
+ while !core.is_shutdown {
+ core.metrics.about_to_park();
+ core = self.park_timeout(core, None);
+ core.metrics.returned_from_park();
+
+ // Run regularly scheduled maintenance
+ core.maintenance(&self.worker);
+
+ if core.transition_from_parked(&self.worker) {
+ break;
+ }
+ }
+ }
+
+ if let Some(f) = &self.worker.shared.after_unpark {
+ f();
+ }
+ core
+ }
+
+ fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
+ // Take the parker out of core
+ let mut park = core.park.take().expect("park missing");
+
+ // Store `core` in context
+ *self.core.borrow_mut() = Some(core);
+
+ // Park thread
+ if let Some(timeout) = duration {
+ park.park_timeout(timeout).expect("park failed");
+ } else {
+ park.park().expect("park failed");
+ }
+
+ // Remove `core` from context
+ core = self.core.borrow_mut().take().expect("core missing");
+
+ // Place `park` back in `core`
+ core.park = Some(park);
+
+ // If there are tasks available to steal, but this worker is not
+ // looking for tasks to steal, notify another worker.
+ if !core.is_searching && core.run_queue.is_stealable() {
+ self.worker.shared.notify_parked();
+ }
+
+ core
+ }
+}
+
+impl Core {
+ /// Increment the tick
+ fn tick(&mut self) {
+ self.tick = self.tick.wrapping_add(1);
+ }
+
+ /// Return the next notified task available to this worker.
+ fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
+ if self.tick % GLOBAL_POLL_INTERVAL == 0 {
+ worker.inject().pop().or_else(|| self.next_local_task())
+ } else {
+ self.next_local_task().or_else(|| worker.inject().pop())
+ }
+ }
+
+ fn next_local_task(&mut self) -> Option<Notified> {
+ self.lifo_slot.take().or_else(|| self.run_queue.pop())
+ }
+
+ fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
+ if !self.transition_to_searching(worker) {
+ return None;
+ }
+
+ let num = worker.shared.remotes.len();
+ // Start from a random worker
+ let start = self.rand.fastrand_n(num as u32) as usize;
+
+ for i in 0..num {
+ let i = (start + i) % num;
+
+ // Don't steal from ourself! We know we don't have work.
+ if i == worker.index {
+ continue;
+ }
+
+ let target = &worker.shared.remotes[i];
+ if let Some(task) = target
+ .steal
+ .steal_into(&mut self.run_queue, &mut self.metrics)
+ {
+ return Some(task);
+ }
+ }
+
+ // Fallback on checking the global queue
+ worker.shared.inject.pop()
+ }
+
+ fn transition_to_searching(&mut self, worker: &Worker) -> bool {
+ if !self.is_searching {
+ self.is_searching = worker.shared.idle.transition_worker_to_searching();
+ }
+
+ self.is_searching
+ }
+
+ fn transition_from_searching(&mut self, worker: &Worker) {
+ if !self.is_searching {
+ return;
+ }
+
+ self.is_searching = false;
+ worker.shared.transition_worker_from_searching();
+ }
+
+ /// Prepares the worker state for parking.
+ ///
+ /// Returns true if the transition happened, false if there is work to do first.
+ fn transition_to_parked(&mut self, worker: &Worker) -> bool {
+ // Workers should not park if they have work to do
+ if self.lifo_slot.is_some() || self.run_queue.has_tasks() {
+ return false;
+ }
+
+ // When the final worker transitions **out** of searching to parked, it
+ // must check all the queues one last time in case work materialized
+ // between the last work scan and transitioning out of searching.
+ let is_last_searcher = worker
+ .shared
+ .idle
+ .transition_worker_to_parked(worker.index, self.is_searching);
+
+ // The worker is no longer searching. Setting this only updates the
+ // local cache.
+ self.is_searching = false;
+
+ if is_last_searcher {
+ worker.shared.notify_if_work_pending();
+ }
+
+ true
+ }
+
+ /// Returns `true` if the transition happened.
+ fn transition_from_parked(&mut self, worker: &Worker) -> bool {
+ // If a task is in the lifo slot, then we must unpark regardless of
+ // being notified
+ if self.lifo_slot.is_some() {
+ // When a worker wakes, it should only transition to the "searching"
+ // state when the wake originates from another worker *or* a new task
+ // is pushed. We do *not* want the worker to transition to "searching"
+ // when it wakes when the I/O driver receives new events.
+ self.is_searching = !worker.shared.idle.unpark_worker_by_id(worker.index);
+ return true;
+ }
+
+ if worker.shared.idle.is_parked(worker.index) {
+ return false;
+ }
+
+ // When unparked, the worker is in the searching state.
+ self.is_searching = true;
+ true
+ }
+
+ /// Runs maintenance work such as checking the pool's state.
+ fn maintenance(&mut self, worker: &Worker) {
+ self.metrics
+ .submit(&worker.shared.worker_metrics[worker.index]);
+
+ if !self.is_shutdown {
+ // Check if the scheduler has been shutdown
+ self.is_shutdown = worker.inject().is_closed();
+ }
+ }
+
+ /// Signals all tasks to shut down, and waits for them to complete. Must run
+ /// before we enter the single-threaded phase of shutdown processing.
+ fn pre_shutdown(&mut self, worker: &Worker) {
+ // Signal to all tasks to shut down.
+ worker.shared.owned.close_and_shutdown_all();
+
+ self.metrics
+ .submit(&worker.shared.worker_metrics[worker.index]);
+ }
+
+ /// Shuts down the core.
+ fn shutdown(&mut self) {
+ // Take the core
+ let mut park = self.park.take().expect("park missing");
+
+ // Drain the queue
+ while self.next_local_task().is_some() {}
+
+ park.shutdown();
+ }
+}
+
+impl Worker {
+ /// Returns a reference to the scheduler's injection queue.
+ fn inject(&self) -> &Inject<Arc<Shared>> {
+ &self.shared.inject
+ }
+}
+
+impl task::Schedule for Arc<Shared> {
+ fn release(&self, task: &Task) -> Option<Task> {
+ self.owned.remove(task)
+ }
+
+ fn schedule(&self, task: Notified) {
+ (**self).schedule(task, false);
+ }
+
+ fn yield_now(&self, task: Notified) {
+ (**self).schedule(task, true);
+ }
+}
+
+impl Shared {
+ pub(super) fn bind_new_task<T>(me: &Arc<Self>, future: T) -> JoinHandle<T::Output>
+ where
+ T: Future + Send + 'static,
+ T::Output: Send + 'static,
+ {
+ let (handle, notified) = me.owned.bind(future, me.clone());
+
+ if let Some(notified) = notified {
+ me.schedule(notified, false);
+ }
+
+ handle
+ }
+
+ pub(super) fn schedule(&self, task: Notified, is_yield: bool) {
+ CURRENT.with(|maybe_cx| {
+ if let Some(cx) = maybe_cx {
+ // Make sure the task is part of the **current** scheduler.
+ if self.ptr_eq(&cx.worker.shared) {
+ // And the current thread still holds a core
+ if let Some(core) = cx.core.borrow_mut().as_mut() {
+ self.schedule_local(core, task, is_yield);
+ return;
+ }
+ }
+ }
+
+ // Otherwise, use the inject queue.
+ self.inject.push(task);
+ self.scheduler_metrics.inc_remote_schedule_count();
+ self.notify_parked();
+ })
+ }
+
+ fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
+ core.metrics.inc_local_schedule_count();
+
+ // Spawning from the worker thread. If scheduling a "yield" then the
+ // task must always be pushed to the back of the queue, enabling other
+ // tasks to be executed. If **not** a yield, then there is more
+ // flexibility and the task may go to the front of the queue.
+ let should_notify = if is_yield {
+ core.run_queue
+ .push_back(task, &self.inject, &mut core.metrics);
+ true
+ } else {
+ // Push to the LIFO slot
+ let prev = core.lifo_slot.take();
+ let ret = prev.is_some();
+
+ if let Some(prev) = prev {
+ core.run_queue
+ .push_back(prev, &self.inject, &mut core.metrics);
+ }
+
+ core.lifo_slot = Some(task);
+
+ ret
+ };
+
+ // Only notify if not currently parked. If `park` is `None`, then the
+ // scheduling is from a resource driver. As notifications often come in
+ // batches, the notification is delayed until the park is complete.
+ if should_notify && core.park.is_some() {
+ self.notify_parked();
+ }
+ }
+
+ pub(super) fn close(&self) {
+ if self.inject.close() {
+ self.notify_all();
+ }
+ }
+
+ fn notify_parked(&self) {
+ if let Some(index) = self.idle.worker_to_notify() {
+ self.remotes[index].unpark.unpark();
+ }
+ }
+
+ fn notify_all(&self) {
+ for remote in &self.remotes[..] {
+ remote.unpark.unpark();
+ }
+ }
+
+ fn notify_if_work_pending(&self) {
+ for remote in &self.remotes[..] {
+ if !remote.steal.is_empty() {
+ self.notify_parked();
+ return;
+ }
+ }
+
+ if !self.inject.is_empty() {
+ self.notify_parked();
+ }
+ }
+
+ fn transition_worker_from_searching(&self) {
+ if self.idle.transition_worker_from_searching() {
+ // We are the final searching worker. Because work was found, we
+ // need to notify another worker.
+ self.notify_parked();
+ }
+ }
+
+ /// Signals that a worker has observed the shutdown signal and has replaced
+ /// its core back into its handle.
+ ///
+ /// If all workers have reached this point, the final cleanup is performed.
+ fn shutdown(&self, core: Box<Core>) {
+ let mut cores = self.shutdown_cores.lock();
+ cores.push(core);
+
+ if cores.len() != self.remotes.len() {
+ return;
+ }
+
+ debug_assert!(self.owned.is_empty());
+
+ for mut core in cores.drain(..) {
+ core.shutdown();
+ }
+
+ // Drain the injection queue
+ //
+ // We already shut down every task, so we can simply drop the tasks.
+ while let Some(task) = self.inject.pop() {
+ drop(task);
+ }
+ }
+
+ fn ptr_eq(&self, other: &Shared) -> bool {
+ std::ptr::eq(self, other)
+ }
+}
+
+cfg_metrics! {
+ impl Shared {
+ pub(super) fn injection_queue_depth(&self) -> usize {
+ self.inject.len()
+ }
+
+ pub(super) fn worker_local_queue_depth(&self, worker: usize) -> usize {
+ self.remotes[worker].steal.len()
+ }
+ }
+}
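
Note: the core hand-off described at the top of worker.rs is what makes blocking inside the pool safe: the blocking thread gives its `Core` away, `runtime::spawn_blocking(move || run(worker))` starts a fresh thread to keep driving the run queue, and the original thread tries to steal the core back once the closure returns. A hedged usage sketch through the public entry point, `tokio::task::block_in_place`, which in this version reaches the `block_in_place` function above (assumes the "macros" and "rt-multi-thread" features; it panics on the current-thread scheduler, per the EnterContext match).

// Usage sketch for block_in_place via the public API. Requires the multi-thread
// runtime; calling it on the current-thread runtime panics.
use std::time::Duration;

#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() {
    let value = tokio::task::block_in_place(|| {
        // Synchronous, blocking work. The worker's core was handed off before this
        // closure ran, so other tasks keep running on the pool in the meantime.
        std::thread::sleep(Duration::from_millis(50));
        42
    });
    assert_eq!(value, 42);

    // The pool is still responsive after the blocking section.
    tokio::spawn(async { "still running" }).await.unwrap();
}
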