diff options
Diffstat (limited to 'third_party/rust/tokio/src/runtime/blocking/pool.rs')
-rw-r--r-- | third_party/rust/tokio/src/runtime/blocking/pool.rs | 396 |
1 files changed, 396 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/runtime/blocking/pool.rs b/third_party/rust/tokio/src/runtime/blocking/pool.rs new file mode 100644 index 0000000000..daf1f63fac --- /dev/null +++ b/third_party/rust/tokio/src/runtime/blocking/pool.rs @@ -0,0 +1,396 @@ +//! Thread pool for blocking operations + +use crate::loom::sync::{Arc, Condvar, Mutex}; +use crate::loom::thread; +use crate::runtime::blocking::schedule::NoopSchedule; +use crate::runtime::blocking::shutdown; +use crate::runtime::builder::ThreadNameFn; +use crate::runtime::context; +use crate::runtime::task::{self, JoinHandle}; +use crate::runtime::{Builder, Callback, Handle}; + +use std::collections::{HashMap, VecDeque}; +use std::fmt; +use std::time::Duration; + +pub(crate) struct BlockingPool { + spawner: Spawner, + shutdown_rx: shutdown::Receiver, +} + +#[derive(Clone)] +pub(crate) struct Spawner { + inner: Arc<Inner>, +} + +struct Inner { + /// State shared between worker threads. + shared: Mutex<Shared>, + + /// Pool threads wait on this. + condvar: Condvar, + + /// Spawned threads use this name. + thread_name: ThreadNameFn, + + /// Spawned thread stack size. + stack_size: Option<usize>, + + /// Call after a thread starts. + after_start: Option<Callback>, + + /// Call before a thread stops. + before_stop: Option<Callback>, + + // Maximum number of threads. + thread_cap: usize, + + // Customizable wait timeout. + keep_alive: Duration, +} + +struct Shared { + queue: VecDeque<Task>, + num_th: usize, + num_idle: u32, + num_notify: u32, + shutdown: bool, + shutdown_tx: Option<shutdown::Sender>, + /// Prior to shutdown, we clean up JoinHandles by having each timed-out + /// thread join on the previous timed-out thread. This is not strictly + /// necessary but helps avoid Valgrind false positives, see + /// <https://github.com/tokio-rs/tokio/commit/646fbae76535e397ef79dbcaacb945d4c829f666> + /// for more information. + last_exiting_thread: Option<thread::JoinHandle<()>>, + /// This holds the JoinHandles for all running threads; on shutdown, the thread + /// calling shutdown handles joining on these. + worker_threads: HashMap<usize, thread::JoinHandle<()>>, + /// This is a counter used to iterate worker_threads in a consistent order (for loom's + /// benefit). + worker_thread_index: usize, +} + +pub(crate) struct Task { + task: task::UnownedTask<NoopSchedule>, + mandatory: Mandatory, +} + +#[derive(PartialEq, Eq)] +pub(crate) enum Mandatory { + #[cfg_attr(not(fs), allow(dead_code))] + Mandatory, + NonMandatory, +} + +impl Task { + pub(crate) fn new(task: task::UnownedTask<NoopSchedule>, mandatory: Mandatory) -> Task { + Task { task, mandatory } + } + + fn run(self) { + self.task.run(); + } + + fn shutdown_or_run_if_mandatory(self) { + match self.mandatory { + Mandatory::NonMandatory => self.task.shutdown(), + Mandatory::Mandatory => self.task.run(), + } + } +} + +const KEEP_ALIVE: Duration = Duration::from_secs(10); + +/// Runs the provided function on an executor dedicated to blocking operations. +/// Tasks will be scheduled as non-mandatory, meaning they may not get executed +/// in case of runtime shutdown. +pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R> +where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, +{ + let rt = context::current(); + rt.spawn_blocking(func) +} + +cfg_fs! { + #[cfg_attr(any( + all(loom, not(test)), // the function is covered by loom tests + test + ), allow(dead_code))] + /// Runs the provided function on an executor dedicated to blocking + /// operations. Tasks will be scheduled as mandatory, meaning they are + /// guaranteed to run unless a shutdown is already taking place. In case a + /// shutdown is already taking place, `None` will be returned. + pub(crate) fn spawn_mandatory_blocking<F, R>(func: F) -> Option<JoinHandle<R>> + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let rt = context::current(); + rt.spawn_mandatory_blocking(func) + } +} + +// ===== impl BlockingPool ===== + +impl BlockingPool { + pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool { + let (shutdown_tx, shutdown_rx) = shutdown::channel(); + let keep_alive = builder.keep_alive.unwrap_or(KEEP_ALIVE); + + BlockingPool { + spawner: Spawner { + inner: Arc::new(Inner { + shared: Mutex::new(Shared { + queue: VecDeque::new(), + num_th: 0, + num_idle: 0, + num_notify: 0, + shutdown: false, + shutdown_tx: Some(shutdown_tx), + last_exiting_thread: None, + worker_threads: HashMap::new(), + worker_thread_index: 0, + }), + condvar: Condvar::new(), + thread_name: builder.thread_name.clone(), + stack_size: builder.thread_stack_size, + after_start: builder.after_start.clone(), + before_stop: builder.before_stop.clone(), + thread_cap, + keep_alive, + }), + }, + shutdown_rx, + } + } + + pub(crate) fn spawner(&self) -> &Spawner { + &self.spawner + } + + pub(crate) fn shutdown(&mut self, timeout: Option<Duration>) { + let mut shared = self.spawner.inner.shared.lock(); + + // The function can be called multiple times. First, by explicitly + // calling `shutdown` then by the drop handler calling `shutdown`. This + // prevents shutting down twice. + if shared.shutdown { + return; + } + + shared.shutdown = true; + shared.shutdown_tx = None; + self.spawner.inner.condvar.notify_all(); + + let last_exited_thread = std::mem::take(&mut shared.last_exiting_thread); + let workers = std::mem::take(&mut shared.worker_threads); + + drop(shared); + + if self.shutdown_rx.wait(timeout) { + let _ = last_exited_thread.map(|th| th.join()); + + // Loom requires that execution be deterministic, so sort by thread ID before joining. + // (HashMaps use a randomly-seeded hash function, so the order is nondeterministic) + let mut workers: Vec<(usize, thread::JoinHandle<()>)> = workers.into_iter().collect(); + workers.sort_by_key(|(id, _)| *id); + + for (_id, handle) in workers.into_iter() { + let _ = handle.join(); + } + } + } +} + +impl Drop for BlockingPool { + fn drop(&mut self) { + self.shutdown(None); + } +} + +impl fmt::Debug for BlockingPool { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("BlockingPool").finish() + } +} + +// ===== impl Spawner ===== + +impl Spawner { + pub(crate) fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> { + let mut shared = self.inner.shared.lock(); + + if shared.shutdown { + // Shutdown the task: it's fine to shutdown this task (even if + // mandatory) because it was scheduled after the shutdown of the + // runtime began. + task.task.shutdown(); + + // no need to even push this task; it would never get picked up + return Err(()); + } + + shared.queue.push_back(task); + + if shared.num_idle == 0 { + // No threads are able to process the task. + + if shared.num_th == self.inner.thread_cap { + // At max number of threads + } else { + shared.num_th += 1; + assert!(shared.shutdown_tx.is_some()); + let shutdown_tx = shared.shutdown_tx.clone(); + + if let Some(shutdown_tx) = shutdown_tx { + let id = shared.worker_thread_index; + shared.worker_thread_index += 1; + + let handle = self.spawn_thread(shutdown_tx, rt, id); + + shared.worker_threads.insert(id, handle); + } + } + } else { + // Notify an idle worker thread. The notification counter + // is used to count the needed amount of notifications + // exactly. Thread libraries may generate spurious + // wakeups, this counter is used to keep us in a + // consistent state. + shared.num_idle -= 1; + shared.num_notify += 1; + self.inner.condvar.notify_one(); + } + + Ok(()) + } + + fn spawn_thread( + &self, + shutdown_tx: shutdown::Sender, + rt: &Handle, + id: usize, + ) -> thread::JoinHandle<()> { + let mut builder = thread::Builder::new().name((self.inner.thread_name)()); + + if let Some(stack_size) = self.inner.stack_size { + builder = builder.stack_size(stack_size); + } + + let rt = rt.clone(); + + builder + .spawn(move || { + // Only the reference should be moved into the closure + let _enter = crate::runtime::context::enter(rt.clone()); + rt.blocking_spawner.inner.run(id); + drop(shutdown_tx); + }) + .expect("OS can't spawn a new worker thread") + } +} + +impl Inner { + fn run(&self, worker_thread_id: usize) { + if let Some(f) = &self.after_start { + f() + } + + let mut shared = self.shared.lock(); + let mut join_on_thread = None; + + 'main: loop { + // BUSY + while let Some(task) = shared.queue.pop_front() { + drop(shared); + task.run(); + + shared = self.shared.lock(); + } + + // IDLE + shared.num_idle += 1; + + while !shared.shutdown { + let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap(); + + shared = lock_result.0; + let timeout_result = lock_result.1; + + if shared.num_notify != 0 { + // We have received a legitimate wakeup, + // acknowledge it by decrementing the counter + // and transition to the BUSY state. + shared.num_notify -= 1; + break; + } + + // Even if the condvar "timed out", if the pool is entering the + // shutdown phase, we want to perform the cleanup logic. + if !shared.shutdown && timeout_result.timed_out() { + // We'll join the prior timed-out thread's JoinHandle after dropping the lock. + // This isn't done when shutting down, because the thread calling shutdown will + // handle joining everything. + let my_handle = shared.worker_threads.remove(&worker_thread_id); + join_on_thread = std::mem::replace(&mut shared.last_exiting_thread, my_handle); + + break 'main; + } + + // Spurious wakeup detected, go back to sleep. + } + + if shared.shutdown { + // Drain the queue + while let Some(task) = shared.queue.pop_front() { + drop(shared); + + task.shutdown_or_run_if_mandatory(); + + shared = self.shared.lock(); + } + + // Work was produced, and we "took" it (by decrementing num_notify). + // This means that num_idle was decremented once for our wakeup. + // But, since we are exiting, we need to "undo" that, as we'll stay idle. + shared.num_idle += 1; + // NOTE: Technically we should also do num_notify++ and notify again, + // but since we're shutting down anyway, that won't be necessary. + break; + } + } + + // Thread exit + shared.num_th -= 1; + + // num_idle should now be tracked exactly, panic + // with a descriptive message if it is not the + // case. + shared.num_idle = shared + .num_idle + .checked_sub(1) + .expect("num_idle underflowed on thread exit"); + + if shared.shutdown && shared.num_th == 0 { + self.condvar.notify_one(); + } + + drop(shared); + + if let Some(f) = &self.before_stop { + f() + } + + if let Some(handle) = join_on_thread { + let _ = handle.join(); + } + } +} + +impl fmt::Debug for Spawner { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("blocking::Spawner").finish() + } +} |