diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-30 03:57:31 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-30 03:57:31 +0000 |
commit | dc0db358abe19481e475e10c32149b53370f1a1c (patch) | |
tree | ab8ce99c4b255ce46f99ef402c27916055b899ee /vendor/tokio/src/runtime/blocking | |
parent | Releasing progress-linux version 1.71.1+dfsg1-2~progress7.99u1. (diff) | |
download | rustc-dc0db358abe19481e475e10c32149b53370f1a1c.tar.xz rustc-dc0db358abe19481e475e10c32149b53370f1a1c.zip |
Merging upstream version 1.72.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/tokio/src/runtime/blocking')
-rw-r--r-- | vendor/tokio/src/runtime/blocking/mod.rs | 36 | ||||
-rw-r--r-- | vendor/tokio/src/runtime/blocking/pool.rs | 398 | ||||
-rw-r--r-- | vendor/tokio/src/runtime/blocking/schedule.rs | 50 | ||||
-rw-r--r-- | vendor/tokio/src/runtime/blocking/shutdown.rs | 8 | ||||
-rw-r--r-- | vendor/tokio/src/runtime/blocking/task.rs | 6 |
5 files changed, 382 insertions, 116 deletions
diff --git a/vendor/tokio/src/runtime/blocking/mod.rs b/vendor/tokio/src/runtime/blocking/mod.rs index fece3c279..c42924be7 100644 --- a/vendor/tokio/src/runtime/blocking/mod.rs +++ b/vendor/tokio/src/runtime/blocking/mod.rs @@ -6,37 +6,21 @@ mod pool; pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner}; +cfg_fs! { + pub(crate) use pool::spawn_mandatory_blocking; +} + +cfg_trace! { + pub(crate) use pool::Mandatory; +} + mod schedule; mod shutdown; -pub(crate) mod task; +mod task; +pub(crate) use task::BlockingTask; use crate::runtime::Builder; pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool { BlockingPool::new(builder, thread_cap) } - -/* -cfg_not_blocking_impl! { - use crate::runtime::Builder; - use std::time::Duration; - - #[derive(Debug, Clone)] - pub(crate) struct BlockingPool {} - - pub(crate) use BlockingPool as Spawner; - - pub(crate) fn create_blocking_pool(_builder: &Builder, _thread_cap: usize) -> BlockingPool { - BlockingPool {} - } - - impl BlockingPool { - pub(crate) fn spawner(&self) -> &BlockingPool { - self - } - - pub(crate) fn shutdown(&mut self, _duration: Option<Duration>) { - } - } -} -*/ diff --git a/vendor/tokio/src/runtime/blocking/pool.rs b/vendor/tokio/src/runtime/blocking/pool.rs index b7d725128..a23b0a0d2 100644 --- a/vendor/tokio/src/runtime/blocking/pool.rs +++ b/vendor/tokio/src/runtime/blocking/pool.rs @@ -2,16 +2,16 @@ use crate::loom::sync::{Arc, Condvar, Mutex}; use crate::loom::thread; -use crate::runtime::blocking::schedule::NoopSchedule; -use crate::runtime::blocking::shutdown; +use crate::runtime::blocking::schedule::BlockingSchedule; +use crate::runtime::blocking::{shutdown, BlockingTask}; use crate::runtime::builder::ThreadNameFn; -use crate::runtime::context; use crate::runtime::task::{self, JoinHandle}; use crate::runtime::{Builder, Callback, Handle}; -use crate::util::error::CONTEXT_MISSING_ERROR; use std::collections::{HashMap, VecDeque}; use std::fmt; +use std::io; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Duration; pub(crate) struct BlockingPool { @@ -24,36 +24,84 @@ pub(crate) struct Spawner { inner: Arc<Inner>, } +#[derive(Default)] +pub(crate) struct SpawnerMetrics { + num_threads: AtomicUsize, + num_idle_threads: AtomicUsize, + queue_depth: AtomicUsize, +} + +impl SpawnerMetrics { + fn num_threads(&self) -> usize { + self.num_threads.load(Ordering::Relaxed) + } + + fn num_idle_threads(&self) -> usize { + self.num_idle_threads.load(Ordering::Relaxed) + } + + cfg_metrics! { + fn queue_depth(&self) -> usize { + self.queue_depth.load(Ordering::Relaxed) + } + } + + fn inc_num_threads(&self) { + self.num_threads.fetch_add(1, Ordering::Relaxed); + } + + fn dec_num_threads(&self) { + self.num_threads.fetch_sub(1, Ordering::Relaxed); + } + + fn inc_num_idle_threads(&self) { + self.num_idle_threads.fetch_add(1, Ordering::Relaxed); + } + + fn dec_num_idle_threads(&self) -> usize { + self.num_idle_threads.fetch_sub(1, Ordering::Relaxed) + } + + fn inc_queue_depth(&self) { + self.queue_depth.fetch_add(1, Ordering::Relaxed); + } + + fn dec_queue_depth(&self) { + self.queue_depth.fetch_sub(1, Ordering::Relaxed); + } +} + struct Inner { - /// State shared between worker threads + /// State shared between worker threads. shared: Mutex<Shared>, /// Pool threads wait on this. condvar: Condvar, - /// Spawned threads use this name + /// Spawned threads use this name. thread_name: ThreadNameFn, - /// Spawned thread stack size + /// Spawned thread stack size. stack_size: Option<usize>, - /// Call after a thread starts + /// Call after a thread starts. after_start: Option<Callback>, - /// Call before a thread stops + /// Call before a thread stops. before_stop: Option<Callback>, - // Maximum number of threads + // Maximum number of threads. thread_cap: usize, - // Customizable wait timeout + // Customizable wait timeout. keep_alive: Duration, + + // Metrics about the pool. + metrics: SpawnerMetrics, } struct Shared { queue: VecDeque<Task>, - num_th: usize, - num_idle: u32, num_notify: u32, shutdown: bool, shutdown_tx: Option<shutdown::Sender>, @@ -67,24 +115,93 @@ struct Shared { /// calling shutdown handles joining on these. worker_threads: HashMap<usize, thread::JoinHandle<()>>, /// This is a counter used to iterate worker_threads in a consistent order (for loom's - /// benefit) + /// benefit). worker_thread_index: usize, } -type Task = task::Notified<NoopSchedule>; +pub(crate) struct Task { + task: task::UnownedTask<BlockingSchedule>, + mandatory: Mandatory, +} + +#[derive(PartialEq, Eq)] +pub(crate) enum Mandatory { + #[cfg_attr(not(fs), allow(dead_code))] + Mandatory, + NonMandatory, +} + +pub(crate) enum SpawnError { + /// Pool is shutting down and the task was not scheduled + ShuttingDown, + /// There are no worker threads available to take the task + /// and the OS failed to spawn a new one + NoThreads(io::Error), +} + +impl From<SpawnError> for io::Error { + fn from(e: SpawnError) -> Self { + match e { + SpawnError::ShuttingDown => { + io::Error::new(io::ErrorKind::Other, "blocking pool shutting down") + } + SpawnError::NoThreads(e) => e, + } + } +} + +impl Task { + pub(crate) fn new(task: task::UnownedTask<BlockingSchedule>, mandatory: Mandatory) -> Task { + Task { task, mandatory } + } + + fn run(self) { + self.task.run(); + } + + fn shutdown_or_run_if_mandatory(self) { + match self.mandatory { + Mandatory::NonMandatory => self.task.shutdown(), + Mandatory::Mandatory => self.task.run(), + } + } +} const KEEP_ALIVE: Duration = Duration::from_secs(10); -/// Run the provided function on an executor dedicated to blocking operations. +/// Runs the provided function on an executor dedicated to blocking operations. +/// Tasks will be scheduled as non-mandatory, meaning they may not get executed +/// in case of runtime shutdown. +#[track_caller] +#[cfg_attr(tokio_wasi, allow(dead_code))] pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R> where F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - let rt = context::current().expect(CONTEXT_MISSING_ERROR); + let rt = Handle::current(); rt.spawn_blocking(func) } +cfg_fs! { + #[cfg_attr(any( + all(loom, not(test)), // the function is covered by loom tests + test + ), allow(dead_code))] + /// Runs the provided function on an executor dedicated to blocking + /// operations. Tasks will be scheduled as mandatory, meaning they are + /// guaranteed to run unless a shutdown is already taking place. In case a + /// shutdown is already taking place, `None` will be returned. + pub(crate) fn spawn_mandatory_blocking<F, R>(func: F) -> Option<JoinHandle<R>> + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let rt = Handle::current(); + rt.inner.blocking_spawner().spawn_mandatory_blocking(&rt, func) + } +} + // ===== impl BlockingPool ===== impl BlockingPool { @@ -97,8 +214,6 @@ impl BlockingPool { inner: Arc::new(Inner { shared: Mutex::new(Shared { queue: VecDeque::new(), - num_th: 0, - num_idle: 0, num_notify: 0, shutdown: false, shutdown_tx: Some(shutdown_tx), @@ -113,6 +228,7 @@ impl BlockingPool { before_stop: builder.before_stop.clone(), thread_cap, keep_alive, + metrics: Default::default(), }), }, shutdown_rx, @@ -172,53 +288,164 @@ impl fmt::Debug for BlockingPool { // ===== impl Spawner ===== impl Spawner { - pub(crate) fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> { - let shutdown_tx = { - let mut shared = self.inner.shared.lock(); - - if shared.shutdown { - // Shutdown the task - task.shutdown(); - - // no need to even push this task; it would never get picked up - return Err(()); + #[track_caller] + pub(crate) fn spawn_blocking<F, R>(&self, rt: &Handle, func: F) -> JoinHandle<R> + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let (join_handle, spawn_result) = + if cfg!(debug_assertions) && std::mem::size_of::<F>() > 2048 { + self.spawn_blocking_inner(Box::new(func), Mandatory::NonMandatory, None, rt) + } else { + self.spawn_blocking_inner(func, Mandatory::NonMandatory, None, rt) + }; + + match spawn_result { + Ok(()) => join_handle, + // Compat: do not panic here, return the join_handle even though it will never resolve + Err(SpawnError::ShuttingDown) => join_handle, + Err(SpawnError::NoThreads(e)) => { + panic!("OS can't spawn worker thread: {}", e) } + } + } - shared.queue.push_back(task); - - if shared.num_idle == 0 { - // No threads are able to process the task. - - if shared.num_th == self.inner.thread_cap { - // At max number of threads - None - } else { - shared.num_th += 1; - assert!(shared.shutdown_tx.is_some()); - shared.shutdown_tx.clone() - } + cfg_fs! { + #[track_caller] + #[cfg_attr(any( + all(loom, not(test)), // the function is covered by loom tests + test + ), allow(dead_code))] + pub(crate) fn spawn_mandatory_blocking<F, R>(&self, rt: &Handle, func: F) -> Option<JoinHandle<R>> + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let (join_handle, spawn_result) = if cfg!(debug_assertions) && std::mem::size_of::<F>() > 2048 { + self.spawn_blocking_inner( + Box::new(func), + Mandatory::Mandatory, + None, + rt, + ) + } else { + self.spawn_blocking_inner( + func, + Mandatory::Mandatory, + None, + rt, + ) + }; + + if spawn_result.is_ok() { + Some(join_handle) } else { - // Notify an idle worker thread. The notification counter - // is used to count the needed amount of notifications - // exactly. Thread libraries may generate spurious - // wakeups, this counter is used to keep us in a - // consistent state. - shared.num_idle -= 1; - shared.num_notify += 1; - self.inner.condvar.notify_one(); None } + } + } + + #[track_caller] + pub(crate) fn spawn_blocking_inner<F, R>( + &self, + func: F, + is_mandatory: Mandatory, + name: Option<&str>, + rt: &Handle, + ) -> (JoinHandle<R>, Result<(), SpawnError>) + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let fut = BlockingTask::new(func); + let id = task::Id::next(); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let fut = { + use tracing::Instrument; + let location = std::panic::Location::caller(); + let span = tracing::trace_span!( + target: "tokio::task::blocking", + "runtime.spawn", + kind = %"blocking", + task.name = %name.unwrap_or_default(), + task.id = id.as_u64(), + "fn" = %std::any::type_name::<F>(), + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + ); + fut.instrument(span) }; - if let Some(shutdown_tx) = shutdown_tx { - let mut shared = self.inner.shared.lock(); + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let _ = name; - let id = shared.worker_thread_index; - shared.worker_thread_index += 1; + let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id); - let handle = self.spawn_thread(shutdown_tx, rt, id); + let spawned = self.spawn_task(Task::new(task, is_mandatory), rt); + (handle, spawned) + } - shared.worker_threads.insert(id, handle); + fn spawn_task(&self, task: Task, rt: &Handle) -> Result<(), SpawnError> { + let mut shared = self.inner.shared.lock(); + + if shared.shutdown { + // Shutdown the task: it's fine to shutdown this task (even if + // mandatory) because it was scheduled after the shutdown of the + // runtime began. + task.task.shutdown(); + + // no need to even push this task; it would never get picked up + return Err(SpawnError::ShuttingDown); + } + + shared.queue.push_back(task); + self.inner.metrics.inc_queue_depth(); + + if self.inner.metrics.num_idle_threads() == 0 { + // No threads are able to process the task. + + if self.inner.metrics.num_threads() == self.inner.thread_cap { + // At max number of threads + } else { + assert!(shared.shutdown_tx.is_some()); + let shutdown_tx = shared.shutdown_tx.clone(); + + if let Some(shutdown_tx) = shutdown_tx { + let id = shared.worker_thread_index; + + match self.spawn_thread(shutdown_tx, rt, id) { + Ok(handle) => { + self.inner.metrics.inc_num_threads(); + shared.worker_thread_index += 1; + shared.worker_threads.insert(id, handle); + } + Err(ref e) + if is_temporary_os_thread_error(e) + && self.inner.metrics.num_threads() > 0 => + { + // OS temporarily failed to spawn a new thread. + // The task will be picked up eventually by a currently + // busy thread. + } + Err(e) => { + // The OS refused to spawn the thread and there is no thread + // to pick up the task that has just been pushed to the queue. + return Err(SpawnError::NoThreads(e)); + } + } + } + } + } else { + // Notify an idle worker thread. The notification counter + // is used to count the needed amount of notifications + // exactly. Thread libraries may generate spurious + // wakeups, this counter is used to keep us in a + // consistent state. + self.inner.metrics.dec_num_idle_threads(); + shared.num_notify += 1; + self.inner.condvar.notify_one(); } Ok(()) @@ -229,7 +456,7 @@ impl Spawner { shutdown_tx: shutdown::Sender, rt: &Handle, id: usize, - ) -> thread::JoinHandle<()> { + ) -> std::io::Result<thread::JoinHandle<()>> { let mut builder = thread::Builder::new().name((self.inner.thread_name)()); if let Some(stack_size) = self.inner.stack_size { @@ -238,17 +465,37 @@ impl Spawner { let rt = rt.clone(); - builder - .spawn(move || { - // Only the reference should be moved into the closure - let _enter = crate::runtime::context::enter(rt.clone()); - rt.blocking_spawner.inner.run(id); - drop(shutdown_tx); - }) - .unwrap() + builder.spawn(move || { + // Only the reference should be moved into the closure + let _enter = rt.enter(); + rt.inner.blocking_spawner().inner.run(id); + drop(shutdown_tx); + }) + } +} + +cfg_metrics! { + impl Spawner { + pub(crate) fn num_threads(&self) -> usize { + self.inner.metrics.num_threads() + } + + pub(crate) fn num_idle_threads(&self) -> usize { + self.inner.metrics.num_idle_threads() + } + + pub(crate) fn queue_depth(&self) -> usize { + self.inner.metrics.queue_depth() + } } } +// Tells whether the error when spawning a thread is temporary. +#[inline] +fn is_temporary_os_thread_error(error: &std::io::Error) -> bool { + matches!(error.kind(), std::io::ErrorKind::WouldBlock) +} + impl Inner { fn run(&self, worker_thread_id: usize) { if let Some(f) = &self.after_start { @@ -261,6 +508,7 @@ impl Inner { 'main: loop { // BUSY while let Some(task) = shared.queue.pop_front() { + self.metrics.dec_queue_depth(); drop(shared); task.run(); @@ -268,7 +516,7 @@ impl Inner { } // IDLE - shared.num_idle += 1; + self.metrics.inc_num_idle_threads(); while !shared.shutdown { let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap(); @@ -302,8 +550,10 @@ impl Inner { if shared.shutdown { // Drain the queue while let Some(task) = shared.queue.pop_front() { + self.metrics.dec_queue_depth(); drop(shared); - task.shutdown(); + + task.shutdown_or_run_if_mandatory(); shared = self.shared.lock(); } @@ -311,7 +561,7 @@ impl Inner { // Work was produced, and we "took" it (by decrementing num_notify). // This means that num_idle was decremented once for our wakeup. // But, since we are exiting, we need to "undo" that, as we'll stay idle. - shared.num_idle += 1; + self.metrics.inc_num_idle_threads(); // NOTE: Technically we should also do num_notify++ and notify again, // but since we're shutting down anyway, that won't be necessary. break; @@ -319,17 +569,17 @@ impl Inner { } // Thread exit - shared.num_th -= 1; + self.metrics.dec_num_threads(); // num_idle should now be tracked exactly, panic // with a descriptive message if it is not the // case. - shared.num_idle = shared - .num_idle - .checked_sub(1) - .expect("num_idle underflowed on thread exit"); + let prev_idle = self.metrics.dec_num_idle_threads(); + if prev_idle < self.metrics.num_idle_threads() { + panic!("num_idle_threads underflowed on thread exit") + } - if shared.shutdown && shared.num_th == 0 { + if shared.shutdown && self.metrics.num_threads() == 0 { self.condvar.notify_one(); } diff --git a/vendor/tokio/src/runtime/blocking/schedule.rs b/vendor/tokio/src/runtime/blocking/schedule.rs index 4e044ab29..edf775be8 100644 --- a/vendor/tokio/src/runtime/blocking/schedule.rs +++ b/vendor/tokio/src/runtime/blocking/schedule.rs @@ -1,20 +1,52 @@ +#[cfg(feature = "test-util")] +use crate::runtime::scheduler; use crate::runtime::task::{self, Task}; +use crate::runtime::Handle; -/// `task::Schedule` implementation that does nothing. This is unique to the -/// blocking scheduler as tasks scheduled are not really futures but blocking -/// operations. +/// `task::Schedule` implementation that does nothing (except some bookkeeping +/// in test-util builds). This is unique to the blocking scheduler as tasks +/// scheduled are not really futures but blocking operations. /// /// We avoid storing the task by forgetting it in `bind` and re-materializing it -/// in `release. -pub(crate) struct NoopSchedule; +/// in `release`. +pub(crate) struct BlockingSchedule { + #[cfg(feature = "test-util")] + handle: Handle, +} -impl task::Schedule for NoopSchedule { - fn bind(_task: Task<Self>) -> NoopSchedule { - // Do nothing w/ the task - NoopSchedule +impl BlockingSchedule { + #[cfg_attr(not(feature = "test-util"), allow(unused_variables))] + pub(crate) fn new(handle: &Handle) -> Self { + #[cfg(feature = "test-util")] + { + match &handle.inner { + scheduler::Handle::CurrentThread(handle) => { + handle.driver.clock.inhibit_auto_advance(); + } + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + scheduler::Handle::MultiThread(_) => {} + } + } + BlockingSchedule { + #[cfg(feature = "test-util")] + handle: handle.clone(), + } } +} +impl task::Schedule for BlockingSchedule { fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> { + #[cfg(feature = "test-util")] + { + match &self.handle.inner { + scheduler::Handle::CurrentThread(handle) => { + handle.driver.clock.allow_auto_advance(); + handle.driver.unpark(); + } + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + scheduler::Handle::MultiThread(_) => {} + } + } None } diff --git a/vendor/tokio/src/runtime/blocking/shutdown.rs b/vendor/tokio/src/runtime/blocking/shutdown.rs index 0cf22859b..fe5abae07 100644 --- a/vendor/tokio/src/runtime/blocking/shutdown.rs +++ b/vendor/tokio/src/runtime/blocking/shutdown.rs @@ -10,7 +10,7 @@ use std::time::Duration; #[derive(Debug, Clone)] pub(super) struct Sender { - tx: Arc<oneshot::Sender<()>>, + _tx: Arc<oneshot::Sender<()>>, } #[derive(Debug)] @@ -20,7 +20,7 @@ pub(super) struct Receiver { pub(super) fn channel() -> (Sender, Receiver) { let (tx, rx) = oneshot::channel(); - let tx = Sender { tx: Arc::new(tx) }; + let tx = Sender { _tx: Arc::new(tx) }; let rx = Receiver { rx }; (tx, rx) @@ -35,13 +35,13 @@ impl Receiver { /// /// If the timeout has elapsed, it returns `false`, otherwise it returns `true`. pub(crate) fn wait(&mut self, timeout: Option<Duration>) -> bool { - use crate::runtime::enter::try_enter; + use crate::runtime::context::try_enter_blocking_region; if timeout == Some(Duration::from_nanos(0)) { return false; } - let mut e = match try_enter(false) { + let mut e = match try_enter_blocking_region() { Some(enter) => enter, _ => { if std::thread::panicking() { diff --git a/vendor/tokio/src/runtime/blocking/task.rs b/vendor/tokio/src/runtime/blocking/task.rs index ee2d8d6d6..c44617540 100644 --- a/vendor/tokio/src/runtime/blocking/task.rs +++ b/vendor/tokio/src/runtime/blocking/task.rs @@ -2,13 +2,13 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -/// Converts a function to a future that completes on poll +/// Converts a function to a future that completes on poll. pub(crate) struct BlockingTask<T> { func: Option<T>, } impl<T> BlockingTask<T> { - /// Initializes a new blocking task from the given function + /// Initializes a new blocking task from the given function. pub(crate) fn new(func: T) -> BlockingTask<T> { BlockingTask { func: Some(func) } } @@ -37,7 +37,7 @@ where // currently goes through Task::poll(), and so is subject to budgeting. That isn't really // what we want; a blocking task may itself want to run tasks (it might be a Worker!), so // we want it to start without any budgeting. - crate::coop::stop(); + crate::runtime::coop::stop(); Poll::Ready(func()) } |