summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio/src/runtime/blocking
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio/src/runtime/blocking')
-rw-r--r--third_party/rust/tokio/src/runtime/blocking/mod.rs48
-rw-r--r--third_party/rust/tokio/src/runtime/blocking/pool.rs396
-rw-r--r--third_party/rust/tokio/src/runtime/blocking/schedule.rs19
-rw-r--r--third_party/rust/tokio/src/runtime/blocking/shutdown.rs71
-rw-r--r--third_party/rust/tokio/src/runtime/blocking/task.rs44
5 files changed, 578 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/runtime/blocking/mod.rs b/third_party/rust/tokio/src/runtime/blocking/mod.rs
new file mode 100644
index 0000000000..15fe05c9ad
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/blocking/mod.rs
@@ -0,0 +1,48 @@
+//! Abstracts out the APIs necessary to `Runtime` for integrating the blocking
+//! pool. When the `blocking` feature flag is **not** enabled, these APIs are
+//! shells. This isolates the complexity of dealing with conditional
+//! compilation.
+
+mod pool;
+pub(crate) use pool::{spawn_blocking, BlockingPool, Mandatory, Spawner, Task};
+
+cfg_fs! {
+ pub(crate) use pool::spawn_mandatory_blocking;
+}
+
+mod schedule;
+mod shutdown;
+mod task;
+pub(crate) use schedule::NoopSchedule;
+pub(crate) use task::BlockingTask;
+
+use crate::runtime::Builder;
+
+pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool {
+ BlockingPool::new(builder, thread_cap)
+}
+
+/*
+cfg_not_blocking_impl! {
+ use crate::runtime::Builder;
+ use std::time::Duration;
+
+ #[derive(Debug, Clone)]
+ pub(crate) struct BlockingPool {}
+
+ pub(crate) use BlockingPool as Spawner;
+
+ pub(crate) fn create_blocking_pool(_builder: &Builder, _thread_cap: usize) -> BlockingPool {
+ BlockingPool {}
+ }
+
+ impl BlockingPool {
+ pub(crate) fn spawner(&self) -> &BlockingPool {
+ self
+ }
+
+ pub(crate) fn shutdown(&mut self, _duration: Option<Duration>) {
+ }
+ }
+}
+*/
diff --git a/third_party/rust/tokio/src/runtime/blocking/pool.rs b/third_party/rust/tokio/src/runtime/blocking/pool.rs
new file mode 100644
index 0000000000..daf1f63fac
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/blocking/pool.rs
@@ -0,0 +1,396 @@
+//! Thread pool for blocking operations
+
+use crate::loom::sync::{Arc, Condvar, Mutex};
+use crate::loom::thread;
+use crate::runtime::blocking::schedule::NoopSchedule;
+use crate::runtime::blocking::shutdown;
+use crate::runtime::builder::ThreadNameFn;
+use crate::runtime::context;
+use crate::runtime::task::{self, JoinHandle};
+use crate::runtime::{Builder, Callback, Handle};
+
+use std::collections::{HashMap, VecDeque};
+use std::fmt;
+use std::time::Duration;
+
+pub(crate) struct BlockingPool {
+ spawner: Spawner,
+ shutdown_rx: shutdown::Receiver,
+}
+
+#[derive(Clone)]
+pub(crate) struct Spawner {
+ inner: Arc<Inner>,
+}
+
+struct Inner {
+ /// State shared between worker threads.
+ shared: Mutex<Shared>,
+
+ /// Pool threads wait on this.
+ condvar: Condvar,
+
+ /// Spawned threads use this name.
+ thread_name: ThreadNameFn,
+
+ /// Spawned thread stack size.
+ stack_size: Option<usize>,
+
+ /// Call after a thread starts.
+ after_start: Option<Callback>,
+
+ /// Call before a thread stops.
+ before_stop: Option<Callback>,
+
+ // Maximum number of threads.
+ thread_cap: usize,
+
+ // Customizable wait timeout.
+ keep_alive: Duration,
+}
+
+struct Shared {
+ queue: VecDeque<Task>,
+ num_th: usize,
+ num_idle: u32,
+ num_notify: u32,
+ shutdown: bool,
+ shutdown_tx: Option<shutdown::Sender>,
+ /// Prior to shutdown, we clean up JoinHandles by having each timed-out
+ /// thread join on the previous timed-out thread. This is not strictly
+ /// necessary but helps avoid Valgrind false positives, see
+ /// <https://github.com/tokio-rs/tokio/commit/646fbae76535e397ef79dbcaacb945d4c829f666>
+ /// for more information.
+ last_exiting_thread: Option<thread::JoinHandle<()>>,
+ /// This holds the JoinHandles for all running threads; on shutdown, the thread
+ /// calling shutdown handles joining on these.
+ worker_threads: HashMap<usize, thread::JoinHandle<()>>,
+ /// This is a counter used to iterate worker_threads in a consistent order (for loom's
+ /// benefit).
+ worker_thread_index: usize,
+}
+
+pub(crate) struct Task {
+ task: task::UnownedTask<NoopSchedule>,
+ mandatory: Mandatory,
+}
+
+#[derive(PartialEq, Eq)]
+pub(crate) enum Mandatory {
+ #[cfg_attr(not(fs), allow(dead_code))]
+ Mandatory,
+ NonMandatory,
+}
+
+impl Task {
+ pub(crate) fn new(task: task::UnownedTask<NoopSchedule>, mandatory: Mandatory) -> Task {
+ Task { task, mandatory }
+ }
+
+ fn run(self) {
+ self.task.run();
+ }
+
+ fn shutdown_or_run_if_mandatory(self) {
+ match self.mandatory {
+ Mandatory::NonMandatory => self.task.shutdown(),
+ Mandatory::Mandatory => self.task.run(),
+ }
+ }
+}
+
+const KEEP_ALIVE: Duration = Duration::from_secs(10);
+
+/// Runs the provided function on an executor dedicated to blocking operations.
+/// Tasks will be scheduled as non-mandatory, meaning they may not get executed
+/// in case of runtime shutdown.
+pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
+where
+ F: FnOnce() -> R + Send + 'static,
+ R: Send + 'static,
+{
+ let rt = context::current();
+ rt.spawn_blocking(func)
+}
+
+cfg_fs! {
+ #[cfg_attr(any(
+ all(loom, not(test)), // the function is covered by loom tests
+ test
+ ), allow(dead_code))]
+ /// Runs the provided function on an executor dedicated to blocking
+ /// operations. Tasks will be scheduled as mandatory, meaning they are
+ /// guaranteed to run unless a shutdown is already taking place. In case a
+ /// shutdown is already taking place, `None` will be returned.
+ pub(crate) fn spawn_mandatory_blocking<F, R>(func: F) -> Option<JoinHandle<R>>
+ where
+ F: FnOnce() -> R + Send + 'static,
+ R: Send + 'static,
+ {
+ let rt = context::current();
+ rt.spawn_mandatory_blocking(func)
+ }
+}
+
+// ===== impl BlockingPool =====
+
+impl BlockingPool {
+ pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool {
+ let (shutdown_tx, shutdown_rx) = shutdown::channel();
+ let keep_alive = builder.keep_alive.unwrap_or(KEEP_ALIVE);
+
+ BlockingPool {
+ spawner: Spawner {
+ inner: Arc::new(Inner {
+ shared: Mutex::new(Shared {
+ queue: VecDeque::new(),
+ num_th: 0,
+ num_idle: 0,
+ num_notify: 0,
+ shutdown: false,
+ shutdown_tx: Some(shutdown_tx),
+ last_exiting_thread: None,
+ worker_threads: HashMap::new(),
+ worker_thread_index: 0,
+ }),
+ condvar: Condvar::new(),
+ thread_name: builder.thread_name.clone(),
+ stack_size: builder.thread_stack_size,
+ after_start: builder.after_start.clone(),
+ before_stop: builder.before_stop.clone(),
+ thread_cap,
+ keep_alive,
+ }),
+ },
+ shutdown_rx,
+ }
+ }
+
+ pub(crate) fn spawner(&self) -> &Spawner {
+ &self.spawner
+ }
+
+ pub(crate) fn shutdown(&mut self, timeout: Option<Duration>) {
+ let mut shared = self.spawner.inner.shared.lock();
+
+ // The function can be called multiple times. First, by explicitly
+ // calling `shutdown` then by the drop handler calling `shutdown`. This
+ // prevents shutting down twice.
+ if shared.shutdown {
+ return;
+ }
+
+ shared.shutdown = true;
+ shared.shutdown_tx = None;
+ self.spawner.inner.condvar.notify_all();
+
+ let last_exited_thread = std::mem::take(&mut shared.last_exiting_thread);
+ let workers = std::mem::take(&mut shared.worker_threads);
+
+ drop(shared);
+
+ if self.shutdown_rx.wait(timeout) {
+ let _ = last_exited_thread.map(|th| th.join());
+
+ // Loom requires that execution be deterministic, so sort by thread ID before joining.
+ // (HashMaps use a randomly-seeded hash function, so the order is nondeterministic)
+ let mut workers: Vec<(usize, thread::JoinHandle<()>)> = workers.into_iter().collect();
+ workers.sort_by_key(|(id, _)| *id);
+
+ for (_id, handle) in workers.into_iter() {
+ let _ = handle.join();
+ }
+ }
+ }
+}
+
+impl Drop for BlockingPool {
+ fn drop(&mut self) {
+ self.shutdown(None);
+ }
+}
+
+impl fmt::Debug for BlockingPool {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("BlockingPool").finish()
+ }
+}
+
+// ===== impl Spawner =====
+
+impl Spawner {
+ pub(crate) fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> {
+ let mut shared = self.inner.shared.lock();
+
+ if shared.shutdown {
+ // Shutdown the task: it's fine to shutdown this task (even if
+ // mandatory) because it was scheduled after the shutdown of the
+ // runtime began.
+ task.task.shutdown();
+
+ // no need to even push this task; it would never get picked up
+ return Err(());
+ }
+
+ shared.queue.push_back(task);
+
+ if shared.num_idle == 0 {
+ // No threads are able to process the task.
+
+ if shared.num_th == self.inner.thread_cap {
+ // At max number of threads
+ } else {
+ shared.num_th += 1;
+ assert!(shared.shutdown_tx.is_some());
+ let shutdown_tx = shared.shutdown_tx.clone();
+
+ if let Some(shutdown_tx) = shutdown_tx {
+ let id = shared.worker_thread_index;
+ shared.worker_thread_index += 1;
+
+ let handle = self.spawn_thread(shutdown_tx, rt, id);
+
+ shared.worker_threads.insert(id, handle);
+ }
+ }
+ } else {
+ // Notify an idle worker thread. The notification counter
+ // is used to count the needed amount of notifications
+ // exactly. Thread libraries may generate spurious
+ // wakeups, this counter is used to keep us in a
+ // consistent state.
+ shared.num_idle -= 1;
+ shared.num_notify += 1;
+ self.inner.condvar.notify_one();
+ }
+
+ Ok(())
+ }
+
+ fn spawn_thread(
+ &self,
+ shutdown_tx: shutdown::Sender,
+ rt: &Handle,
+ id: usize,
+ ) -> thread::JoinHandle<()> {
+ let mut builder = thread::Builder::new().name((self.inner.thread_name)());
+
+ if let Some(stack_size) = self.inner.stack_size {
+ builder = builder.stack_size(stack_size);
+ }
+
+ let rt = rt.clone();
+
+ builder
+ .spawn(move || {
+ // Only the reference should be moved into the closure
+ let _enter = crate::runtime::context::enter(rt.clone());
+ rt.blocking_spawner.inner.run(id);
+ drop(shutdown_tx);
+ })
+ .expect("OS can't spawn a new worker thread")
+ }
+}
+
+impl Inner {
+ fn run(&self, worker_thread_id: usize) {
+ if let Some(f) = &self.after_start {
+ f()
+ }
+
+ let mut shared = self.shared.lock();
+ let mut join_on_thread = None;
+
+ 'main: loop {
+ // BUSY
+ while let Some(task) = shared.queue.pop_front() {
+ drop(shared);
+ task.run();
+
+ shared = self.shared.lock();
+ }
+
+ // IDLE
+ shared.num_idle += 1;
+
+ while !shared.shutdown {
+ let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap();
+
+ shared = lock_result.0;
+ let timeout_result = lock_result.1;
+
+ if shared.num_notify != 0 {
+ // We have received a legitimate wakeup,
+ // acknowledge it by decrementing the counter
+ // and transition to the BUSY state.
+ shared.num_notify -= 1;
+ break;
+ }
+
+ // Even if the condvar "timed out", if the pool is entering the
+ // shutdown phase, we want to perform the cleanup logic.
+ if !shared.shutdown && timeout_result.timed_out() {
+ // We'll join the prior timed-out thread's JoinHandle after dropping the lock.
+ // This isn't done when shutting down, because the thread calling shutdown will
+ // handle joining everything.
+ let my_handle = shared.worker_threads.remove(&worker_thread_id);
+ join_on_thread = std::mem::replace(&mut shared.last_exiting_thread, my_handle);
+
+ break 'main;
+ }
+
+ // Spurious wakeup detected, go back to sleep.
+ }
+
+ if shared.shutdown {
+ // Drain the queue
+ while let Some(task) = shared.queue.pop_front() {
+ drop(shared);
+
+ task.shutdown_or_run_if_mandatory();
+
+ shared = self.shared.lock();
+ }
+
+ // Work was produced, and we "took" it (by decrementing num_notify).
+ // This means that num_idle was decremented once for our wakeup.
+ // But, since we are exiting, we need to "undo" that, as we'll stay idle.
+ shared.num_idle += 1;
+ // NOTE: Technically we should also do num_notify++ and notify again,
+ // but since we're shutting down anyway, that won't be necessary.
+ break;
+ }
+ }
+
+ // Thread exit
+ shared.num_th -= 1;
+
+ // num_idle should now be tracked exactly, panic
+ // with a descriptive message if it is not the
+ // case.
+ shared.num_idle = shared
+ .num_idle
+ .checked_sub(1)
+ .expect("num_idle underflowed on thread exit");
+
+ if shared.shutdown && shared.num_th == 0 {
+ self.condvar.notify_one();
+ }
+
+ drop(shared);
+
+ if let Some(f) = &self.before_stop {
+ f()
+ }
+
+ if let Some(handle) = join_on_thread {
+ let _ = handle.join();
+ }
+ }
+}
+
+impl fmt::Debug for Spawner {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("blocking::Spawner").finish()
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/blocking/schedule.rs b/third_party/rust/tokio/src/runtime/blocking/schedule.rs
new file mode 100644
index 0000000000..54252241d9
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/blocking/schedule.rs
@@ -0,0 +1,19 @@
+use crate::runtime::task::{self, Task};
+
+/// `task::Schedule` implementation that does nothing. This is unique to the
+/// blocking scheduler as tasks scheduled are not really futures but blocking
+/// operations.
+///
+/// We avoid storing the task by forgetting it in `bind` and re-materializing it
+/// in `release.
+pub(crate) struct NoopSchedule;
+
+impl task::Schedule for NoopSchedule {
+ fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
+ None
+ }
+
+ fn schedule(&self, _task: task::Notified<Self>) {
+ unreachable!();
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/blocking/shutdown.rs b/third_party/rust/tokio/src/runtime/blocking/shutdown.rs
new file mode 100644
index 0000000000..e6f4674183
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/blocking/shutdown.rs
@@ -0,0 +1,71 @@
+//! A shutdown channel.
+//!
+//! Each worker holds the `Sender` half. When all the `Sender` halves are
+//! dropped, the `Receiver` receives a notification.
+
+use crate::loom::sync::Arc;
+use crate::sync::oneshot;
+
+use std::time::Duration;
+
+#[derive(Debug, Clone)]
+pub(super) struct Sender {
+ _tx: Arc<oneshot::Sender<()>>,
+}
+
+#[derive(Debug)]
+pub(super) struct Receiver {
+ rx: oneshot::Receiver<()>,
+}
+
+pub(super) fn channel() -> (Sender, Receiver) {
+ let (tx, rx) = oneshot::channel();
+ let tx = Sender { _tx: Arc::new(tx) };
+ let rx = Receiver { rx };
+
+ (tx, rx)
+}
+
+impl Receiver {
+ /// Blocks the current thread until all `Sender` handles drop.
+ ///
+ /// If `timeout` is `Some`, the thread is blocked for **at most** `timeout`
+ /// duration. If `timeout` is `None`, then the thread is blocked until the
+ /// shutdown signal is received.
+ ///
+ /// If the timeout has elapsed, it returns `false`, otherwise it returns `true`.
+ pub(crate) fn wait(&mut self, timeout: Option<Duration>) -> bool {
+ use crate::runtime::enter::try_enter;
+
+ if timeout == Some(Duration::from_nanos(0)) {
+ return false;
+ }
+
+ let mut e = match try_enter(false) {
+ Some(enter) => enter,
+ _ => {
+ if std::thread::panicking() {
+ // Don't panic in a panic
+ return false;
+ } else {
+ panic!(
+ "Cannot drop a runtime in a context where blocking is not allowed. \
+ This happens when a runtime is dropped from within an asynchronous context."
+ );
+ }
+ }
+ };
+
+ // The oneshot completes with an Err
+ //
+ // If blocking fails to wait, this indicates a problem parking the
+ // current thread (usually, shutting down a runtime stored in a
+ // thread-local).
+ if let Some(timeout) = timeout {
+ e.block_on_timeout(&mut self.rx, timeout).is_ok()
+ } else {
+ let _ = e.block_on(&mut self.rx);
+ true
+ }
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/blocking/task.rs b/third_party/rust/tokio/src/runtime/blocking/task.rs
new file mode 100644
index 0000000000..0b7803a6c0
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/blocking/task.rs
@@ -0,0 +1,44 @@
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// Converts a function to a future that completes on poll.
+pub(crate) struct BlockingTask<T> {
+ func: Option<T>,
+}
+
+impl<T> BlockingTask<T> {
+ /// Initializes a new blocking task from the given function.
+ pub(crate) fn new(func: T) -> BlockingTask<T> {
+ BlockingTask { func: Some(func) }
+ }
+}
+
+// The closure `F` is never pinned
+impl<T> Unpin for BlockingTask<T> {}
+
+impl<T, R> Future for BlockingTask<T>
+where
+ T: FnOnce() -> R + Send + 'static,
+ R: Send + 'static,
+{
+ type Output = R;
+
+ fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<R> {
+ let me = &mut *self;
+ let func = me
+ .func
+ .take()
+ .expect("[internal exception] blocking task ran twice.");
+
+ // This is a little subtle:
+ // For convenience, we'd like _every_ call tokio ever makes to Task::poll() to be budgeted
+ // using coop. However, the way things are currently modeled, even running a blocking task
+ // currently goes through Task::poll(), and so is subject to budgeting. That isn't really
+ // what we want; a blocking task may itself want to run tasks (it might be a Worker!), so
+ // we want it to start without any budgeting.
+ crate::coop::stop();
+
+ Poll::Ready(func())
+ }
+}