diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
commit | 36d22d82aa202bb199967e9512281e9a53db42c9 (patch) | |
tree | 105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/tokio/src/runtime/thread_pool | |
parent | Initial commit. (diff) | |
download | firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.tar.xz firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.zip |
Adding upstream version 115.7.0esr.upstream/115.7.0esrupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio/src/runtime/thread_pool')
-rw-r--r-- | third_party/rust/tokio/src/runtime/thread_pool/idle.rs | 226 | ||||
-rw-r--r-- | third_party/rust/tokio/src/runtime/thread_pool/mod.rs | 136 | ||||
-rw-r--r-- | third_party/rust/tokio/src/runtime/thread_pool/worker.rs | 848 |
3 files changed, 1210 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/runtime/thread_pool/idle.rs b/third_party/rust/tokio/src/runtime/thread_pool/idle.rs new file mode 100644 index 0000000000..a57bf6a0b1 --- /dev/null +++ b/third_party/rust/tokio/src/runtime/thread_pool/idle.rs @@ -0,0 +1,226 @@ +//! Coordinates idling workers + +use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::Mutex; + +use std::fmt; +use std::sync::atomic::Ordering::{self, SeqCst}; + +pub(super) struct Idle { + /// Tracks both the number of searching workers and the number of unparked + /// workers. + /// + /// Used as a fast-path to avoid acquiring the lock when needed. + state: AtomicUsize, + + /// Sleeping workers + sleepers: Mutex<Vec<usize>>, + + /// Total number of workers. + num_workers: usize, +} + +const UNPARK_SHIFT: usize = 16; +const UNPARK_MASK: usize = !SEARCH_MASK; +const SEARCH_MASK: usize = (1 << UNPARK_SHIFT) - 1; + +#[derive(Copy, Clone)] +struct State(usize); + +impl Idle { + pub(super) fn new(num_workers: usize) -> Idle { + let init = State::new(num_workers); + + Idle { + state: AtomicUsize::new(init.into()), + sleepers: Mutex::new(Vec::with_capacity(num_workers)), + num_workers, + } + } + + /// If there are no workers actively searching, returns the index of a + /// worker currently sleeping. + pub(super) fn worker_to_notify(&self) -> Option<usize> { + // If at least one worker is spinning, work being notified will + // eventually be found. A searching thread will find **some** work and + // notify another worker, eventually leading to our work being found. + // + // For this to happen, this load must happen before the thread + // transitioning `num_searching` to zero. Acquire / Release does not + // provide sufficient guarantees, so this load is done with `SeqCst` and + // will pair with the `fetch_sub(1)` when transitioning out of + // searching. + if !self.notify_should_wakeup() { + return None; + } + + // Acquire the lock + let mut sleepers = self.sleepers.lock(); + + // Check again, now that the lock is acquired + if !self.notify_should_wakeup() { + return None; + } + + // A worker should be woken up, atomically increment the number of + // searching workers as well as the number of unparked workers. + State::unpark_one(&self.state, 1); + + // Get the worker to unpark + let ret = sleepers.pop(); + debug_assert!(ret.is_some()); + + ret + } + + /// Returns `true` if the worker needs to do a final check for submitted + /// work. + pub(super) fn transition_worker_to_parked(&self, worker: usize, is_searching: bool) -> bool { + // Acquire the lock + let mut sleepers = self.sleepers.lock(); + + // Decrement the number of unparked threads + let ret = State::dec_num_unparked(&self.state, is_searching); + + // Track the sleeping worker + sleepers.push(worker); + + ret + } + + pub(super) fn transition_worker_to_searching(&self) -> bool { + let state = State::load(&self.state, SeqCst); + if 2 * state.num_searching() >= self.num_workers { + return false; + } + + // It is possible for this routine to allow more than 50% of the workers + // to search. That is OK. Limiting searchers is only an optimization to + // prevent too much contention. + State::inc_num_searching(&self.state, SeqCst); + true + } + + /// A lightweight transition from searching -> running. + /// + /// Returns `true` if this is the final searching worker. The caller + /// **must** notify a new worker. + pub(super) fn transition_worker_from_searching(&self) -> bool { + State::dec_num_searching(&self.state) + } + + /// Unpark a specific worker. This happens if tasks are submitted from + /// within the worker's park routine. + /// + /// Returns `true` if the worker was parked before calling the method. + pub(super) fn unpark_worker_by_id(&self, worker_id: usize) -> bool { + let mut sleepers = self.sleepers.lock(); + + for index in 0..sleepers.len() { + if sleepers[index] == worker_id { + sleepers.swap_remove(index); + + // Update the state accordingly while the lock is held. + State::unpark_one(&self.state, 0); + + return true; + } + } + + false + } + + /// Returns `true` if `worker_id` is contained in the sleep set. + pub(super) fn is_parked(&self, worker_id: usize) -> bool { + let sleepers = self.sleepers.lock(); + sleepers.contains(&worker_id) + } + + fn notify_should_wakeup(&self) -> bool { + let state = State(self.state.fetch_add(0, SeqCst)); + state.num_searching() == 0 && state.num_unparked() < self.num_workers + } +} + +impl State { + fn new(num_workers: usize) -> State { + // All workers start in the unparked state + let ret = State(num_workers << UNPARK_SHIFT); + debug_assert_eq!(num_workers, ret.num_unparked()); + debug_assert_eq!(0, ret.num_searching()); + ret + } + + fn load(cell: &AtomicUsize, ordering: Ordering) -> State { + State(cell.load(ordering)) + } + + fn unpark_one(cell: &AtomicUsize, num_searching: usize) { + cell.fetch_add(num_searching | (1 << UNPARK_SHIFT), SeqCst); + } + + fn inc_num_searching(cell: &AtomicUsize, ordering: Ordering) { + cell.fetch_add(1, ordering); + } + + /// Returns `true` if this is the final searching worker + fn dec_num_searching(cell: &AtomicUsize) -> bool { + let state = State(cell.fetch_sub(1, SeqCst)); + state.num_searching() == 1 + } + + /// Track a sleeping worker + /// + /// Returns `true` if this is the final searching worker. + fn dec_num_unparked(cell: &AtomicUsize, is_searching: bool) -> bool { + let mut dec = 1 << UNPARK_SHIFT; + + if is_searching { + dec += 1; + } + + let prev = State(cell.fetch_sub(dec, SeqCst)); + is_searching && prev.num_searching() == 1 + } + + /// Number of workers currently searching + fn num_searching(self) -> usize { + self.0 & SEARCH_MASK + } + + /// Number of workers currently unparked + fn num_unparked(self) -> usize { + (self.0 & UNPARK_MASK) >> UNPARK_SHIFT + } +} + +impl From<usize> for State { + fn from(src: usize) -> State { + State(src) + } +} + +impl From<State> for usize { + fn from(src: State) -> usize { + src.0 + } +} + +impl fmt::Debug for State { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("worker::State") + .field("num_unparked", &self.num_unparked()) + .field("num_searching", &self.num_searching()) + .finish() + } +} + +#[test] +fn test_state() { + assert_eq!(0, UNPARK_MASK & SEARCH_MASK); + assert_eq!(0, !(UNPARK_MASK | SEARCH_MASK)); + + let state = State::new(10); + assert_eq!(10, state.num_unparked()); + assert_eq!(0, state.num_searching()); +} diff --git a/third_party/rust/tokio/src/runtime/thread_pool/mod.rs b/third_party/rust/tokio/src/runtime/thread_pool/mod.rs new file mode 100644 index 0000000000..d3f46517cb --- /dev/null +++ b/third_party/rust/tokio/src/runtime/thread_pool/mod.rs @@ -0,0 +1,136 @@ +//! Threadpool + +mod idle; +use self::idle::Idle; + +mod worker; +pub(crate) use worker::Launch; + +pub(crate) use worker::block_in_place; + +use crate::loom::sync::Arc; +use crate::runtime::task::JoinHandle; +use crate::runtime::{Callback, Parker}; + +use std::fmt; +use std::future::Future; + +/// Work-stealing based thread pool for executing futures. +pub(crate) struct ThreadPool { + spawner: Spawner, +} + +/// Submits futures to the associated thread pool for execution. +/// +/// A `Spawner` instance is a handle to a single thread pool that allows the owner +/// of the handle to spawn futures onto the thread pool. +/// +/// The `Spawner` handle is *only* used for spawning new futures. It does not +/// impact the lifecycle of the thread pool in any way. The thread pool may +/// shut down while there are outstanding `Spawner` instances. +/// +/// `Spawner` instances are obtained by calling [`ThreadPool::spawner`]. +/// +/// [`ThreadPool::spawner`]: method@ThreadPool::spawner +#[derive(Clone)] +pub(crate) struct Spawner { + shared: Arc<worker::Shared>, +} + +// ===== impl ThreadPool ===== + +impl ThreadPool { + pub(crate) fn new( + size: usize, + parker: Parker, + before_park: Option<Callback>, + after_unpark: Option<Callback>, + ) -> (ThreadPool, Launch) { + let (shared, launch) = worker::create(size, parker, before_park, after_unpark); + let spawner = Spawner { shared }; + let thread_pool = ThreadPool { spawner }; + + (thread_pool, launch) + } + + /// Returns reference to `Spawner`. + /// + /// The `Spawner` handle can be cloned and enables spawning tasks from other + /// threads. + pub(crate) fn spawner(&self) -> &Spawner { + &self.spawner + } + + /// Blocks the current thread waiting for the future to complete. + /// + /// The future will execute on the current thread, but all spawned tasks + /// will be executed on the thread pool. + pub(crate) fn block_on<F>(&self, future: F) -> F::Output + where + F: Future, + { + let mut enter = crate::runtime::enter(true); + enter.block_on(future).expect("failed to park thread") + } +} + +impl fmt::Debug for ThreadPool { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("ThreadPool").finish() + } +} + +impl Drop for ThreadPool { + fn drop(&mut self) { + self.spawner.shutdown(); + } +} + +// ==== impl Spawner ===== + +impl Spawner { + /// Spawns a future onto the thread pool + pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> + where + F: crate::future::Future + Send + 'static, + F::Output: Send + 'static, + { + worker::Shared::bind_new_task(&self.shared, future) + } + + pub(crate) fn shutdown(&mut self) { + self.shared.close(); + } +} + +cfg_metrics! { + use crate::runtime::{SchedulerMetrics, WorkerMetrics}; + + impl Spawner { + pub(crate) fn num_workers(&self) -> usize { + self.shared.worker_metrics.len() + } + + pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { + &self.shared.scheduler_metrics + } + + pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { + &self.shared.worker_metrics[worker] + } + + pub(crate) fn injection_queue_depth(&self) -> usize { + self.shared.injection_queue_depth() + } + + pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize { + self.shared.worker_local_queue_depth(worker) + } + } +} + +impl fmt::Debug for Spawner { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Spawner").finish() + } +} diff --git a/third_party/rust/tokio/src/runtime/thread_pool/worker.rs b/third_party/rust/tokio/src/runtime/thread_pool/worker.rs new file mode 100644 index 0000000000..7e4989701e --- /dev/null +++ b/third_party/rust/tokio/src/runtime/thread_pool/worker.rs @@ -0,0 +1,848 @@ +//! A scheduler is initialized with a fixed number of workers. Each worker is +//! driven by a thread. Each worker has a "core" which contains data such as the +//! run queue and other state. When `block_in_place` is called, the worker's +//! "core" is handed off to a new thread allowing the scheduler to continue to +//! make progress while the originating thread blocks. +//! +//! # Shutdown +//! +//! Shutting down the runtime involves the following steps: +//! +//! 1. The Shared::close method is called. This closes the inject queue and +//! OwnedTasks instance and wakes up all worker threads. +//! +//! 2. Each worker thread observes the close signal next time it runs +//! Core::maintenance by checking whether the inject queue is closed. +//! The Core::is_shutdown flag is set to true. +//! +//! 3. The worker thread calls `pre_shutdown` in parallel. Here, the worker +//! will keep removing tasks from OwnedTasks until it is empty. No new +//! tasks can be pushed to the OwnedTasks during or after this step as it +//! was closed in step 1. +//! +//! 5. The workers call Shared::shutdown to enter the single-threaded phase of +//! shutdown. These calls will push their core to Shared::shutdown_cores, +//! and the last thread to push its core will finish the shutdown procedure. +//! +//! 6. The local run queue of each core is emptied, then the inject queue is +//! emptied. +//! +//! At this point, shutdown has completed. It is not possible for any of the +//! collections to contain any tasks at this point, as each collection was +//! closed first, then emptied afterwards. +//! +//! ## Spawns during shutdown +//! +//! When spawning tasks during shutdown, there are two cases: +//! +//! * The spawner observes the OwnedTasks being open, and the inject queue is +//! closed. +//! * The spawner observes the OwnedTasks being closed and doesn't check the +//! inject queue. +//! +//! The first case can only happen if the OwnedTasks::bind call happens before +//! or during step 1 of shutdown. In this case, the runtime will clean up the +//! task in step 3 of shutdown. +//! +//! In the latter case, the task was not spawned and the task is immediately +//! cancelled by the spawner. +//! +//! The correctness of shutdown requires both the inject queue and OwnedTasks +//! collection to have a closed bit. With a close bit on only the inject queue, +//! spawning could run in to a situation where a task is successfully bound long +//! after the runtime has shut down. With a close bit on only the OwnedTasks, +//! the first spawning situation could result in the notification being pushed +//! to the inject queue after step 6 of shutdown, which would leave a task in +//! the inject queue indefinitely. This would be a ref-count cycle and a memory +//! leak. + +use crate::coop; +use crate::future::Future; +use crate::loom::rand::seed; +use crate::loom::sync::{Arc, Mutex}; +use crate::park::{Park, Unpark}; +use crate::runtime; +use crate::runtime::enter::EnterContext; +use crate::runtime::park::{Parker, Unparker}; +use crate::runtime::task::{Inject, JoinHandle, OwnedTasks}; +use crate::runtime::thread_pool::Idle; +use crate::runtime::{queue, task, Callback, MetricsBatch, SchedulerMetrics, WorkerMetrics}; +use crate::util::atomic_cell::AtomicCell; +use crate::util::FastRand; + +use std::cell::RefCell; +use std::time::Duration; + +/// A scheduler worker +pub(super) struct Worker { + /// Reference to shared state + shared: Arc<Shared>, + + /// Index holding this worker's remote state + index: usize, + + /// Used to hand-off a worker's core to another thread. + core: AtomicCell<Core>, +} + +/// Core data +struct Core { + /// Used to schedule bookkeeping tasks every so often. + tick: u8, + + /// When a task is scheduled from a worker, it is stored in this slot. The + /// worker will check this slot for a task **before** checking the run + /// queue. This effectively results in the **last** scheduled task to be run + /// next (LIFO). This is an optimization for message passing patterns and + /// helps to reduce latency. + lifo_slot: Option<Notified>, + + /// The worker-local run queue. + run_queue: queue::Local<Arc<Shared>>, + + /// True if the worker is currently searching for more work. Searching + /// involves attempting to steal from other workers. + is_searching: bool, + + /// True if the scheduler is being shutdown + is_shutdown: bool, + + /// Parker + /// + /// Stored in an `Option` as the parker is added / removed to make the + /// borrow checker happy. + park: Option<Parker>, + + /// Batching metrics so they can be submitted to RuntimeMetrics. + metrics: MetricsBatch, + + /// Fast random number generator. + rand: FastRand, +} + +/// State shared across all workers +pub(super) struct Shared { + /// Per-worker remote state. All other workers have access to this and is + /// how they communicate between each other. + remotes: Box<[Remote]>, + + /// Submits work to the scheduler while **not** currently on a worker thread. + inject: Inject<Arc<Shared>>, + + /// Coordinates idle workers + idle: Idle, + + /// Collection of all active tasks spawned onto this executor. + owned: OwnedTasks<Arc<Shared>>, + + /// Cores that have observed the shutdown signal + /// + /// The core is **not** placed back in the worker to avoid it from being + /// stolen by a thread that was spawned as part of `block_in_place`. + #[allow(clippy::vec_box)] // we're moving an already-boxed value + shutdown_cores: Mutex<Vec<Box<Core>>>, + + /// Callback for a worker parking itself + before_park: Option<Callback>, + /// Callback for a worker unparking itself + after_unpark: Option<Callback>, + + /// Collects metrics from the runtime. + pub(super) scheduler_metrics: SchedulerMetrics, + + pub(super) worker_metrics: Box<[WorkerMetrics]>, +} + +/// Used to communicate with a worker from other threads. +struct Remote { + /// Steals tasks from this worker. + steal: queue::Steal<Arc<Shared>>, + + /// Unparks the associated worker thread + unpark: Unparker, +} + +/// Thread-local context +struct Context { + /// Worker + worker: Arc<Worker>, + + /// Core data + core: RefCell<Option<Box<Core>>>, +} + +/// Starts the workers +pub(crate) struct Launch(Vec<Arc<Worker>>); + +/// Running a task may consume the core. If the core is still available when +/// running the task completes, it is returned. Otherwise, the worker will need +/// to stop processing. +type RunResult = Result<Box<Core>, ()>; + +/// A task handle +type Task = task::Task<Arc<Shared>>; + +/// A notified task handle +type Notified = task::Notified<Arc<Shared>>; + +// Tracks thread-local state +scoped_thread_local!(static CURRENT: Context); + +pub(super) fn create( + size: usize, + park: Parker, + before_park: Option<Callback>, + after_unpark: Option<Callback>, +) -> (Arc<Shared>, Launch) { + let mut cores = vec![]; + let mut remotes = vec![]; + let mut worker_metrics = vec![]; + + // Create the local queues + for _ in 0..size { + let (steal, run_queue) = queue::local(); + + let park = park.clone(); + let unpark = park.unpark(); + + cores.push(Box::new(Core { + tick: 0, + lifo_slot: None, + run_queue, + is_searching: false, + is_shutdown: false, + park: Some(park), + metrics: MetricsBatch::new(), + rand: FastRand::new(seed()), + })); + + remotes.push(Remote { steal, unpark }); + worker_metrics.push(WorkerMetrics::new()); + } + + let shared = Arc::new(Shared { + remotes: remotes.into_boxed_slice(), + inject: Inject::new(), + idle: Idle::new(size), + owned: OwnedTasks::new(), + shutdown_cores: Mutex::new(vec![]), + before_park, + after_unpark, + scheduler_metrics: SchedulerMetrics::new(), + worker_metrics: worker_metrics.into_boxed_slice(), + }); + + let mut launch = Launch(vec![]); + + for (index, core) in cores.drain(..).enumerate() { + launch.0.push(Arc::new(Worker { + shared: shared.clone(), + index, + core: AtomicCell::new(Some(core)), + })); + } + + (shared, launch) +} + +pub(crate) fn block_in_place<F, R>(f: F) -> R +where + F: FnOnce() -> R, +{ + // Try to steal the worker core back + struct Reset(coop::Budget); + + impl Drop for Reset { + fn drop(&mut self) { + CURRENT.with(|maybe_cx| { + if let Some(cx) = maybe_cx { + let core = cx.worker.core.take(); + let mut cx_core = cx.core.borrow_mut(); + assert!(cx_core.is_none()); + *cx_core = core; + + // Reset the task budget as we are re-entering the + // runtime. + coop::set(self.0); + } + }); + } + } + + let mut had_entered = false; + + CURRENT.with(|maybe_cx| { + match (crate::runtime::enter::context(), maybe_cx.is_some()) { + (EnterContext::Entered { .. }, true) => { + // We are on a thread pool runtime thread, so we just need to + // set up blocking. + had_entered = true; + } + (EnterContext::Entered { allow_blocking }, false) => { + // We are on an executor, but _not_ on the thread pool. That is + // _only_ okay if we are in a thread pool runtime's block_on + // method: + if allow_blocking { + had_entered = true; + return; + } else { + // This probably means we are on the basic_scheduler or in a + // LocalSet, where it is _not_ okay to block. + panic!("can call blocking only when running on the multi-threaded runtime"); + } + } + (EnterContext::NotEntered, true) => { + // This is a nested call to block_in_place (we already exited). + // All the necessary setup has already been done. + return; + } + (EnterContext::NotEntered, false) => { + // We are outside of the tokio runtime, so blocking is fine. + // We can also skip all of the thread pool blocking setup steps. + return; + } + } + + let cx = maybe_cx.expect("no .is_some() == false cases above should lead here"); + + // Get the worker core. If none is set, then blocking is fine! + let core = match cx.core.borrow_mut().take() { + Some(core) => core, + None => return, + }; + + // The parker should be set here + assert!(core.park.is_some()); + + // In order to block, the core must be sent to another thread for + // execution. + // + // First, move the core back into the worker's shared core slot. + cx.worker.core.set(core); + + // Next, clone the worker handle and send it to a new thread for + // processing. + // + // Once the blocking task is done executing, we will attempt to + // steal the core back. + let worker = cx.worker.clone(); + runtime::spawn_blocking(move || run(worker)); + }); + + if had_entered { + // Unset the current task's budget. Blocking sections are not + // constrained by task budgets. + let _reset = Reset(coop::stop()); + + crate::runtime::enter::exit(f) + } else { + f() + } +} + +/// After how many ticks is the global queue polled. This helps to ensure +/// fairness. +/// +/// The number is fairly arbitrary. I believe this value was copied from golang. +const GLOBAL_POLL_INTERVAL: u8 = 61; + +impl Launch { + pub(crate) fn launch(mut self) { + for worker in self.0.drain(..) { + runtime::spawn_blocking(move || run(worker)); + } + } +} + +fn run(worker: Arc<Worker>) { + // Acquire a core. If this fails, then another thread is running this + // worker and there is nothing further to do. + let core = match worker.core.take() { + Some(core) => core, + None => return, + }; + + // Set the worker context. + let cx = Context { + worker, + core: RefCell::new(None), + }; + + let _enter = crate::runtime::enter(true); + + CURRENT.set(&cx, || { + // This should always be an error. It only returns a `Result` to support + // using `?` to short circuit. + assert!(cx.run(core).is_err()); + }); +} + +impl Context { + fn run(&self, mut core: Box<Core>) -> RunResult { + while !core.is_shutdown { + // Increment the tick + core.tick(); + + // Run maintenance, if needed + core = self.maintenance(core); + + // First, check work available to the current worker. + if let Some(task) = core.next_task(&self.worker) { + core = self.run_task(task, core)?; + continue; + } + + // There is no more **local** work to process, try to steal work + // from other workers. + if let Some(task) = core.steal_work(&self.worker) { + core = self.run_task(task, core)?; + } else { + // Wait for work + core = self.park(core); + } + } + + core.pre_shutdown(&self.worker); + + // Signal shutdown + self.worker.shared.shutdown(core); + Err(()) + } + + fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult { + let task = self.worker.shared.owned.assert_owner(task); + + // Make sure the worker is not in the **searching** state. This enables + // another idle worker to try to steal work. + core.transition_from_searching(&self.worker); + + // Make the core available to the runtime context + core.metrics.incr_poll_count(); + *self.core.borrow_mut() = Some(core); + + // Run the task + coop::budget(|| { + task.run(); + + // As long as there is budget remaining and a task exists in the + // `lifo_slot`, then keep running. + loop { + // Check if we still have the core. If not, the core was stolen + // by another worker. + let mut core = match self.core.borrow_mut().take() { + Some(core) => core, + None => return Err(()), + }; + + // Check for a task in the LIFO slot + let task = match core.lifo_slot.take() { + Some(task) => task, + None => return Ok(core), + }; + + if coop::has_budget_remaining() { + // Run the LIFO task, then loop + core.metrics.incr_poll_count(); + *self.core.borrow_mut() = Some(core); + let task = self.worker.shared.owned.assert_owner(task); + task.run(); + } else { + // Not enough budget left to run the LIFO task, push it to + // the back of the queue and return. + core.run_queue + .push_back(task, self.worker.inject(), &mut core.metrics); + return Ok(core); + } + } + }) + } + + fn maintenance(&self, mut core: Box<Core>) -> Box<Core> { + if core.tick % GLOBAL_POLL_INTERVAL == 0 { + // Call `park` with a 0 timeout. This enables the I/O driver, timer, ... + // to run without actually putting the thread to sleep. + core = self.park_timeout(core, Some(Duration::from_millis(0))); + + // Run regularly scheduled maintenance + core.maintenance(&self.worker); + } + + core + } + + fn park(&self, mut core: Box<Core>) -> Box<Core> { + if let Some(f) = &self.worker.shared.before_park { + f(); + } + + if core.transition_to_parked(&self.worker) { + while !core.is_shutdown { + core.metrics.about_to_park(); + core = self.park_timeout(core, None); + core.metrics.returned_from_park(); + + // Run regularly scheduled maintenance + core.maintenance(&self.worker); + + if core.transition_from_parked(&self.worker) { + break; + } + } + } + + if let Some(f) = &self.worker.shared.after_unpark { + f(); + } + core + } + + fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> { + // Take the parker out of core + let mut park = core.park.take().expect("park missing"); + + // Store `core` in context + *self.core.borrow_mut() = Some(core); + + // Park thread + if let Some(timeout) = duration { + park.park_timeout(timeout).expect("park failed"); + } else { + park.park().expect("park failed"); + } + + // Remove `core` from context + core = self.core.borrow_mut().take().expect("core missing"); + + // Place `park` back in `core` + core.park = Some(park); + + // If there are tasks available to steal, but this worker is not + // looking for tasks to steal, notify another worker. + if !core.is_searching && core.run_queue.is_stealable() { + self.worker.shared.notify_parked(); + } + + core + } +} + +impl Core { + /// Increment the tick + fn tick(&mut self) { + self.tick = self.tick.wrapping_add(1); + } + + /// Return the next notified task available to this worker. + fn next_task(&mut self, worker: &Worker) -> Option<Notified> { + if self.tick % GLOBAL_POLL_INTERVAL == 0 { + worker.inject().pop().or_else(|| self.next_local_task()) + } else { + self.next_local_task().or_else(|| worker.inject().pop()) + } + } + + fn next_local_task(&mut self) -> Option<Notified> { + self.lifo_slot.take().or_else(|| self.run_queue.pop()) + } + + fn steal_work(&mut self, worker: &Worker) -> Option<Notified> { + if !self.transition_to_searching(worker) { + return None; + } + + let num = worker.shared.remotes.len(); + // Start from a random worker + let start = self.rand.fastrand_n(num as u32) as usize; + + for i in 0..num { + let i = (start + i) % num; + + // Don't steal from ourself! We know we don't have work. + if i == worker.index { + continue; + } + + let target = &worker.shared.remotes[i]; + if let Some(task) = target + .steal + .steal_into(&mut self.run_queue, &mut self.metrics) + { + return Some(task); + } + } + + // Fallback on checking the global queue + worker.shared.inject.pop() + } + + fn transition_to_searching(&mut self, worker: &Worker) -> bool { + if !self.is_searching { + self.is_searching = worker.shared.idle.transition_worker_to_searching(); + } + + self.is_searching + } + + fn transition_from_searching(&mut self, worker: &Worker) { + if !self.is_searching { + return; + } + + self.is_searching = false; + worker.shared.transition_worker_from_searching(); + } + + /// Prepares the worker state for parking. + /// + /// Returns true if the transition happend, false if there is work to do first. + fn transition_to_parked(&mut self, worker: &Worker) -> bool { + // Workers should not park if they have work to do + if self.lifo_slot.is_some() || self.run_queue.has_tasks() { + return false; + } + + // When the final worker transitions **out** of searching to parked, it + // must check all the queues one last time in case work materialized + // between the last work scan and transitioning out of searching. + let is_last_searcher = worker + .shared + .idle + .transition_worker_to_parked(worker.index, self.is_searching); + + // The worker is no longer searching. Setting this is the local cache + // only. + self.is_searching = false; + + if is_last_searcher { + worker.shared.notify_if_work_pending(); + } + + true + } + + /// Returns `true` if the transition happened. + fn transition_from_parked(&mut self, worker: &Worker) -> bool { + // If a task is in the lifo slot, then we must unpark regardless of + // being notified + if self.lifo_slot.is_some() { + // When a worker wakes, it should only transition to the "searching" + // state when the wake originates from another worker *or* a new task + // is pushed. We do *not* want the worker to transition to "searching" + // when it wakes when the I/O driver receives new events. + self.is_searching = !worker.shared.idle.unpark_worker_by_id(worker.index); + return true; + } + + if worker.shared.idle.is_parked(worker.index) { + return false; + } + + // When unparked, the worker is in the searching state. + self.is_searching = true; + true + } + + /// Runs maintenance work such as checking the pool's state. + fn maintenance(&mut self, worker: &Worker) { + self.metrics + .submit(&worker.shared.worker_metrics[worker.index]); + + if !self.is_shutdown { + // Check if the scheduler has been shutdown + self.is_shutdown = worker.inject().is_closed(); + } + } + + /// Signals all tasks to shut down, and waits for them to complete. Must run + /// before we enter the single-threaded phase of shutdown processing. + fn pre_shutdown(&mut self, worker: &Worker) { + // Signal to all tasks to shut down. + worker.shared.owned.close_and_shutdown_all(); + + self.metrics + .submit(&worker.shared.worker_metrics[worker.index]); + } + + /// Shuts down the core. + fn shutdown(&mut self) { + // Take the core + let mut park = self.park.take().expect("park missing"); + + // Drain the queue + while self.next_local_task().is_some() {} + + park.shutdown(); + } +} + +impl Worker { + /// Returns a reference to the scheduler's injection queue. + fn inject(&self) -> &Inject<Arc<Shared>> { + &self.shared.inject + } +} + +impl task::Schedule for Arc<Shared> { + fn release(&self, task: &Task) -> Option<Task> { + self.owned.remove(task) + } + + fn schedule(&self, task: Notified) { + (**self).schedule(task, false); + } + + fn yield_now(&self, task: Notified) { + (**self).schedule(task, true); + } +} + +impl Shared { + pub(super) fn bind_new_task<T>(me: &Arc<Self>, future: T) -> JoinHandle<T::Output> + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + let (handle, notified) = me.owned.bind(future, me.clone()); + + if let Some(notified) = notified { + me.schedule(notified, false); + } + + handle + } + + pub(super) fn schedule(&self, task: Notified, is_yield: bool) { + CURRENT.with(|maybe_cx| { + if let Some(cx) = maybe_cx { + // Make sure the task is part of the **current** scheduler. + if self.ptr_eq(&cx.worker.shared) { + // And the current thread still holds a core + if let Some(core) = cx.core.borrow_mut().as_mut() { + self.schedule_local(core, task, is_yield); + return; + } + } + } + + // Otherwise, use the inject queue. + self.inject.push(task); + self.scheduler_metrics.inc_remote_schedule_count(); + self.notify_parked(); + }) + } + + fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) { + core.metrics.inc_local_schedule_count(); + + // Spawning from the worker thread. If scheduling a "yield" then the + // task must always be pushed to the back of the queue, enabling other + // tasks to be executed. If **not** a yield, then there is more + // flexibility and the task may go to the front of the queue. + let should_notify = if is_yield { + core.run_queue + .push_back(task, &self.inject, &mut core.metrics); + true + } else { + // Push to the LIFO slot + let prev = core.lifo_slot.take(); + let ret = prev.is_some(); + + if let Some(prev) = prev { + core.run_queue + .push_back(prev, &self.inject, &mut core.metrics); + } + + core.lifo_slot = Some(task); + + ret + }; + + // Only notify if not currently parked. If `park` is `None`, then the + // scheduling is from a resource driver. As notifications often come in + // batches, the notification is delayed until the park is complete. + if should_notify && core.park.is_some() { + self.notify_parked(); + } + } + + pub(super) fn close(&self) { + if self.inject.close() { + self.notify_all(); + } + } + + fn notify_parked(&self) { + if let Some(index) = self.idle.worker_to_notify() { + self.remotes[index].unpark.unpark(); + } + } + + fn notify_all(&self) { + for remote in &self.remotes[..] { + remote.unpark.unpark(); + } + } + + fn notify_if_work_pending(&self) { + for remote in &self.remotes[..] { + if !remote.steal.is_empty() { + self.notify_parked(); + return; + } + } + + if !self.inject.is_empty() { + self.notify_parked(); + } + } + + fn transition_worker_from_searching(&self) { + if self.idle.transition_worker_from_searching() { + // We are the final searching worker. Because work was found, we + // need to notify another worker. + self.notify_parked(); + } + } + + /// Signals that a worker has observed the shutdown signal and has replaced + /// its core back into its handle. + /// + /// If all workers have reached this point, the final cleanup is performed. + fn shutdown(&self, core: Box<Core>) { + let mut cores = self.shutdown_cores.lock(); + cores.push(core); + + if cores.len() != self.remotes.len() { + return; + } + + debug_assert!(self.owned.is_empty()); + + for mut core in cores.drain(..) { + core.shutdown(); + } + + // Drain the injection queue + // + // We already shut down every task, so we can simply drop the tasks. + while let Some(task) = self.inject.pop() { + drop(task); + } + } + + fn ptr_eq(&self, other: &Shared) -> bool { + std::ptr::eq(self, other) + } +} + +cfg_metrics! { + impl Shared { + pub(super) fn injection_queue_depth(&self) -> usize { + self.inject.len() + } + + pub(super) fn worker_local_queue_depth(&self, worker: usize) -> usize { + self.remotes[worker].steal.len() + } + } +} |