path: root/third_party/rust/tokio/src/runtime/scheduler
author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-19 00:47:55 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-19 00:47:55 +0000
commit     26a029d407be480d791972afb5975cf62c9360a6 (patch)
tree       f435a8308119effd964b339f76abb83a57c29483 /third_party/rust/tokio/src/runtime/scheduler
parent     Initial commit. (diff)
Adding upstream version 124.0.1. (upstream/124.0.1)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio/src/runtime/scheduler')
-rw-r--r--  third_party/rust/tokio/src/runtime/scheduler/current_thread.rs | 750
-rw-r--r--  third_party/rust/tokio/src/runtime/scheduler/defer.rs | 43
-rw-r--r--  third_party/rust/tokio/src/runtime/scheduler/inject.rs | 72
-rw-r--r--  third_party/rust/tokio/src/runtime/scheduler/inject/metrics.rs | 7
-rw-r--r--  third_party/rust/tokio/src/runtime/scheduler/inject/pop.rs | 55
-rw-r--r--  third_party/rust/tokio/src/runtime/scheduler/inject/rt_multi_thread.rs | 98
-rw-r--r--  third_party/rust/tokio/src/runtime/scheduler/inject/shared.rs | 119
-rw-r--r--  third_party/rust/tokio/src/runtime/scheduler/inject/synced.rs | 32
-rw-r--r--  third_party/rust/tokio/src/runtime/scheduler/lock.rs | 6
-rw-r--r--  third_party/rust/tokio/src/runtime/scheduler/mod.rs | 249
-rw-r--r--  third_party/rust/tokio/src/runtime/scheduler/multi_thread/counters.rs | 62
-rw-r--r--  third_party/rust/tokio/src/runtime/scheduler/multi_thread/handle.rs | 68
-rw-r--r--  third_party/rust/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs | 41
-rw-r--r--  third_party/rust/tokio/src/runtime/scheduler/multi_thread/handle/taskdump.rs | 26
-rw-r--r--  third_party/rust/tokio/src/runtime/scheduler/multi_thread/idle.rs | 240
-rw-r--r--  third_party/rust/tokio/src/runtime/scheduler/multi_thread/mod.rs | 103
-rw-r--r--  third_party/rust/tokio/src/runtime/scheduler/multi_thread/overflow.rs | 26
-rw-r--r--  third_party/rust/tokio/src/runtime/scheduler/multi_thread/park.rs | 232
-rw-r--r--  third_party/rust/tokio/src/runtime/scheduler/multi_thread/queue.rs | 608
-rw-r--r--  third_party/rust/tokio/src/runtime/scheduler/multi_thread/stats.rs | 140
-rw-r--r--  third_party/rust/tokio/src/runtime/scheduler/multi_thread/trace.rs | 61
-rw-r--r--  third_party/rust/tokio/src/runtime/scheduler/multi_thread/trace_mock.rs | 11
-rw-r--r--  third_party/rust/tokio/src/runtime/scheduler/multi_thread/worker.rs | 1216
-rw-r--r--  third_party/rust/tokio/src/runtime/scheduler/multi_thread/worker/metrics.rs | 11
-rw-r--r--  third_party/rust/tokio/src/runtime/scheduler/multi_thread/worker/taskdump.rs | 79
-rw-r--r--  third_party/rust/tokio/src/runtime/scheduler/multi_thread/worker/taskdump_mock.rs | 7
26 files changed, 4362 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/runtime/scheduler/current_thread.rs b/third_party/rust/tokio/src/runtime/scheduler/current_thread.rs
new file mode 100644
index 0000000000..ac4a8d6fac
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/scheduler/current_thread.rs
@@ -0,0 +1,750 @@
+use crate::future::poll_fn;
+use crate::loom::sync::atomic::AtomicBool;
+use crate::loom::sync::Arc;
+use crate::runtime::driver::{self, Driver};
+use crate::runtime::scheduler::{self, Defer, Inject};
+use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
+use crate::runtime::{blocking, context, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics};
+use crate::sync::notify::Notify;
+use crate::util::atomic_cell::AtomicCell;
+use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef};
+
+use std::cell::RefCell;
+use std::collections::VecDeque;
+use std::fmt;
+use std::future::Future;
+use std::sync::atomic::Ordering::{AcqRel, Release};
+use std::task::Poll::{Pending, Ready};
+use std::task::Waker;
+use std::time::Duration;
+
+/// Executes tasks on the current thread
+pub(crate) struct CurrentThread {
+ /// Core scheduler data is acquired by a thread entering `block_on`.
+ core: AtomicCell<Core>,
+
+ /// Notifier for waking up other threads to steal the
+ /// driver.
+ notify: Notify,
+}
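+
+// NOTE (editorial): only one thread may hold the boxed `Core` at a time.
+// `block_on` tries to `take()` it out of the `AtomicCell`; if another thread
+// already holds it, the caller waits on `notify` (or for its own future to
+// complete). When the holder's `CoreGuard` is dropped, the core is placed
+// back with `core.set(..)` and `notify.notify_one()` wakes a waiter so it can
+// steal the core and the driver.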
+
+/// Handle to the current thread scheduler
+pub(crate) struct Handle {
+ /// Scheduler state shared across threads
+ shared: Shared,
+
+ /// Resource driver handles
+ pub(crate) driver: driver::Handle,
+
+ /// Blocking pool spawner
+ pub(crate) blocking_spawner: blocking::Spawner,
+
+ /// Current random number generator seed
+ pub(crate) seed_generator: RngSeedGenerator,
+}
+
+/// Data required for executing the scheduler. The struct is passed around to
+/// a function that will perform the scheduling work and acts as a capability token.
+struct Core {
+ /// Scheduler run queue
+ tasks: VecDeque<Notified>,
+
+ /// Current tick
+ tick: u32,
+
+ /// Runtime driver
+ ///
+ /// The driver is removed before starting to park the thread
+ driver: Option<Driver>,
+
+ /// Metrics batch
+ metrics: MetricsBatch,
+
+ /// How often to check the global queue
+ global_queue_interval: u32,
+
+ /// True if a task panicked without being handled and the runtime is
+ /// configured to shutdown on unhandled panic.
+ unhandled_panic: bool,
+}
+
+/// Scheduler state shared between threads.
+struct Shared {
+ /// Remote run queue
+ inject: Inject<Arc<Handle>>,
+
+ /// Collection of all active tasks spawned onto this executor.
+ owned: OwnedTasks<Arc<Handle>>,
+
+    /// Indicates whether the blocked-on thread was woken.
+ woken: AtomicBool,
+
+ /// Scheduler configuration options
+ config: Config,
+
+ /// Keeps track of various runtime metrics.
+ scheduler_metrics: SchedulerMetrics,
+
+ /// This scheduler only has one worker.
+ worker_metrics: WorkerMetrics,
+}
+
+/// Thread-local context.
+///
+/// pub(crate) to store in `runtime::context`.
+pub(crate) struct Context {
+ /// Scheduler handle
+ handle: Arc<Handle>,
+
+ /// Scheduler core, enabling the holder of `Context` to execute the
+ /// scheduler.
+ core: RefCell<Option<Box<Core>>>,
+
+ /// Deferred tasks, usually ones that called `task::yield_now()`.
+ pub(crate) defer: Defer,
+}
+
+type Notified = task::Notified<Arc<Handle>>;
+
+/// Initial queue capacity.
+const INITIAL_CAPACITY: usize = 64;
+
+/// Used if none is specified. This is a temporary constant and will be removed
+/// as we unify tuning logic between the multi-thread and current-thread
+/// schedulers.
+const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 31;
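+
+// NOTE (editorial): the interval above feeds `Core::next_task` below: when
+// `tick % global_queue_interval == 0` the inject (remote) queue is checked
+// before the local queue, otherwise the local queue is checked first, so
+// remotely scheduled tasks cannot be starved indefinitely by a busy local
+// queue.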
+
+impl CurrentThread {
+ pub(crate) fn new(
+ driver: Driver,
+ driver_handle: driver::Handle,
+ blocking_spawner: blocking::Spawner,
+ seed_generator: RngSeedGenerator,
+ config: Config,
+ ) -> (CurrentThread, Arc<Handle>) {
+ let worker_metrics = WorkerMetrics::from_config(&config);
+
+ // Get the configured global queue interval, or use the default.
+ let global_queue_interval = config
+ .global_queue_interval
+ .unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL);
+
+ let handle = Arc::new(Handle {
+ shared: Shared {
+ inject: Inject::new(),
+ owned: OwnedTasks::new(),
+ woken: AtomicBool::new(false),
+ config,
+ scheduler_metrics: SchedulerMetrics::new(),
+ worker_metrics,
+ },
+ driver: driver_handle,
+ blocking_spawner,
+ seed_generator,
+ });
+
+ let core = AtomicCell::new(Some(Box::new(Core {
+ tasks: VecDeque::with_capacity(INITIAL_CAPACITY),
+ tick: 0,
+ driver: Some(driver),
+ metrics: MetricsBatch::new(&handle.shared.worker_metrics),
+ global_queue_interval,
+ unhandled_panic: false,
+ })));
+
+ let scheduler = CurrentThread {
+ core,
+ notify: Notify::new(),
+ };
+
+ (scheduler, handle)
+ }
+
+ #[track_caller]
+ pub(crate) fn block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output {
+ pin!(future);
+
+ crate::runtime::context::enter_runtime(handle, false, |blocking| {
+ let handle = handle.as_current_thread();
+
+            // Attempt to steal the scheduler core and block_on the future
+            // there if we can; otherwise, select on a notification that the
+            // core is available or on the future completing.
+ loop {
+ if let Some(core) = self.take_core(handle) {
+ return core.block_on(future);
+ } else {
+ let notified = self.notify.notified();
+ pin!(notified);
+
+ if let Some(out) = blocking
+ .block_on(poll_fn(|cx| {
+ if notified.as_mut().poll(cx).is_ready() {
+ return Ready(None);
+ }
+
+ if let Ready(out) = future.as_mut().poll(cx) {
+ return Ready(Some(out));
+ }
+
+ Pending
+ }))
+ .expect("Failed to `Enter::block_on`")
+ {
+ return out;
+ }
+ }
+ }
+ })
+ }
+
+ fn take_core(&self, handle: &Arc<Handle>) -> Option<CoreGuard<'_>> {
+ let core = self.core.take()?;
+
+ Some(CoreGuard {
+ context: scheduler::Context::CurrentThread(Context {
+ handle: handle.clone(),
+ core: RefCell::new(Some(core)),
+ defer: Defer::new(),
+ }),
+ scheduler: self,
+ })
+ }
+
+ pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) {
+ let handle = handle.as_current_thread();
+
+ // Avoid a double panic if we are currently panicking and
+ // the lock may be poisoned.
+
+ let core = match self.take_core(handle) {
+ Some(core) => core,
+ None if std::thread::panicking() => return,
+ None => panic!("Oh no! We never placed the Core back, this is a bug!"),
+ };
+
+ // Check that the thread-local is not being destroyed
+ let tls_available = context::with_current(|_| ()).is_ok();
+
+ if tls_available {
+ core.enter(|core, _context| {
+ let core = shutdown2(core, handle);
+ (core, ())
+ });
+ } else {
+ // Shutdown without setting the context. `tokio::spawn` calls will
+ // fail, but those will fail either way because the thread-local is
+ // not available anymore.
+ let context = core.context.expect_current_thread();
+ let core = context.core.borrow_mut().take().unwrap();
+
+ let core = shutdown2(core, handle);
+ *context.core.borrow_mut() = Some(core);
+ }
+ }
+}
+
+fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> {
+ // Drain the OwnedTasks collection. This call also closes the
+ // collection, ensuring that no tasks are ever pushed after this
+ // call returns.
+ handle.shared.owned.close_and_shutdown_all();
+
+ // Drain local queue
+ // We already shut down every task, so we just need to drop the task.
+ while let Some(task) = core.next_local_task(handle) {
+ drop(task);
+ }
+
+ // Close the injection queue
+ handle.shared.inject.close();
+
+ // Drain remote queue
+ while let Some(task) = handle.shared.inject.pop() {
+ drop(task);
+ }
+
+ assert!(handle.shared.owned.is_empty());
+
+ // Submit metrics
+ core.submit_metrics(handle);
+
+ // Shutdown the resource drivers
+ if let Some(driver) = core.driver.as_mut() {
+ driver.shutdown(&handle.driver);
+ }
+
+ core
+}
+
+impl fmt::Debug for CurrentThread {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("CurrentThread").finish()
+ }
+}
+
+// ===== impl Core =====
+
+impl Core {
+ /// Get and increment the current tick
+ fn tick(&mut self) {
+ self.tick = self.tick.wrapping_add(1);
+ }
+
+ fn next_task(&mut self, handle: &Handle) -> Option<Notified> {
+ if self.tick % self.global_queue_interval == 0 {
+ handle
+ .next_remote_task()
+ .or_else(|| self.next_local_task(handle))
+ } else {
+ self.next_local_task(handle)
+ .or_else(|| handle.next_remote_task())
+ }
+ }
+
+ fn next_local_task(&mut self, handle: &Handle) -> Option<Notified> {
+ let ret = self.tasks.pop_front();
+ handle
+ .shared
+ .worker_metrics
+ .set_queue_depth(self.tasks.len());
+ ret
+ }
+
+ fn push_task(&mut self, handle: &Handle, task: Notified) {
+ self.tasks.push_back(task);
+ self.metrics.inc_local_schedule_count();
+ handle
+ .shared
+ .worker_metrics
+ .set_queue_depth(self.tasks.len());
+ }
+
+ fn submit_metrics(&mut self, handle: &Handle) {
+ self.metrics.submit(&handle.shared.worker_metrics);
+ }
+}
+
+#[cfg(tokio_taskdump)]
+fn wake_deferred_tasks_and_free(context: &Context) {
+ let wakers = context.defer.take_deferred();
+ for waker in wakers {
+ waker.wake();
+ }
+}
+
+// ===== impl Context =====
+
+impl Context {
+ /// Execute the closure with the given scheduler core stored in the
+ /// thread-local context.
+ fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
+ core.metrics.start_poll();
+ let mut ret = self.enter(core, || crate::runtime::coop::budget(f));
+ ret.0.metrics.end_poll();
+ ret
+ }
+
+ /// Blocks the current thread until an event is received by the driver,
+ /// including I/O events, timer events, ...
+ fn park(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
+ let mut driver = core.driver.take().expect("driver missing");
+
+ if let Some(f) = &handle.shared.config.before_park {
+ // Incorrect lint, the closures are actually different types so `f`
+ // cannot be passed as an argument to `enter`.
+ #[allow(clippy::redundant_closure)]
+ let (c, _) = self.enter(core, || f());
+ core = c;
+ }
+
+ // This check will fail if `before_park` spawns a task for us to run
+ // instead of parking the thread
+ if core.tasks.is_empty() {
+ // Park until the thread is signaled
+ core.metrics.about_to_park();
+ core.submit_metrics(handle);
+
+ let (c, _) = self.enter(core, || {
+ driver.park(&handle.driver);
+ self.defer.wake();
+ });
+
+ core = c;
+ }
+
+ if let Some(f) = &handle.shared.config.after_unpark {
+ // Incorrect lint, the closures are actually different types so `f`
+ // cannot be passed as an argument to `enter`.
+ #[allow(clippy::redundant_closure)]
+ let (c, _) = self.enter(core, || f());
+ core = c;
+ }
+
+ core.driver = Some(driver);
+ core
+ }
+
+ /// Checks the driver for new events without blocking the thread.
+ fn park_yield(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
+ let mut driver = core.driver.take().expect("driver missing");
+
+ core.submit_metrics(handle);
+
+ let (mut core, _) = self.enter(core, || {
+ driver.park_timeout(&handle.driver, Duration::from_millis(0));
+ self.defer.wake();
+ });
+
+ core.driver = Some(driver);
+ core
+ }
+
+ fn enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
+ // Store the scheduler core in the thread-local context
+ //
+ // A drop-guard is employed at a higher level.
+ *self.core.borrow_mut() = Some(core);
+
+ // Execute the closure while tracking the execution budget
+ let ret = f();
+
+ // Take the scheduler core back
+ let core = self.core.borrow_mut().take().expect("core missing");
+ (core, ret)
+ }
+
+ pub(crate) fn defer(&self, waker: &Waker) {
+ self.defer.defer(waker);
+ }
+}
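+
+// NOTE (editorial): `CoreGuard::enter` (below) publishes this `Context`
+// through `context::set_scheduler`, which is what lets `Arc<Handle>::schedule`
+// recognize calls made from inside this runtime and push directly onto the
+// local `core.tasks` queue; calls made from other threads fall back to the
+// shared `inject` queue and unpark the driver instead.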
+
+// ===== impl Handle =====
+
+impl Handle {
+ /// Spawns a future onto the `CurrentThread` scheduler
+ pub(crate) fn spawn<F>(
+ me: &Arc<Self>,
+ future: F,
+ id: crate::runtime::task::Id,
+ ) -> JoinHandle<F::Output>
+ where
+ F: crate::future::Future + Send + 'static,
+ F::Output: Send + 'static,
+ {
+ let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);
+
+ if let Some(notified) = notified {
+ me.schedule(notified);
+ }
+
+ handle
+ }
+
+ /// Capture a snapshot of this runtime's state.
+ #[cfg(all(
+ tokio_unstable,
+ tokio_taskdump,
+ target_os = "linux",
+ any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
+ ))]
+ pub(crate) fn dump(&self) -> crate::runtime::Dump {
+ use crate::runtime::dump;
+ use task::trace::trace_current_thread;
+
+ let mut traces = vec![];
+
+ // todo: how to make this work outside of a runtime context?
+ context::with_scheduler(|maybe_context| {
+ // drain the local queue
+ let context = if let Some(context) = maybe_context {
+ context.expect_current_thread()
+ } else {
+ return;
+ };
+ let mut maybe_core = context.core.borrow_mut();
+ let core = if let Some(core) = maybe_core.as_mut() {
+ core
+ } else {
+ return;
+ };
+ let local = &mut core.tasks;
+
+ if self.shared.inject.is_closed() {
+ return;
+ }
+
+ traces = trace_current_thread(&self.shared.owned, local, &self.shared.inject)
+ .into_iter()
+ .map(dump::Task::new)
+ .collect();
+
+ // Avoid double borrow panic
+ drop(maybe_core);
+
+            // Taking a taskdump could wake every task, but we probably don't want
+            // the `yield_now` vector to be that large under normal circumstances.
+            // Therefore, we free its allocation.
+ wake_deferred_tasks_and_free(context);
+ });
+
+ dump::Dump::new(traces)
+ }
+
+ fn next_remote_task(&self) -> Option<Notified> {
+ self.shared.inject.pop()
+ }
+
+ fn waker_ref(me: &Arc<Self>) -> WakerRef<'_> {
+        // Set `woken` to true when entering `block_on`, ensuring the outer
+        // future is polled on the first iteration of the scheduling loop.
+ me.shared.woken.store(true, Release);
+ waker_ref(me)
+ }
+
+    // Resets `woken` to false and returns the original value.
+ pub(crate) fn reset_woken(&self) -> bool {
+ self.shared.woken.swap(false, AcqRel)
+ }
+}
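+
+// NOTE (editorial): `woken` is the handshake between the `block_on` waker and
+// the scheduler loop: `waker_ref` primes it to `true` so the outer future is
+// polled at least once, `Wake::wake_by_ref` sets it and unparks the driver,
+// and `reset_woken` (an `AcqRel` swap) is checked at the top of each loop
+// iteration in `CoreGuard::block_on` to decide whether to poll the outer
+// future again.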
+
+cfg_metrics! {
+ impl Handle {
+ pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
+ &self.shared.scheduler_metrics
+ }
+
+ pub(crate) fn injection_queue_depth(&self) -> usize {
+ self.shared.inject.len()
+ }
+
+ pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
+ assert_eq!(0, worker);
+ &self.shared.worker_metrics
+ }
+
+ pub(crate) fn num_blocking_threads(&self) -> usize {
+ self.blocking_spawner.num_threads()
+ }
+
+ pub(crate) fn num_idle_blocking_threads(&self) -> usize {
+ self.blocking_spawner.num_idle_threads()
+ }
+
+ pub(crate) fn blocking_queue_depth(&self) -> usize {
+ self.blocking_spawner.queue_depth()
+ }
+
+ pub(crate) fn active_tasks_count(&self) -> usize {
+ self.shared.owned.active_tasks_count()
+ }
+ }
+}
+
+impl fmt::Debug for Handle {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("current_thread::Handle { ... }").finish()
+ }
+}
+
+// ===== impl Shared =====
+
+impl Schedule for Arc<Handle> {
+ fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
+ self.shared.owned.remove(task)
+ }
+
+ fn schedule(&self, task: task::Notified<Self>) {
+ use scheduler::Context::CurrentThread;
+
+ context::with_scheduler(|maybe_cx| match maybe_cx {
+ Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
+ let mut core = cx.core.borrow_mut();
+
+ // If `None`, the runtime is shutting down, so there is no need
+ // to schedule the task.
+ if let Some(core) = core.as_mut() {
+ core.push_task(self, task);
+ }
+ }
+ _ => {
+ // Track that a task was scheduled from **outside** of the runtime.
+ self.shared.scheduler_metrics.inc_remote_schedule_count();
+
+ // Schedule the task
+ self.shared.inject.push(task);
+ self.driver.unpark();
+ }
+ });
+ }
+
+ cfg_unstable! {
+ fn unhandled_panic(&self) {
+ use crate::runtime::UnhandledPanic;
+
+ match self.shared.config.unhandled_panic {
+ UnhandledPanic::Ignore => {
+ // Do nothing
+ }
+ UnhandledPanic::ShutdownRuntime => {
+ use scheduler::Context::CurrentThread;
+
+ // This hook is only called from within the runtime, so
+ // `context::with_scheduler` should match with `&self`, i.e.
+ // there is no opportunity for a nested scheduler to be
+ // called.
+ context::with_scheduler(|maybe_cx| match maybe_cx {
+ Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
+ let mut core = cx.core.borrow_mut();
+
+ // If `None`, the runtime is shutting down, so there is no need to signal shutdown
+ if let Some(core) = core.as_mut() {
+ core.unhandled_panic = true;
+ self.shared.owned.close_and_shutdown_all();
+ }
+ }
+ _ => unreachable!("runtime core not set in CURRENT thread-local"),
+ })
+ }
+ }
+ }
+ }
+}
+
+impl Wake for Handle {
+ fn wake(arc_self: Arc<Self>) {
+ Wake::wake_by_ref(&arc_self)
+ }
+
+ /// Wake by reference
+ fn wake_by_ref(arc_self: &Arc<Self>) {
+ arc_self.shared.woken.store(true, Release);
+ arc_self.driver.unpark();
+ }
+}
+
+// ===== CoreGuard =====
+
+/// Used to ensure we always place the `Core` value back into its slot in
+/// `CurrentThread`, even if the future panics.
+struct CoreGuard<'a> {
+ context: scheduler::Context,
+ scheduler: &'a CurrentThread,
+}
+
+impl CoreGuard<'_> {
+ #[track_caller]
+ fn block_on<F: Future>(self, future: F) -> F::Output {
+ let ret = self.enter(|mut core, context| {
+ let waker = Handle::waker_ref(&context.handle);
+ let mut cx = std::task::Context::from_waker(&waker);
+
+ pin!(future);
+
+ core.metrics.start_processing_scheduled_tasks();
+
+ 'outer: loop {
+ let handle = &context.handle;
+
+ if handle.reset_woken() {
+ let (c, res) = context.enter(core, || {
+ crate::runtime::coop::budget(|| future.as_mut().poll(&mut cx))
+ });
+
+ core = c;
+
+ if let Ready(v) = res {
+ return (core, Some(v));
+ }
+ }
+
+ for _ in 0..handle.shared.config.event_interval {
+ // Make sure we didn't hit an unhandled_panic
+ if core.unhandled_panic {
+ return (core, None);
+ }
+
+ core.tick();
+
+ let entry = core.next_task(handle);
+
+ let task = match entry {
+ Some(entry) => entry,
+ None => {
+ core.metrics.end_processing_scheduled_tasks();
+
+ core = if !context.defer.is_empty() {
+ context.park_yield(core, handle)
+ } else {
+ context.park(core, handle)
+ };
+
+ core.metrics.start_processing_scheduled_tasks();
+
+ // Try polling the `block_on` future next
+ continue 'outer;
+ }
+ };
+
+ let task = context.handle.shared.owned.assert_owner(task);
+
+ let (c, _) = context.run_task(core, || {
+ task.run();
+ });
+
+ core = c;
+ }
+
+ core.metrics.end_processing_scheduled_tasks();
+
+ // Yield to the driver, this drives the timer and pulls any
+ // pending I/O events.
+ core = context.park_yield(core, handle);
+
+ core.metrics.start_processing_scheduled_tasks();
+ }
+ });
+
+ match ret {
+ Some(ret) => ret,
+ None => {
+                // A spawned task panicked and `unhandled_panic` was set, so
+                // the scheduler loop bailed out without completing `block_on`.
+ panic!("a spawned task panicked and the runtime is configured to shut down on unhandled panic");
+ }
+ }
+ }
+
+ /// Enters the scheduler context. This sets the queue and other necessary
+ /// scheduler state in the thread-local.
+ fn enter<F, R>(self, f: F) -> R
+ where
+ F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R),
+ {
+ let context = self.context.expect_current_thread();
+
+ // Remove `core` from `context` to pass into the closure.
+ let core = context.core.borrow_mut().take().expect("core missing");
+
+ // Call the closure and place `core` back
+ let (core, ret) = context::set_scheduler(&self.context, || f(core, context));
+
+ *context.core.borrow_mut() = Some(core);
+
+ ret
+ }
+}
+
+impl Drop for CoreGuard<'_> {
+ fn drop(&mut self) {
+ let context = self.context.expect_current_thread();
+
+ if let Some(core) = context.core.borrow_mut().take() {
+ // Replace old scheduler back into the state to allow
+ // other threads to pick it up and drive it.
+ self.scheduler.core.set(core);
+
+ // Wake up other possible threads that could steal the driver.
+ self.scheduler.notify.notify_one()
+ }
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/scheduler/defer.rs b/third_party/rust/tokio/src/runtime/scheduler/defer.rs
new file mode 100644
index 0000000000..a4be8ef2e5
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/scheduler/defer.rs
@@ -0,0 +1,43 @@
+use std::cell::RefCell;
+use std::task::Waker;
+
+pub(crate) struct Defer {
+ deferred: RefCell<Vec<Waker>>,
+}
+
+impl Defer {
+ pub(crate) fn new() -> Defer {
+ Defer {
+ deferred: Default::default(),
+ }
+ }
+
+ pub(crate) fn defer(&self, waker: &Waker) {
+ let mut deferred = self.deferred.borrow_mut();
+
+ // If the same task adds itself a bunch of times, then only add it once.
+ if let Some(last) = deferred.last() {
+ if last.will_wake(waker) {
+ return;
+ }
+ }
+
+ deferred.push(waker.clone());
+ }
+
+ pub(crate) fn is_empty(&self) -> bool {
+ self.deferred.borrow().is_empty()
+ }
+
+ pub(crate) fn wake(&self) {
+ while let Some(waker) = self.deferred.borrow_mut().pop() {
+ waker.wake();
+ }
+ }
+
+ #[cfg(tokio_taskdump)]
+ pub(crate) fn take_deferred(&self) -> Vec<Waker> {
+ let mut deferred = self.deferred.borrow_mut();
+ std::mem::take(&mut *deferred)
+ }
+}
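+
+// NOTE (editorial sketch, assuming a task that called `task::yield_now()`):
+//
+//     defer.defer(cx.waker());  // queued; back-to-back duplicates are
+//                               // skipped via `Waker::will_wake`
+//     // ... the scheduler later parks (or poll-parks) the driver ...
+//     defer.wake();             // drains the vector, waking in LIFO order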
diff --git a/third_party/rust/tokio/src/runtime/scheduler/inject.rs b/third_party/rust/tokio/src/runtime/scheduler/inject.rs
new file mode 100644
index 0000000000..39976fcd7a
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/scheduler/inject.rs
@@ -0,0 +1,72 @@
+//! Inject queue used to send wakeups to a work-stealing scheduler
+
+use crate::loom::sync::Mutex;
+use crate::runtime::task;
+
+mod pop;
+pub(crate) use pop::Pop;
+
+mod shared;
+pub(crate) use shared::Shared;
+
+mod synced;
+pub(crate) use synced::Synced;
+
+cfg_rt_multi_thread! {
+ mod rt_multi_thread;
+}
+
+cfg_metrics! {
+ mod metrics;
+}
+
+/// Growable, MPMC queue used to inject new tasks into the scheduler and as an
+/// overflow queue when the local, fixed-size, array queue overflows.
+pub(crate) struct Inject<T: 'static> {
+ shared: Shared<T>,
+ synced: Mutex<Synced>,
+}
+
+impl<T: 'static> Inject<T> {
+ pub(crate) fn new() -> Inject<T> {
+ let (shared, synced) = Shared::new();
+
+ Inject {
+ shared,
+ synced: Mutex::new(synced),
+ }
+ }
+
+ // Kind of annoying to have to include the cfg here
+ #[cfg(tokio_taskdump)]
+ pub(crate) fn is_closed(&self) -> bool {
+ let synced = self.synced.lock();
+ self.shared.is_closed(&synced)
+ }
+
+    /// Closes the injection queue, returning `true` if the queue was open
+    /// when the transition was made.
+ pub(crate) fn close(&self) -> bool {
+ let mut synced = self.synced.lock();
+ self.shared.close(&mut synced)
+ }
+
+ /// Pushes a value into the queue.
+ ///
+ /// This does nothing if the queue is closed.
+ pub(crate) fn push(&self, task: task::Notified<T>) {
+ let mut synced = self.synced.lock();
+ // safety: passing correct `Synced`
+ unsafe { self.shared.push(&mut synced, task) }
+ }
+
+ pub(crate) fn pop(&self) -> Option<task::Notified<T>> {
+ if self.shared.is_empty() {
+ return None;
+ }
+
+ let mut synced = self.synced.lock();
+ // safety: passing correct `Synced`
+ unsafe { self.shared.pop(&mut synced) }
+ }
+}
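+
+// NOTE (editorial): the lock-free `len` counter in `Shared` lets `pop` return
+// `None` without touching the mutex when the queue is observed to be empty;
+// only a non-empty queue pays for the lock. `push` and `close` always take the
+// lock, since they mutate the intrusive linked list stored in `Synced`.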
diff --git a/third_party/rust/tokio/src/runtime/scheduler/inject/metrics.rs b/third_party/rust/tokio/src/runtime/scheduler/inject/metrics.rs
new file mode 100644
index 0000000000..76f045fdbd
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/scheduler/inject/metrics.rs
@@ -0,0 +1,7 @@
+use super::Inject;
+
+impl<T: 'static> Inject<T> {
+ pub(crate) fn len(&self) -> usize {
+ self.shared.len()
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/scheduler/inject/pop.rs b/third_party/rust/tokio/src/runtime/scheduler/inject/pop.rs
new file mode 100644
index 0000000000..4e6d5d3be3
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/scheduler/inject/pop.rs
@@ -0,0 +1,55 @@
+use super::Synced;
+
+use crate::runtime::task;
+
+use std::marker::PhantomData;
+
+pub(crate) struct Pop<'a, T: 'static> {
+ len: usize,
+ synced: &'a mut Synced,
+ _p: PhantomData<T>,
+}
+
+impl<'a, T: 'static> Pop<'a, T> {
+ pub(super) fn new(len: usize, synced: &'a mut Synced) -> Pop<'a, T> {
+ Pop {
+ len,
+ synced,
+ _p: PhantomData,
+ }
+ }
+}
+
+impl<'a, T: 'static> Iterator for Pop<'a, T> {
+ type Item = task::Notified<T>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ if self.len == 0 {
+ return None;
+ }
+
+ let ret = self.synced.pop();
+
+ // Should be `Some` when `len > 0`
+ debug_assert!(ret.is_some());
+
+ self.len -= 1;
+ ret
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ (self.len, Some(self.len))
+ }
+}
+
+impl<'a, T: 'static> ExactSizeIterator for Pop<'a, T> {
+ fn len(&self) -> usize {
+ self.len
+ }
+}
+
+impl<'a, T: 'static> Drop for Pop<'a, T> {
+ fn drop(&mut self) {
+ for _ in self.by_ref() {}
+ }
+}
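+
+// NOTE (editorial): `Shared::pop_n` decrements the shared `len` counter by `n`
+// *before* handing out this iterator, so the `Drop` impl above drains whatever
+// the caller did not consume; otherwise those tasks would remain in the linked
+// list while no longer being accounted for.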
diff --git a/third_party/rust/tokio/src/runtime/scheduler/inject/rt_multi_thread.rs b/third_party/rust/tokio/src/runtime/scheduler/inject/rt_multi_thread.rs
new file mode 100644
index 0000000000..07d1063c5d
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/scheduler/inject/rt_multi_thread.rs
@@ -0,0 +1,98 @@
+use super::{Shared, Synced};
+
+use crate::runtime::scheduler::Lock;
+use crate::runtime::task;
+
+use std::sync::atomic::Ordering::Release;
+
+impl<'a> Lock<Synced> for &'a mut Synced {
+ type Handle = &'a mut Synced;
+
+ fn lock(self) -> Self::Handle {
+ self
+ }
+}
+
+impl AsMut<Synced> for Synced {
+ fn as_mut(&mut self) -> &mut Synced {
+ self
+ }
+}
+
+impl<T: 'static> Shared<T> {
+ /// Pushes several values into the queue.
+ ///
+ /// # Safety
+ ///
+ /// Must be called with the same `Synced` instance returned by `Inject::new`
+ #[inline]
+ pub(crate) unsafe fn push_batch<L, I>(&self, shared: L, mut iter: I)
+ where
+ L: Lock<Synced>,
+ I: Iterator<Item = task::Notified<T>>,
+ {
+ let first = match iter.next() {
+ Some(first) => first.into_raw(),
+ None => return,
+ };
+
+ // Link up all the tasks.
+ let mut prev = first;
+ let mut counter = 1;
+
+ // We are going to be called with an `std::iter::Chain`, and that
+ // iterator overrides `for_each` to something that is easier for the
+ // compiler to optimize than a loop.
+ iter.for_each(|next| {
+ let next = next.into_raw();
+
+ // safety: Holding the Notified for a task guarantees exclusive
+ // access to the `queue_next` field.
+ unsafe { prev.set_queue_next(Some(next)) };
+ prev = next;
+ counter += 1;
+ });
+
+ // Now that the tasks are linked together, insert them into the
+ // linked list.
+ self.push_batch_inner(shared, first, prev, counter);
+ }
+
+ /// Inserts several tasks that have been linked together into the queue.
+ ///
+    /// The provided head and tail may be the same task. In this case, a
+ /// single task is inserted.
+ #[inline]
+ unsafe fn push_batch_inner<L>(
+ &self,
+ shared: L,
+ batch_head: task::RawTask,
+ batch_tail: task::RawTask,
+ num: usize,
+ ) where
+ L: Lock<Synced>,
+ {
+ debug_assert!(unsafe { batch_tail.get_queue_next().is_none() });
+
+ let mut synced = shared.lock();
+ let synced = synced.as_mut();
+
+ if let Some(tail) = synced.tail {
+ unsafe {
+ tail.set_queue_next(Some(batch_head));
+ }
+ } else {
+ synced.head = Some(batch_head);
+ }
+
+ synced.tail = Some(batch_tail);
+
+ // Increment the count.
+ //
+ // safety: All updates to the len atomic are guarded by the mutex. As
+ // such, a non-atomic load followed by a store is safe.
+ let len = self.len.unsync_load();
+
+ self.len.store(len + num, Release);
+ }
+}
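+
+// NOTE (editorial): `push_batch` links the tasks together through their
+// intrusive `queue_next` pointers *before* the mutex is taken;
+// `push_batch_inner` then only has to splice the pre-built chain onto the tail
+// and bump `len`, keeping the critical section short.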
diff --git a/third_party/rust/tokio/src/runtime/scheduler/inject/shared.rs b/third_party/rust/tokio/src/runtime/scheduler/inject/shared.rs
new file mode 100644
index 0000000000..7fdd2839dd
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/scheduler/inject/shared.rs
@@ -0,0 +1,119 @@
+use super::{Pop, Synced};
+
+use crate::loom::sync::atomic::AtomicUsize;
+use crate::runtime::task;
+
+use std::marker::PhantomData;
+use std::sync::atomic::Ordering::{Acquire, Release};
+
+pub(crate) struct Shared<T: 'static> {
+ /// Number of pending tasks in the queue. This helps prevent unnecessary
+ /// locking in the hot path.
+ pub(super) len: AtomicUsize,
+
+ _p: PhantomData<T>,
+}
+
+unsafe impl<T> Send for Shared<T> {}
+unsafe impl<T> Sync for Shared<T> {}
+
+impl<T: 'static> Shared<T> {
+ pub(crate) fn new() -> (Shared<T>, Synced) {
+ let inject = Shared {
+ len: AtomicUsize::new(0),
+ _p: PhantomData,
+ };
+
+ let synced = Synced {
+ is_closed: false,
+ head: None,
+ tail: None,
+ };
+
+ (inject, synced)
+ }
+
+ pub(crate) fn is_empty(&self) -> bool {
+ self.len() == 0
+ }
+
+ // Kind of annoying to have to include the cfg here
+ #[cfg(any(tokio_taskdump, all(feature = "rt-multi-thread", not(tokio_wasi))))]
+ pub(crate) fn is_closed(&self, synced: &Synced) -> bool {
+ synced.is_closed
+ }
+
+    /// Closes the injection queue, returning `true` if the queue was open
+    /// when the transition was made.
+ pub(crate) fn close(&self, synced: &mut Synced) -> bool {
+ if synced.is_closed {
+ return false;
+ }
+
+ synced.is_closed = true;
+ true
+ }
+
+ pub(crate) fn len(&self) -> usize {
+ self.len.load(Acquire)
+ }
+
+ /// Pushes a value into the queue.
+ ///
+ /// This does nothing if the queue is closed.
+ ///
+ /// # Safety
+ ///
+ /// Must be called with the same `Synced` instance returned by `Inject::new`
+ pub(crate) unsafe fn push(&self, synced: &mut Synced, task: task::Notified<T>) {
+ if synced.is_closed {
+ return;
+ }
+
+ // safety: only mutated with the lock held
+ let len = self.len.unsync_load();
+ let task = task.into_raw();
+
+ // The next pointer should already be null
+ debug_assert!(unsafe { task.get_queue_next().is_none() });
+
+ if let Some(tail) = synced.tail {
+ // safety: Holding the Notified for a task guarantees exclusive
+ // access to the `queue_next` field.
+ unsafe { tail.set_queue_next(Some(task)) };
+ } else {
+ synced.head = Some(task);
+ }
+
+ synced.tail = Some(task);
+ self.len.store(len + 1, Release);
+ }
+
+ /// Pop a value from the queue.
+ ///
+ /// # Safety
+ ///
+ /// Must be called with the same `Synced` instance returned by `Inject::new`
+ pub(crate) unsafe fn pop(&self, synced: &mut Synced) -> Option<task::Notified<T>> {
+ self.pop_n(synced, 1).next()
+ }
+
+ /// Pop `n` values from the queue
+ ///
+ /// # Safety
+ ///
+ /// Must be called with the same `Synced` instance returned by `Inject::new`
+ pub(crate) unsafe fn pop_n<'a>(&'a self, synced: &'a mut Synced, n: usize) -> Pop<'a, T> {
+ use std::cmp;
+
+ // safety: All updates to the len atomic are guarded by the mutex. As
+ // such, a non-atomic load followed by a store is safe.
+ let len = self.len.unsync_load();
+ let n = cmp::min(n, len);
+
+ // Decrement the count.
+ self.len.store(len - n, Release);
+
+ Pop::new(n, synced)
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/scheduler/inject/synced.rs b/third_party/rust/tokio/src/runtime/scheduler/inject/synced.rs
new file mode 100644
index 0000000000..6847f68e5d
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/scheduler/inject/synced.rs
@@ -0,0 +1,32 @@
+use crate::runtime::task;
+
+pub(crate) struct Synced {
+ /// True if the queue is closed.
+ pub(super) is_closed: bool,
+
+ /// Linked-list head.
+ pub(super) head: Option<task::RawTask>,
+
+ /// Linked-list tail.
+ pub(super) tail: Option<task::RawTask>,
+}
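+
+// NOTE (editorial): the queue is an intrusive singly linked list: `head` and
+// `tail` are raw task pointers and the links live in each task's `queue_next`
+// field, so pushing and popping never allocate.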
+
+unsafe impl Send for Synced {}
+unsafe impl Sync for Synced {}
+
+impl Synced {
+ pub(super) fn pop<T: 'static>(&mut self) -> Option<task::Notified<T>> {
+ let task = self.head?;
+
+ self.head = unsafe { task.get_queue_next() };
+
+ if self.head.is_none() {
+ self.tail = None;
+ }
+
+ unsafe { task.set_queue_next(None) };
+
+ // safety: a `Notified` is pushed into the queue and now it is popped!
+ Some(unsafe { task::Notified::from_raw(task) })
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/scheduler/lock.rs b/third_party/rust/tokio/src/runtime/scheduler/lock.rs
new file mode 100644
index 0000000000..0901c2b37c
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/scheduler/lock.rs
@@ -0,0 +1,6 @@
+/// A lock (mutex) yielding generic data.
+pub(crate) trait Lock<T> {
+ type Handle: AsMut<T>;
+
+ fn lock(self) -> Self::Handle;
+}
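+
+// NOTE (editorial): this indirection lets `inject::Shared::push_batch` accept
+// either something that produces exclusive access to `Synced` (such as a lock
+// guard) or, via the `impl<'a> Lock<Synced> for &'a mut Synced` in
+// `inject/rt_multi_thread.rs`, a plain mutable reference when the caller
+// already holds the lock.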
diff --git a/third_party/rust/tokio/src/runtime/scheduler/mod.rs b/third_party/rust/tokio/src/runtime/scheduler/mod.rs
new file mode 100644
index 0000000000..3e3151711f
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/scheduler/mod.rs
@@ -0,0 +1,249 @@
+cfg_rt! {
+ pub(crate) mod current_thread;
+ pub(crate) use current_thread::CurrentThread;
+
+ mod defer;
+ use defer::Defer;
+
+ pub(crate) mod inject;
+ pub(crate) use inject::Inject;
+}
+
+cfg_rt_multi_thread! {
+ mod lock;
+ use lock::Lock;
+
+ pub(crate) mod multi_thread;
+ pub(crate) use multi_thread::MultiThread;
+}
+
+use crate::runtime::driver;
+
+#[derive(Debug, Clone)]
+pub(crate) enum Handle {
+ #[cfg(feature = "rt")]
+ CurrentThread(Arc<current_thread::Handle>),
+
+ #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
+ MultiThread(Arc<multi_thread::Handle>),
+
+    // TODO: This is to avoid triggering "dead code" warnings in many other
+    // places in the codebase. Remove this during a later cleanup.
+ #[cfg(not(feature = "rt"))]
+ #[allow(dead_code)]
+ Disabled,
+}
+
+#[cfg(feature = "rt")]
+pub(super) enum Context {
+ CurrentThread(current_thread::Context),
+
+ #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
+ MultiThread(multi_thread::Context),
+}
+
+impl Handle {
+ #[cfg_attr(not(feature = "full"), allow(dead_code))]
+ pub(crate) fn driver(&self) -> &driver::Handle {
+ match *self {
+ #[cfg(feature = "rt")]
+ Handle::CurrentThread(ref h) => &h.driver,
+
+ #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
+ Handle::MultiThread(ref h) => &h.driver,
+
+ #[cfg(not(feature = "rt"))]
+ Handle::Disabled => unreachable!(),
+ }
+ }
+}
+
+cfg_rt! {
+ use crate::future::Future;
+ use crate::loom::sync::Arc;
+ use crate::runtime::{blocking, task::Id};
+ use crate::runtime::context;
+ use crate::task::JoinHandle;
+ use crate::util::RngSeedGenerator;
+ use std::task::Waker;
+
+ impl Handle {
+ #[track_caller]
+ pub(crate) fn current() -> Handle {
+ match context::with_current(Clone::clone) {
+ Ok(handle) => handle,
+ Err(e) => panic!("{}", e),
+ }
+ }
+
+ pub(crate) fn blocking_spawner(&self) -> &blocking::Spawner {
+ match self {
+ Handle::CurrentThread(h) => &h.blocking_spawner,
+
+ #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
+ Handle::MultiThread(h) => &h.blocking_spawner,
+ }
+ }
+
+ pub(crate) fn spawn<F>(&self, future: F, id: Id) -> JoinHandle<F::Output>
+ where
+ F: Future + Send + 'static,
+ F::Output: Send + 'static,
+ {
+ match self {
+ Handle::CurrentThread(h) => current_thread::Handle::spawn(h, future, id),
+
+ #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
+ Handle::MultiThread(h) => multi_thread::Handle::spawn(h, future, id),
+ }
+ }
+
+ pub(crate) fn shutdown(&self) {
+ match *self {
+ Handle::CurrentThread(_) => {},
+
+ #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
+ Handle::MultiThread(ref h) => h.shutdown(),
+ }
+ }
+
+ pub(crate) fn seed_generator(&self) -> &RngSeedGenerator {
+ match self {
+ Handle::CurrentThread(h) => &h.seed_generator,
+
+ #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
+ Handle::MultiThread(h) => &h.seed_generator,
+ }
+ }
+
+ pub(crate) fn as_current_thread(&self) -> &Arc<current_thread::Handle> {
+ match self {
+ Handle::CurrentThread(handle) => handle,
+ #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
+ _ => panic!("not a CurrentThread handle"),
+ }
+ }
+ }
+
+ cfg_metrics! {
+ use crate::runtime::{SchedulerMetrics, WorkerMetrics};
+
+ impl Handle {
+ pub(crate) fn num_workers(&self) -> usize {
+ match self {
+ Handle::CurrentThread(_) => 1,
+ #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
+ Handle::MultiThread(handle) => handle.num_workers(),
+ }
+ }
+
+ pub(crate) fn num_blocking_threads(&self) -> usize {
+ match self {
+ Handle::CurrentThread(handle) => handle.num_blocking_threads(),
+ #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
+ Handle::MultiThread(handle) => handle.num_blocking_threads(),
+ }
+ }
+
+ pub(crate) fn num_idle_blocking_threads(&self) -> usize {
+ match self {
+ Handle::CurrentThread(handle) => handle.num_idle_blocking_threads(),
+ #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
+ Handle::MultiThread(handle) => handle.num_idle_blocking_threads(),
+ }
+ }
+
+ pub(crate) fn active_tasks_count(&self) -> usize {
+ match self {
+ Handle::CurrentThread(handle) => handle.active_tasks_count(),
+ #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
+ Handle::MultiThread(handle) => handle.active_tasks_count(),
+ }
+ }
+
+ pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
+ match self {
+ Handle::CurrentThread(handle) => handle.scheduler_metrics(),
+ #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
+ Handle::MultiThread(handle) => handle.scheduler_metrics(),
+ }
+ }
+
+ pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
+ match self {
+ Handle::CurrentThread(handle) => handle.worker_metrics(worker),
+ #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
+ Handle::MultiThread(handle) => handle.worker_metrics(worker),
+ }
+ }
+
+ pub(crate) fn injection_queue_depth(&self) -> usize {
+ match self {
+ Handle::CurrentThread(handle) => handle.injection_queue_depth(),
+ #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
+ Handle::MultiThread(handle) => handle.injection_queue_depth(),
+ }
+ }
+
+ pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
+ match self {
+ Handle::CurrentThread(handle) => handle.worker_metrics(worker).queue_depth(),
+ #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
+ Handle::MultiThread(handle) => handle.worker_local_queue_depth(worker),
+ }
+ }
+
+ pub(crate) fn blocking_queue_depth(&self) -> usize {
+ match self {
+ Handle::CurrentThread(handle) => handle.blocking_queue_depth(),
+ #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
+ Handle::MultiThread(handle) => handle.blocking_queue_depth(),
+ }
+ }
+ }
+ }
+
+ impl Context {
+ #[track_caller]
+ pub(crate) fn expect_current_thread(&self) -> &current_thread::Context {
+ match self {
+ Context::CurrentThread(context) => context,
+ #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
+ _ => panic!("expected `CurrentThread::Context`")
+ }
+ }
+
+ pub(crate) fn defer(&self, waker: &Waker) {
+ match self {
+ Context::CurrentThread(context) => context.defer(waker),
+ #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
+ Context::MultiThread(context) => context.defer(waker),
+ }
+ }
+
+ cfg_rt_multi_thread! {
+ #[track_caller]
+ pub(crate) fn expect_multi_thread(&self) -> &multi_thread::Context {
+ match self {
+ Context::MultiThread(context) => context,
+ _ => panic!("expected `MultiThread::Context`")
+ }
+ }
+ }
+ }
+}
+
+cfg_not_rt! {
+ #[cfg(any(
+ feature = "net",
+ all(unix, feature = "process"),
+ all(unix, feature = "signal"),
+ feature = "time",
+ ))]
+ impl Handle {
+ #[track_caller]
+ pub(crate) fn current() -> Handle {
+ panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
+ }
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/scheduler/multi_thread/counters.rs b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/counters.rs
new file mode 100644
index 0000000000..50bcc11985
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/counters.rs
@@ -0,0 +1,62 @@
+#[cfg(tokio_internal_mt_counters)]
+mod imp {
+ use std::sync::atomic::AtomicUsize;
+ use std::sync::atomic::Ordering::Relaxed;
+
+ static NUM_MAINTENANCE: AtomicUsize = AtomicUsize::new(0);
+ static NUM_NOTIFY_LOCAL: AtomicUsize = AtomicUsize::new(0);
+ static NUM_UNPARKS_LOCAL: AtomicUsize = AtomicUsize::new(0);
+ static NUM_LIFO_SCHEDULES: AtomicUsize = AtomicUsize::new(0);
+ static NUM_LIFO_CAPPED: AtomicUsize = AtomicUsize::new(0);
+
+ impl Drop for super::Counters {
+ fn drop(&mut self) {
+ let notifies_local = NUM_NOTIFY_LOCAL.load(Relaxed);
+ let unparks_local = NUM_UNPARKS_LOCAL.load(Relaxed);
+ let maintenance = NUM_MAINTENANCE.load(Relaxed);
+ let lifo_scheds = NUM_LIFO_SCHEDULES.load(Relaxed);
+ let lifo_capped = NUM_LIFO_CAPPED.load(Relaxed);
+
+ println!("---");
+ println!("notifies (local): {}", notifies_local);
+ println!(" unparks (local): {}", unparks_local);
+ println!(" maintenance: {}", maintenance);
+ println!(" LIFO schedules: {}", lifo_scheds);
+ println!(" LIFO capped: {}", lifo_capped);
+ }
+ }
+
+ pub(crate) fn inc_num_inc_notify_local() {
+ NUM_NOTIFY_LOCAL.fetch_add(1, Relaxed);
+ }
+
+ pub(crate) fn inc_num_unparks_local() {
+ NUM_UNPARKS_LOCAL.fetch_add(1, Relaxed);
+ }
+
+ pub(crate) fn inc_num_maintenance() {
+ NUM_MAINTENANCE.fetch_add(1, Relaxed);
+ }
+
+ pub(crate) fn inc_lifo_schedules() {
+ NUM_LIFO_SCHEDULES.fetch_add(1, Relaxed);
+ }
+
+ pub(crate) fn inc_lifo_capped() {
+ NUM_LIFO_CAPPED.fetch_add(1, Relaxed);
+ }
+}
+
+#[cfg(not(tokio_internal_mt_counters))]
+mod imp {
+ pub(crate) fn inc_num_inc_notify_local() {}
+ pub(crate) fn inc_num_unparks_local() {}
+ pub(crate) fn inc_num_maintenance() {}
+ pub(crate) fn inc_lifo_schedules() {}
+ pub(crate) fn inc_lifo_capped() {}
+}
+
+#[derive(Debug)]
+pub(crate) struct Counters;
+
+pub(super) use imp::*;
diff --git a/third_party/rust/tokio/src/runtime/scheduler/multi_thread/handle.rs b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/handle.rs
new file mode 100644
index 0000000000..98e4765856
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/handle.rs
@@ -0,0 +1,68 @@
+use crate::future::Future;
+use crate::loom::sync::Arc;
+use crate::runtime::scheduler::multi_thread::worker;
+use crate::runtime::{
+ blocking, driver,
+ task::{self, JoinHandle},
+};
+use crate::util::RngSeedGenerator;
+
+use std::fmt;
+
+cfg_metrics! {
+ mod metrics;
+}
+
+cfg_taskdump! {
+ mod taskdump;
+}
+
+/// Handle to the multi thread scheduler
+pub(crate) struct Handle {
+ /// Task spawner
+ pub(super) shared: worker::Shared,
+
+ /// Resource driver handles
+ pub(crate) driver: driver::Handle,
+
+ /// Blocking pool spawner
+ pub(crate) blocking_spawner: blocking::Spawner,
+
+ /// Current random number generator seed
+ pub(crate) seed_generator: RngSeedGenerator,
+}
+
+impl Handle {
+ /// Spawns a future onto the thread pool
+ pub(crate) fn spawn<F>(me: &Arc<Self>, future: F, id: task::Id) -> JoinHandle<F::Output>
+ where
+ F: crate::future::Future + Send + 'static,
+ F::Output: Send + 'static,
+ {
+ Self::bind_new_task(me, future, id)
+ }
+
+ pub(crate) fn shutdown(&self) {
+ self.close();
+ }
+
+ pub(super) fn bind_new_task<T>(me: &Arc<Self>, future: T, id: task::Id) -> JoinHandle<T::Output>
+ where
+ T: Future + Send + 'static,
+ T::Output: Send + 'static,
+ {
+ let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);
+
+ if let Some(notified) = notified {
+ me.schedule_task(notified, false);
+ }
+
+ handle
+ }
+}
+
+impl fmt::Debug for Handle {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("multi_thread::Handle { ... }").finish()
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs
new file mode 100644
index 0000000000..838694fc89
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/handle/metrics.rs
@@ -0,0 +1,41 @@
+use super::Handle;
+
+use crate::runtime::{SchedulerMetrics, WorkerMetrics};
+
+impl Handle {
+ pub(crate) fn num_workers(&self) -> usize {
+ self.shared.worker_metrics.len()
+ }
+
+ pub(crate) fn num_blocking_threads(&self) -> usize {
+ self.blocking_spawner.num_threads()
+ }
+
+ pub(crate) fn num_idle_blocking_threads(&self) -> usize {
+ self.blocking_spawner.num_idle_threads()
+ }
+
+ pub(crate) fn active_tasks_count(&self) -> usize {
+ self.shared.owned.active_tasks_count()
+ }
+
+ pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
+ &self.shared.scheduler_metrics
+ }
+
+ pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
+ &self.shared.worker_metrics[worker]
+ }
+
+ pub(crate) fn injection_queue_depth(&self) -> usize {
+ self.shared.injection_queue_depth()
+ }
+
+ pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
+ self.shared.worker_local_queue_depth(worker)
+ }
+
+ pub(crate) fn blocking_queue_depth(&self) -> usize {
+ self.blocking_spawner.queue_depth()
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/scheduler/multi_thread/handle/taskdump.rs b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/handle/taskdump.rs
new file mode 100644
index 0000000000..477d857d88
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/handle/taskdump.rs
@@ -0,0 +1,26 @@
+use super::Handle;
+
+use crate::runtime::Dump;
+
+impl Handle {
+ pub(crate) async fn dump(&self) -> Dump {
+ let trace_status = &self.shared.trace_status;
+
+ // If a dump is in progress, block.
+ trace_status.start_trace_request(&self).await;
+
+ let result = loop {
+ if let Some(result) = trace_status.take_result() {
+ break result;
+ } else {
+ self.notify_all();
+ trace_status.result_ready.notified().await;
+ }
+ };
+
+ // Allow other queued dumps to proceed.
+ trace_status.end_trace_request(&self).await;
+
+ result
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/scheduler/multi_thread/idle.rs b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/idle.rs
new file mode 100644
index 0000000000..834bc2b66f
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/idle.rs
@@ -0,0 +1,240 @@
+//! Coordinates idling workers
+
+use crate::loom::sync::atomic::AtomicUsize;
+use crate::runtime::scheduler::multi_thread::Shared;
+
+use std::fmt;
+use std::sync::atomic::Ordering::{self, SeqCst};
+
+pub(super) struct Idle {
+ /// Tracks both the number of searching workers and the number of unparked
+ /// workers.
+ ///
+    /// Used as a fast path to avoid acquiring the lock when it is not needed.
+ state: AtomicUsize,
+
+ /// Total number of workers.
+ num_workers: usize,
+}
+
+/// Data synchronized by the scheduler mutex
+pub(super) struct Synced {
+ /// Sleeping workers
+ sleepers: Vec<usize>,
+}
+
+const UNPARK_SHIFT: usize = 16;
+const UNPARK_MASK: usize = !SEARCH_MASK;
+const SEARCH_MASK: usize = (1 << UNPARK_SHIFT) - 1;
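+
+// NOTE (editorial): the two counters are packed into one word as
+// `(num_unparked << 16) | num_searching`. For example, 3 unparked workers of
+// which 1 is searching is encoded as 0x0003_0001, which is why `unpark_one`
+// can bump both halves with a single `fetch_add`.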
+
+#[derive(Copy, Clone)]
+struct State(usize);
+
+impl Idle {
+ pub(super) fn new(num_workers: usize) -> (Idle, Synced) {
+ let init = State::new(num_workers);
+
+ let idle = Idle {
+ state: AtomicUsize::new(init.into()),
+ num_workers,
+ };
+
+ let synced = Synced {
+ sleepers: Vec::with_capacity(num_workers),
+ };
+
+ (idle, synced)
+ }
+
+ /// If there are no workers actively searching, returns the index of a
+ /// worker currently sleeping.
+ pub(super) fn worker_to_notify(&self, shared: &Shared) -> Option<usize> {
+ // If at least one worker is spinning, work being notified will
+ // eventually be found. A searching thread will find **some** work and
+ // notify another worker, eventually leading to our work being found.
+ //
+ // For this to happen, this load must happen before the thread
+ // transitioning `num_searching` to zero. Acquire / Release does not
+ // provide sufficient guarantees, so this load is done with `SeqCst` and
+ // will pair with the `fetch_sub(1)` when transitioning out of
+ // searching.
+ if !self.notify_should_wakeup() {
+ return None;
+ }
+
+ // Acquire the lock
+ let mut lock = shared.synced.lock();
+
+ // Check again, now that the lock is acquired
+ if !self.notify_should_wakeup() {
+ return None;
+ }
+
+ // A worker should be woken up, atomically increment the number of
+ // searching workers as well as the number of unparked workers.
+ State::unpark_one(&self.state, 1);
+
+ // Get the worker to unpark
+ let ret = lock.idle.sleepers.pop();
+ debug_assert!(ret.is_some());
+
+ ret
+ }
+
+ /// Returns `true` if the worker needs to do a final check for submitted
+ /// work.
+ pub(super) fn transition_worker_to_parked(
+ &self,
+ shared: &Shared,
+ worker: usize,
+ is_searching: bool,
+ ) -> bool {
+ // Acquire the lock
+ let mut lock = shared.synced.lock();
+
+ // Decrement the number of unparked threads
+ let ret = State::dec_num_unparked(&self.state, is_searching);
+
+ // Track the sleeping worker
+ lock.idle.sleepers.push(worker);
+
+ ret
+ }
+
+ pub(super) fn transition_worker_to_searching(&self) -> bool {
+ let state = State::load(&self.state, SeqCst);
+ if 2 * state.num_searching() >= self.num_workers {
+ return false;
+ }
+
+ // It is possible for this routine to allow more than 50% of the workers
+ // to search. That is OK. Limiting searchers is only an optimization to
+ // prevent too much contention.
+ State::inc_num_searching(&self.state, SeqCst);
+ true
+ }
+
+ /// A lightweight transition from searching -> running.
+ ///
+ /// Returns `true` if this is the final searching worker. The caller
+ /// **must** notify a new worker.
+ pub(super) fn transition_worker_from_searching(&self) -> bool {
+ State::dec_num_searching(&self.state)
+ }
+
+ /// Unpark a specific worker. This happens if tasks are submitted from
+ /// within the worker's park routine.
+ ///
+ /// Returns `true` if the worker was parked before calling the method.
+ pub(super) fn unpark_worker_by_id(&self, shared: &Shared, worker_id: usize) -> bool {
+ let mut lock = shared.synced.lock();
+ let sleepers = &mut lock.idle.sleepers;
+
+ for index in 0..sleepers.len() {
+ if sleepers[index] == worker_id {
+ sleepers.swap_remove(index);
+
+ // Update the state accordingly while the lock is held.
+ State::unpark_one(&self.state, 0);
+
+ return true;
+ }
+ }
+
+ false
+ }
+
+ /// Returns `true` if `worker_id` is contained in the sleep set.
+ pub(super) fn is_parked(&self, shared: &Shared, worker_id: usize) -> bool {
+ let lock = shared.synced.lock();
+ lock.idle.sleepers.contains(&worker_id)
+ }
+
+ fn notify_should_wakeup(&self) -> bool {
+ let state = State(self.state.fetch_add(0, SeqCst));
+ state.num_searching() == 0 && state.num_unparked() < self.num_workers
+ }
+}
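+
+// NOTE (editorial): the `fetch_add(0, SeqCst)` above is an idiom for a
+// sequentially consistent read-modify-write: an RMW always observes the
+// latest value in the modification order, which a plain load is not
+// guaranteed to do, and it pairs with the SeqCst `fetch_sub` in
+// `dec_num_searching` as explained in `worker_to_notify`.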
+
+impl State {
+ fn new(num_workers: usize) -> State {
+ // All workers start in the unparked state
+ let ret = State(num_workers << UNPARK_SHIFT);
+ debug_assert_eq!(num_workers, ret.num_unparked());
+ debug_assert_eq!(0, ret.num_searching());
+ ret
+ }
+
+ fn load(cell: &AtomicUsize, ordering: Ordering) -> State {
+ State(cell.load(ordering))
+ }
+
+ fn unpark_one(cell: &AtomicUsize, num_searching: usize) {
+ cell.fetch_add(num_searching | (1 << UNPARK_SHIFT), SeqCst);
+ }
+
+ fn inc_num_searching(cell: &AtomicUsize, ordering: Ordering) {
+ cell.fetch_add(1, ordering);
+ }
+
+ /// Returns `true` if this is the final searching worker
+ fn dec_num_searching(cell: &AtomicUsize) -> bool {
+ let state = State(cell.fetch_sub(1, SeqCst));
+ state.num_searching() == 1
+ }
+
+ /// Track a sleeping worker
+ ///
+ /// Returns `true` if this is the final searching worker.
+ fn dec_num_unparked(cell: &AtomicUsize, is_searching: bool) -> bool {
+ let mut dec = 1 << UNPARK_SHIFT;
+
+ if is_searching {
+ dec += 1;
+ }
+
+ let prev = State(cell.fetch_sub(dec, SeqCst));
+ is_searching && prev.num_searching() == 1
+ }
+
+ /// Number of workers currently searching
+ fn num_searching(self) -> usize {
+ self.0 & SEARCH_MASK
+ }
+
+ /// Number of workers currently unparked
+ fn num_unparked(self) -> usize {
+ (self.0 & UNPARK_MASK) >> UNPARK_SHIFT
+ }
+}
+
+impl From<usize> for State {
+ fn from(src: usize) -> State {
+ State(src)
+ }
+}
+
+impl From<State> for usize {
+ fn from(src: State) -> usize {
+ src.0
+ }
+}
+
+impl fmt::Debug for State {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("worker::State")
+ .field("num_unparked", &self.num_unparked())
+ .field("num_searching", &self.num_searching())
+ .finish()
+ }
+}
+
+#[test]
+fn test_state() {
+ assert_eq!(0, UNPARK_MASK & SEARCH_MASK);
+ assert_eq!(0, !(UNPARK_MASK | SEARCH_MASK));
+
+ let state = State::new(10);
+ assert_eq!(10, state.num_unparked());
+ assert_eq!(0, state.num_searching());
+}
diff --git a/third_party/rust/tokio/src/runtime/scheduler/multi_thread/mod.rs b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/mod.rs
new file mode 100644
index 0000000000..d85a0ae0a2
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/mod.rs
@@ -0,0 +1,103 @@
+//! Multi-threaded runtime
+
+mod counters;
+use counters::Counters;
+
+mod handle;
+pub(crate) use handle::Handle;
+
+mod overflow;
+pub(crate) use overflow::Overflow;
+
+mod idle;
+use self::idle::Idle;
+
+mod stats;
+pub(crate) use stats::Stats;
+
+mod park;
+pub(crate) use park::{Parker, Unparker};
+
+pub(crate) mod queue;
+
+mod worker;
+pub(crate) use worker::{Context, Launch, Shared};
+
+cfg_taskdump! {
+ mod trace;
+ use trace::TraceStatus;
+
+ pub(crate) use worker::Synced;
+}
+
+cfg_not_taskdump! {
+ mod trace_mock;
+ use trace_mock::TraceStatus;
+}
+
+pub(crate) use worker::block_in_place;
+
+use crate::loom::sync::Arc;
+use crate::runtime::{
+ blocking,
+ driver::{self, Driver},
+ scheduler, Config,
+};
+use crate::util::RngSeedGenerator;
+
+use std::fmt;
+use std::future::Future;
+
+/// Work-stealing based thread pool for executing futures.
+pub(crate) struct MultiThread;
+
+// ===== impl MultiThread =====
+
+impl MultiThread {
+ pub(crate) fn new(
+ size: usize,
+ driver: Driver,
+ driver_handle: driver::Handle,
+ blocking_spawner: blocking::Spawner,
+ seed_generator: RngSeedGenerator,
+ config: Config,
+ ) -> (MultiThread, Arc<Handle>, Launch) {
+ let parker = Parker::new(driver);
+ let (handle, launch) = worker::create(
+ size,
+ parker,
+ driver_handle,
+ blocking_spawner,
+ seed_generator,
+ config,
+ );
+
+ (MultiThread, handle, launch)
+ }
+
+ /// Blocks the current thread waiting for the future to complete.
+ ///
+ /// The future will execute on the current thread, but all spawned tasks
+ /// will be executed on the thread pool.
+ pub(crate) fn block_on<F>(&self, handle: &scheduler::Handle, future: F) -> F::Output
+ where
+ F: Future,
+ {
+ crate::runtime::context::enter_runtime(handle, true, |blocking| {
+ blocking.block_on(future).expect("failed to park thread")
+ })
+ }
+
+ pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) {
+ match handle {
+ scheduler::Handle::MultiThread(handle) => handle.shutdown(),
+ _ => panic!("expected MultiThread scheduler"),
+ }
+ }
+}
+
+impl fmt::Debug for MultiThread {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("MultiThread").finish()
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/scheduler/multi_thread/overflow.rs b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/overflow.rs
new file mode 100644
index 0000000000..ab664811cf
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/overflow.rs
@@ -0,0 +1,26 @@
+use crate::runtime::task;
+
+#[cfg(test)]
+use std::cell::RefCell;
+
+pub(crate) trait Overflow<T: 'static> {
+ fn push(&self, task: task::Notified<T>);
+
+ fn push_batch<I>(&self, iter: I)
+ where
+ I: Iterator<Item = task::Notified<T>>;
+}
+
+#[cfg(test)]
+impl<T: 'static> Overflow<T> for RefCell<Vec<task::Notified<T>>> {
+ fn push(&self, task: task::Notified<T>) {
+ self.borrow_mut().push(task);
+ }
+
+ fn push_batch<I>(&self, iter: I)
+ where
+ I: Iterator<Item = task::Notified<T>>,
+ {
+ self.borrow_mut().extend(iter);
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/scheduler/multi_thread/park.rs b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/park.rs
new file mode 100644
index 0000000000..0a00ea004e
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/park.rs
@@ -0,0 +1,232 @@
+//! Parks the runtime.
+//!
+//! A combination of the various resource driver park handles.
+
+use crate::loom::sync::atomic::AtomicUsize;
+use crate::loom::sync::{Arc, Condvar, Mutex};
+use crate::runtime::driver::{self, Driver};
+use crate::util::TryLock;
+
+use std::sync::atomic::Ordering::SeqCst;
+use std::time::Duration;
+
+pub(crate) struct Parker {
+ inner: Arc<Inner>,
+}
+
+pub(crate) struct Unparker {
+ inner: Arc<Inner>,
+}
+
+struct Inner {
+ /// Avoids entering the park if possible
+ state: AtomicUsize,
+
+ /// Used to coordinate access to the driver / condvar
+ mutex: Mutex<()>,
+
+ /// Condvar to block on if the driver is unavailable.
+ condvar: Condvar,
+
+ /// Shared state, including the resource (I/O, time, ...) driver.
+ shared: Arc<Shared>,
+}
+
+const EMPTY: usize = 0;
+const PARKED_CONDVAR: usize = 1;
+const PARKED_DRIVER: usize = 2;
+const NOTIFIED: usize = 3;
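+
+// Rough sketch of how `park` / `unpark` below drive these states:
+//
+//   EMPTY -> PARKED_DRIVER      park() acquired the driver lock
+//   EMPTY -> PARKED_CONDVAR     park() fell back to the condvar
+//   *     -> NOTIFIED           unpark() always swaps NOTIFIED in
+//   NOTIFIED / PARKED_* -> EMPTY  the (about-to-)park thread wakes and resets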
+
+/// Shared across multiple Parker handles
+struct Shared {
+ /// Shared driver. Only one thread at a time can use this
+ driver: TryLock<Driver>,
+}
+
+impl Parker {
+ pub(crate) fn new(driver: Driver) -> Parker {
+ Parker {
+ inner: Arc::new(Inner {
+ state: AtomicUsize::new(EMPTY),
+ mutex: Mutex::new(()),
+ condvar: Condvar::new(),
+ shared: Arc::new(Shared {
+ driver: TryLock::new(driver),
+ }),
+ }),
+ }
+ }
+
+ pub(crate) fn unpark(&self) -> Unparker {
+ Unparker {
+ inner: self.inner.clone(),
+ }
+ }
+
+ pub(crate) fn park(&mut self, handle: &driver::Handle) {
+ self.inner.park(handle);
+ }
+
+ pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) {
+ // Only parking with zero is supported...
+ assert_eq!(duration, Duration::from_millis(0));
+
+ if let Some(mut driver) = self.inner.shared.driver.try_lock() {
+ driver.park_timeout(handle, duration)
+ }
+ }
+
+ pub(crate) fn shutdown(&mut self, handle: &driver::Handle) {
+ self.inner.shutdown(handle);
+ }
+}
+
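+// Cloning a `Parker` gives each worker its own `state`, `mutex`, and
+// `condvar`; only the `shared` driver handle is actually shared. Every worker
+// can therefore park independently on its condvar, while at most one of them
+// holds the driver at a time.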
+impl Clone for Parker {
+ fn clone(&self) -> Parker {
+ Parker {
+ inner: Arc::new(Inner {
+ state: AtomicUsize::new(EMPTY),
+ mutex: Mutex::new(()),
+ condvar: Condvar::new(),
+ shared: self.inner.shared.clone(),
+ }),
+ }
+ }
+}
+
+impl Unparker {
+ pub(crate) fn unpark(&self, driver: &driver::Handle) {
+ self.inner.unpark(driver);
+ }
+}
+
+impl Inner {
+ /// Parks the current thread.
+ fn park(&self, handle: &driver::Handle) {
+ // If we were previously notified then we consume this notification and
+ // return quickly.
+ if self
+ .state
+ .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
+ .is_ok()
+ {
+ return;
+ }
+
+ if let Some(mut driver) = self.shared.driver.try_lock() {
+ self.park_driver(&mut driver, handle);
+ } else {
+ self.park_condvar();
+ }
+ }
+
+ fn park_condvar(&self) {
+ // Otherwise we need to coordinate going to sleep
+ let mut m = self.mutex.lock();
+
+ match self
+ .state
+ .compare_exchange(EMPTY, PARKED_CONDVAR, SeqCst, SeqCst)
+ {
+ Ok(_) => {}
+ Err(NOTIFIED) => {
+ // We must read here, even though we know it will be `NOTIFIED`.
+ // This is because `unpark` may have been called again since we read
+ // `NOTIFIED` in the `compare_exchange` above. We must perform an
+ // acquire operation that synchronizes with that `unpark` to observe
+ // any writes it made before the call to unpark. To do that we must
+ // read from the write it made to `state`.
+ let old = self.state.swap(EMPTY, SeqCst);
+ debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
+
+ return;
+ }
+ Err(actual) => panic!("inconsistent park state; actual = {}", actual),
+ }
+
+ loop {
+ m = self.condvar.wait(m).unwrap();
+
+ if self
+ .state
+ .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
+ .is_ok()
+ {
+ // got a notification
+ return;
+ }
+
+ // spurious wakeup, go back to sleep
+ }
+ }
+
+ fn park_driver(&self, driver: &mut Driver, handle: &driver::Handle) {
+ match self
+ .state
+ .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst)
+ {
+ Ok(_) => {}
+ Err(NOTIFIED) => {
+ // We must read here, even though we know it will be `NOTIFIED`.
+ // This is because `unpark` may have been called again since we read
+ // `NOTIFIED` in the `compare_exchange` above. We must perform an
+ // acquire operation that synchronizes with that `unpark` to observe
+ // any writes it made before the call to unpark. To do that we must
+ // read from the write it made to `state`.
+ let old = self.state.swap(EMPTY, SeqCst);
+ debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
+
+ return;
+ }
+ Err(actual) => panic!("inconsistent park state; actual = {}", actual),
+ }
+
+ driver.park(handle);
+
+ match self.state.swap(EMPTY, SeqCst) {
+ NOTIFIED => {} // got a notification, hurray!
+ PARKED_DRIVER => {} // no notification, alas
+ n => panic!("inconsistent park_timeout state: {}", n),
+ }
+ }
+
+ fn unpark(&self, driver: &driver::Handle) {
+ // To ensure the unparked thread will observe any writes we made before
+ // this call, we must perform a release operation that `park` can
+ // synchronize with. To do that we must write `NOTIFIED` even if `state`
+ // is already `NOTIFIED`. That is why this must be a swap rather than a
+ // compare-and-swap that returns if it reads `NOTIFIED` on failure.
+ match self.state.swap(NOTIFIED, SeqCst) {
+ EMPTY => {} // no one was waiting
+ NOTIFIED => {} // already unparked
+ PARKED_CONDVAR => self.unpark_condvar(),
+ PARKED_DRIVER => driver.unpark(),
+ actual => panic!("inconsistent state in unpark; actual = {}", actual),
+ }
+ }
+
+ fn unpark_condvar(&self) {
+ // There is a period between when the parked thread sets `state` to
+ // `PARKED_CONDVAR` (or last checked `state` in the case of a spurious wake
+ // up) and when it actually waits on `cvar`. If we were to notify
+ // during this period it would be ignored and then when the parked
+ // thread went to sleep it would never wake up. Fortunately, it has
+ // `lock` locked at this stage so we can acquire `lock` to wait until
+ // it is ready to receive the notification.
+ //
+ // Releasing `lock` before the call to `notify_one` means that when the
+ // parked thread wakes it doesn't get woken only to have to wait for us
+ // to release `lock`.
+ drop(self.mutex.lock());
+
+ self.condvar.notify_one()
+ }
+
+ fn shutdown(&self, handle: &driver::Handle) {
+ if let Some(mut driver) = self.shared.driver.try_lock() {
+ driver.shutdown(handle);
+ }
+
+ self.condvar.notify_all();
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/scheduler/multi_thread/queue.rs b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/queue.rs
new file mode 100644
index 0000000000..dd66fa2dde
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/queue.rs
@@ -0,0 +1,608 @@
+//! Run-queue structures to support a work-stealing scheduler
+
+use crate::loom::cell::UnsafeCell;
+use crate::loom::sync::Arc;
+use crate::runtime::scheduler::multi_thread::{Overflow, Stats};
+use crate::runtime::task;
+
+use std::mem::{self, MaybeUninit};
+use std::ptr;
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
+
+// Use wider integers when possible to increase ABA resilience.
+//
+// See issue #5041: <https://github.com/tokio-rs/tokio/issues/5041>.
+cfg_has_atomic_u64! {
+ type UnsignedShort = u32;
+ type UnsignedLong = u64;
+ type AtomicUnsignedShort = crate::loom::sync::atomic::AtomicU32;
+ type AtomicUnsignedLong = crate::loom::sync::atomic::AtomicU64;
+}
+cfg_not_has_atomic_u64! {
+ type UnsignedShort = u16;
+ type UnsignedLong = u32;
+ type AtomicUnsignedShort = crate::loom::sync::atomic::AtomicU16;
+ type AtomicUnsignedLong = crate::loom::sync::atomic::AtomicU32;
+}
+
+/// Producer handle. May only be used from a single thread.
+pub(crate) struct Local<T: 'static> {
+ inner: Arc<Inner<T>>,
+}
+
+/// Consumer handle. May be used from many threads.
+pub(crate) struct Steal<T: 'static>(Arc<Inner<T>>);
+
+pub(crate) struct Inner<T: 'static> {
+ /// Concurrently updated by many threads.
+ ///
+ /// Contains two `UnsignedShort` values. The low half is the "real" head of
+ /// the queue. The `UnsignedShort` in the high half is set by a stealer in the
+ /// process of stealing values. It represents the first value being stolen in the
+ /// batch. The `UnsignedShort` indices are intentionally wider than strictly
+ /// required for buffer indexing in order to provide ABA mitigation and make
+ /// it possible to distinguish between full and empty buffers.
+ ///
+ /// When both `UnsignedShort` values are the same, there is no active
+ /// stealer.
+ ///
+ /// Tracking an in-progress stealer prevents a wrapping scenario.
+ head: AtomicUnsignedLong,
+
+ /// Only updated by producer thread but read by many threads.
+ tail: AtomicUnsignedShort,
+
+ /// Elements
+ buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY]>,
+}
+
+unsafe impl<T> Send for Inner<T> {}
+unsafe impl<T> Sync for Inner<T> {}
+
+#[cfg(not(loom))]
+const LOCAL_QUEUE_CAPACITY: usize = 256;
+
+// Shrink the size of the local queue when using loom. This shouldn't impact
+// logic, but allows loom to test more edge cases in a reasonable amount of
+// time.
+#[cfg(loom)]
+const LOCAL_QUEUE_CAPACITY: usize = 4;
+
+const MASK: usize = LOCAL_QUEUE_CAPACITY - 1;
+
+// Constructing the fixed size array directly is very awkward. The only way to
+// do it is to repeat `UnsafeCell::new(MaybeUninit::uninit())` 256 times, as
+// the contents are not Copy. The trick with defining a const doesn't work for
+// generic types.
+fn make_fixed_size<T>(buffer: Box<[T]>) -> Box<[T; LOCAL_QUEUE_CAPACITY]> {
+ assert_eq!(buffer.len(), LOCAL_QUEUE_CAPACITY);
+
+ // safety: We check that the length is correct.
+ unsafe { Box::from_raw(Box::into_raw(buffer).cast()) }
+}
+
+/// Create a new local run-queue
+pub(crate) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
+ let mut buffer = Vec::with_capacity(LOCAL_QUEUE_CAPACITY);
+
+ for _ in 0..LOCAL_QUEUE_CAPACITY {
+ buffer.push(UnsafeCell::new(MaybeUninit::uninit()));
+ }
+
+ let inner = Arc::new(Inner {
+ head: AtomicUnsignedLong::new(0),
+ tail: AtomicUnsignedShort::new(0),
+ buffer: make_fixed_size(buffer.into_boxed_slice()),
+ });
+
+ let local = Local {
+ inner: inner.clone(),
+ };
+
+ let remote = Steal(inner);
+
+ (remote, local)
+}
+
+impl<T> Local<T> {
+ /// Returns the number of entries in the queue
+ pub(crate) fn len(&self) -> usize {
+ self.inner.len() as usize
+ }
+
+ /// How many tasks can be pushed into the queue
+ pub(crate) fn remaining_slots(&self) -> usize {
+ self.inner.remaining_slots()
+ }
+
+ pub(crate) fn max_capacity(&self) -> usize {
+ LOCAL_QUEUE_CAPACITY
+ }
+
+ /// Returns `true` if there are any entries in the queue.
+ ///
+ /// Kept separate from `is_stealable` so that refactors of `is_stealable` to
+ /// "protect" some tasks from stealing won't affect this.
+ pub(crate) fn has_tasks(&self) -> bool {
+ !self.inner.is_empty()
+ }
+
+ /// Pushes a batch of tasks to the back of the queue. All tasks must fit in
+ /// the local queue.
+ ///
+ /// # Panics
+ ///
+ /// The method panics if there is not enough capacity in the queue to fit all the tasks.
+ pub(crate) fn push_back(&mut self, tasks: impl ExactSizeIterator<Item = task::Notified<T>>) {
+ let len = tasks.len();
+ assert!(len <= LOCAL_QUEUE_CAPACITY);
+
+ if len == 0 {
+ // Nothing to do
+ return;
+ }
+
+ let head = self.inner.head.load(Acquire);
+ let (steal, _) = unpack(head);
+
+ // safety: this is the **only** thread that updates this cell.
+ let mut tail = unsafe { self.inner.tail.unsync_load() };
+
+ if tail.wrapping_sub(steal) <= (LOCAL_QUEUE_CAPACITY - len) as UnsignedShort {
+ // Yes, this if condition is structured a bit weird (the first block
+ // does nothing, the second panics). It is this way to match
+ // `push_back_or_overflow`.
+ } else {
+ panic!()
+ }
+
+ for task in tasks {
+ let idx = tail as usize & MASK;
+
+ self.inner.buffer[idx].with_mut(|ptr| {
+ // Write the task to the slot
+ //
+ // Safety: There is only one producer and the above `if`
+ // condition ensures we don't touch a cell if there is a
+ // value, thus no consumer.
+ unsafe {
+ ptr::write((*ptr).as_mut_ptr(), task);
+ }
+ });
+
+ tail = tail.wrapping_add(1);
+ }
+
+ self.inner.tail.store(tail, Release);
+ }
+
+ /// Pushes a task to the back of the local queue, if there is not enough
+ /// capacity in the queue, this triggers the overflow operation.
+ ///
+ /// When the queue overflows, half of the current contents of the queue is
+ /// moved to the given injection queue. This frees up capacity for more
+ /// tasks to be pushed into the local queue.
+ pub(crate) fn push_back_or_overflow<O: Overflow<T>>(
+ &mut self,
+ mut task: task::Notified<T>,
+ overflow: &O,
+ stats: &mut Stats,
+ ) {
+ let tail = loop {
+ let head = self.inner.head.load(Acquire);
+ let (steal, real) = unpack(head);
+
+ // safety: this is the **only** thread that updates this cell.
+ let tail = unsafe { self.inner.tail.unsync_load() };
+
+ if tail.wrapping_sub(steal) < LOCAL_QUEUE_CAPACITY as UnsignedShort {
+ // There is capacity for the task
+ break tail;
+ } else if steal != real {
+ // Concurrently stealing, this will free up capacity, so only
+ // push the task onto the inject queue
+ overflow.push(task);
+ return;
+ } else {
+ // Push the current task and half of the queue into the
+ // inject queue.
+ match self.push_overflow(task, real, tail, overflow, stats) {
+ Ok(_) => return,
+ // Lost the race, try again
+ Err(v) => {
+ task = v;
+ }
+ }
+ }
+ };
+
+ self.push_back_finish(task, tail);
+ }
+
+ // Second half of `push_back`
+ fn push_back_finish(&self, task: task::Notified<T>, tail: UnsignedShort) {
+ // Map the position to a slot index.
+ let idx = tail as usize & MASK;
+
+ self.inner.buffer[idx].with_mut(|ptr| {
+ // Write the task to the slot
+ //
+ // Safety: There is only one producer and the above `if`
+ // condition ensures we don't touch a cell if there is a
+ // value, thus no consumer.
+ unsafe {
+ ptr::write((*ptr).as_mut_ptr(), task);
+ }
+ });
+
+ // Make the task available. Synchronizes with a load in
+ // `steal_into2`.
+ self.inner.tail.store(tail.wrapping_add(1), Release);
+ }
+
+ /// Moves a batch of tasks into the inject queue.
+ ///
+ /// This will temporarily make some of the tasks unavailable to stealers.
+ /// Once `push_overflow` is done, a notification is sent out, so if other
+ /// workers "missed" some of the tasks during a steal, they will get
+ /// another opportunity.
+ #[inline(never)]
+ fn push_overflow<O: Overflow<T>>(
+ &mut self,
+ task: task::Notified<T>,
+ head: UnsignedShort,
+ tail: UnsignedShort,
+ overflow: &O,
+ stats: &mut Stats,
+ ) -> Result<(), task::Notified<T>> {
+ /// How many elements are we taking from the local queue.
+ ///
+ /// This is one less than the number of tasks pushed to the inject
+ /// queue as we are also inserting the `task` argument.
+ const NUM_TASKS_TAKEN: UnsignedShort = (LOCAL_QUEUE_CAPACITY / 2) as UnsignedShort;
+
+ assert_eq!(
+ tail.wrapping_sub(head) as usize,
+ LOCAL_QUEUE_CAPACITY,
+ "queue is not full; tail = {}; head = {}",
+ tail,
+ head
+ );
+
+ let prev = pack(head, head);
+
+ // Claim a bunch of tasks
+ //
+ // We are claiming the tasks **before** reading them out of the buffer.
+ // This is safe because only the **current** thread is able to push new
+ // tasks.
+ //
+ // There isn't really any need for memory ordering... Relaxed would
+ // work. This is because all tasks are pushed into the queue from the
+ // current thread (or memory has been acquired if the local queue handle
+ // moved).
+ if self
+ .inner
+ .head
+ .compare_exchange(
+ prev,
+ pack(
+ head.wrapping_add(NUM_TASKS_TAKEN),
+ head.wrapping_add(NUM_TASKS_TAKEN),
+ ),
+ Release,
+ Relaxed,
+ )
+ .is_err()
+ {
+ // We failed to claim the tasks, losing the race. Return out of
+ // this function and try the full `push` routine again. The queue
+ // may not be full anymore.
+ return Err(task);
+ }
+
+ /// An iterator that takes elements out of the run queue.
+ struct BatchTaskIter<'a, T: 'static> {
+ buffer: &'a [UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY],
+ head: UnsignedLong,
+ i: UnsignedLong,
+ }
+ impl<'a, T: 'static> Iterator for BatchTaskIter<'a, T> {
+ type Item = task::Notified<T>;
+
+ #[inline]
+ fn next(&mut self) -> Option<task::Notified<T>> {
+ if self.i == UnsignedLong::from(NUM_TASKS_TAKEN) {
+ None
+ } else {
+ let i_idx = self.i.wrapping_add(self.head) as usize & MASK;
+ let slot = &self.buffer[i_idx];
+
+ // safety: Our CAS from before has assumed exclusive ownership
+ // of the task pointers in this range.
+ let task = slot.with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });
+
+ self.i += 1;
+ Some(task)
+ }
+ }
+ }
+
+ // safety: The CAS above ensures that no consumer will look at these
+ // values again, and we are the only producer.
+ let batch_iter = BatchTaskIter {
+ buffer: &self.inner.buffer,
+ head: head as UnsignedLong,
+ i: 0,
+ };
+ overflow.push_batch(batch_iter.chain(std::iter::once(task)));
+
+ // Add 1 to factor in the task currently being scheduled.
+ stats.incr_overflow_count();
+
+ Ok(())
+ }
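+
+ // Example of the overflow above, using illustrative non-loom numbers: with
+ // LOCAL_QUEUE_CAPACITY = 256, NUM_TASKS_TAKEN is 128, so a full queue sheds
+ // 128 tasks plus the task being scheduled (129 in total) to the overflow
+ // queue, leaving half of the local queue free again.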
+
+ /// Pops a task from the local queue.
+ pub(crate) fn pop(&mut self) -> Option<task::Notified<T>> {
+ let mut head = self.inner.head.load(Acquire);
+
+ let idx = loop {
+ let (steal, real) = unpack(head);
+
+ // safety: this is the **only** thread that updates this cell.
+ let tail = unsafe { self.inner.tail.unsync_load() };
+
+ if real == tail {
+ // queue is empty
+ return None;
+ }
+
+ let next_real = real.wrapping_add(1);
+
+ // If `steal == real` there are no concurrent stealers. Both `steal`
+ // and `real` are updated.
+ let next = if steal == real {
+ pack(next_real, next_real)
+ } else {
+ assert_ne!(steal, next_real);
+ pack(steal, next_real)
+ };
+
+ // Attempt to claim a task.
+ let res = self
+ .inner
+ .head
+ .compare_exchange(head, next, AcqRel, Acquire);
+
+ match res {
+ Ok(_) => break real as usize & MASK,
+ Err(actual) => head = actual,
+ }
+ };
+
+ Some(self.inner.buffer[idx].with(|ptr| unsafe { ptr::read(ptr).assume_init() }))
+ }
+}
+
+impl<T> Steal<T> {
+ pub(crate) fn is_empty(&self) -> bool {
+ self.0.is_empty()
+ }
+
+ /// Steals half of the tasks from `self` and places them into `dst`.
+ pub(crate) fn steal_into(
+ &self,
+ dst: &mut Local<T>,
+ dst_stats: &mut Stats,
+ ) -> Option<task::Notified<T>> {
+ // Safety: the caller is the only thread that mutates `dst.tail` and
+ // holds a mutable reference.
+ let dst_tail = unsafe { dst.inner.tail.unsync_load() };
+
+ // To the caller, `dst` may **look** empty but still have values
+ // contained in the buffer. If another thread is concurrently stealing
+ // from `dst` there may not be enough capacity to steal.
+ let (steal, _) = unpack(dst.inner.head.load(Acquire));
+
+ if dst_tail.wrapping_sub(steal) > LOCAL_QUEUE_CAPACITY as UnsignedShort / 2 {
+ // we *could* try to steal less here, but for simplicity, we're just
+ // going to abort.
+ return None;
+ }
+
+ // Steal the tasks into `dst`'s buffer. This does not yet expose the
+ // tasks in `dst`.
+ let mut n = self.steal_into2(dst, dst_tail);
+
+ if n == 0 {
+ // No tasks were stolen
+ return None;
+ }
+
+ dst_stats.incr_steal_count(n as u16);
+ dst_stats.incr_steal_operations();
+
+ // We are returning a task here
+ n -= 1;
+
+ let ret_pos = dst_tail.wrapping_add(n);
+ let ret_idx = ret_pos as usize & MASK;
+
+ // safety: the value was written as part of `steal_into2` and not
+ // exposed to stealers, so no other thread can access it.
+ let ret = dst.inner.buffer[ret_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });
+
+ if n == 0 {
+ // The `dst` queue is empty, but a single task was stolen
+ return Some(ret);
+ }
+
+ // Make the stolen items available to consumers
+ dst.inner.tail.store(dst_tail.wrapping_add(n), Release);
+
+ Some(ret)
+ }
+
+ // Steal tasks from `self`, placing them into `dst`. Returns the number of
+ // tasks that were stolen.
+ fn steal_into2(&self, dst: &mut Local<T>, dst_tail: UnsignedShort) -> UnsignedShort {
+ let mut prev_packed = self.0.head.load(Acquire);
+ let mut next_packed;
+
+ let n = loop {
+ let (src_head_steal, src_head_real) = unpack(prev_packed);
+ let src_tail = self.0.tail.load(Acquire);
+
+ // If these two do not match, another thread is concurrently
+ // stealing from the queue.
+ if src_head_steal != src_head_real {
+ return 0;
+ }
+
+ // Number of available tasks to steal
+ let n = src_tail.wrapping_sub(src_head_real);
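+ // Steal half of the available tasks, rounding up (e.g. 5 available
+ // tasks means 3 are stolen).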
+ let n = n - n / 2;
+
+ if n == 0 {
+ // No tasks available to steal
+ return 0;
+ }
+
+ // Update the real head index to acquire the tasks.
+ let steal_to = src_head_real.wrapping_add(n);
+ assert_ne!(src_head_steal, steal_to);
+ next_packed = pack(src_head_steal, steal_to);
+
+ // Claim all those tasks. This is done by incrementing the "real"
+ // head but not the steal. By doing this, no other thread is able to
+ // steal from this queue until the current thread completes.
+ let res = self
+ .0
+ .head
+ .compare_exchange(prev_packed, next_packed, AcqRel, Acquire);
+
+ match res {
+ Ok(_) => break n,
+ Err(actual) => prev_packed = actual,
+ }
+ };
+
+ assert!(
+ n <= LOCAL_QUEUE_CAPACITY as UnsignedShort / 2,
+ "actual = {}",
+ n
+ );
+
+ let (first, _) = unpack(next_packed);
+
+ // Take all the tasks
+ for i in 0..n {
+ // Compute the positions
+ let src_pos = first.wrapping_add(i);
+ let dst_pos = dst_tail.wrapping_add(i);
+
+ // Map to slots
+ let src_idx = src_pos as usize & MASK;
+ let dst_idx = dst_pos as usize & MASK;
+
+ // Read the task
+ //
+ // safety: We acquired the task with the atomic exchange above.
+ let task = self.0.buffer[src_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });
+
+ // Write the task to the new slot
+ //
+ // safety: `dst` queue is empty and we are the only producer to
+ // this queue.
+ dst.inner.buffer[dst_idx]
+ .with_mut(|ptr| unsafe { ptr::write((*ptr).as_mut_ptr(), task) });
+ }
+
+ let mut prev_packed = next_packed;
+
+ // Update `src_head_steal` to match `src_head_real` signalling that the
+ // stealing routine is complete.
+ loop {
+ let head = unpack(prev_packed).1;
+ next_packed = pack(head, head);
+
+ let res = self
+ .0
+ .head
+ .compare_exchange(prev_packed, next_packed, AcqRel, Acquire);
+
+ match res {
+ Ok(_) => return n,
+ Err(actual) => {
+ let (actual_steal, actual_real) = unpack(actual);
+
+ assert_ne!(actual_steal, actual_real);
+
+ prev_packed = actual;
+ }
+ }
+ }
+ }
+}
+
+cfg_metrics! {
+ impl<T> Steal<T> {
+ pub(crate) fn len(&self) -> usize {
+ self.0.len() as _
+ }
+ }
+}
+
+impl<T> Clone for Steal<T> {
+ fn clone(&self) -> Steal<T> {
+ Steal(self.0.clone())
+ }
+}
+
+impl<T> Drop for Local<T> {
+ fn drop(&mut self) {
+ if !std::thread::panicking() {
+ assert!(self.pop().is_none(), "queue not empty");
+ }
+ }
+}
+
+impl<T> Inner<T> {
+ fn remaining_slots(&self) -> usize {
+ let (steal, _) = unpack(self.head.load(Acquire));
+ let tail = self.tail.load(Acquire);
+
+ LOCAL_QUEUE_CAPACITY - (tail.wrapping_sub(steal) as usize)
+ }
+
+ fn len(&self) -> UnsignedShort {
+ let (_, head) = unpack(self.head.load(Acquire));
+ let tail = self.tail.load(Acquire);
+
+ tail.wrapping_sub(head)
+ }
+
+ fn is_empty(&self) -> bool {
+ self.len() == 0
+ }
+}
+
+/// Split the head value into the real head and the index a stealer is working
+/// on.
+fn unpack(n: UnsignedLong) -> (UnsignedShort, UnsignedShort) {
+ let real = n & UnsignedShort::MAX as UnsignedLong;
+ let steal = n >> (mem::size_of::<UnsignedShort>() * 8);
+
+ (steal as UnsignedShort, real as UnsignedShort)
+}
+
+/// Join the two head values
+fn pack(steal: UnsignedShort, real: UnsignedShort) -> UnsignedLong {
+ (real as UnsignedLong) | ((steal as UnsignedLong) << (mem::size_of::<UnsignedShort>() * 8))
+}
+
+#[test]
+fn test_local_queue_capacity() {
+ assert!(LOCAL_QUEUE_CAPACITY - 1 <= u8::MAX as usize);
+}
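+
+// A small sanity check of the `pack` / `unpack` round trip described above:
+// the "real" head lives in the low half of the packed word and the "steal"
+// index in the high half. Purely illustrative, in the same style as the
+// existing inline test.
+#[test]
+fn test_pack_unpack_roundtrip() {
+ // No stealer active: both halves hold the same value.
+ assert_eq!(unpack(pack(42, 42)), (42, 42));
+
+ // A stealer is working from index 3 while the real head is at 7.
+ assert_eq!(unpack(pack(3, 7)), (3, 7));
+}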
diff --git a/third_party/rust/tokio/src/runtime/scheduler/multi_thread/stats.rs b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/stats.rs
new file mode 100644
index 0000000000..f01daaa1bf
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/stats.rs
@@ -0,0 +1,140 @@
+use crate::runtime::{Config, MetricsBatch, WorkerMetrics};
+
+use std::cmp;
+use std::time::{Duration, Instant};
+
+/// Per-worker statistics. This is used for both tuning the scheduler and
+/// reporting runtime-level metrics/stats.
+pub(crate) struct Stats {
+ /// The metrics batch used to report runtime-level metrics/stats to the
+ /// user.
+ batch: MetricsBatch,
+
+ /// Instant at which work last resumed (continued after park).
+ ///
+ /// This duplicates the value stored in `MetricsBatch`. We will unify
+ /// `Stats` and `MetricsBatch` when we stabilize metrics.
+ processing_scheduled_tasks_started_at: Instant,
+
+ /// Number of tasks polled in the batch of scheduled tasks
+ tasks_polled_in_batch: usize,
+
+ /// Exponentially-weighted moving average of the time spent polling a
+ /// scheduled task.
+ ///
+ /// Tracked in nanoseconds, stored as an f64 since that is what we use for
+ /// the EWMA calculations.
+ task_poll_time_ewma: f64,
+}
+
+/// How much to weight each individual poll time; the value is plucked from thin air.
+const TASK_POLL_TIME_EWMA_ALPHA: f64 = 0.1;
+
+/// Ideally, we wouldn't go above this; the value is plucked from thin air.
+const TARGET_GLOBAL_QUEUE_INTERVAL: f64 = Duration::from_micros(200).as_nanos() as f64;
+
+/// Max value for the global queue interval. This is 2x the previous default.
+const MAX_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL: u32 = 127;
+
+/// This is the previous default.
+const TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL: u32 = 61;
+
+impl Stats {
+ pub(crate) fn new(worker_metrics: &WorkerMetrics) -> Stats {
+ // Seed the value with what we hope to see.
+ let task_poll_time_ewma =
+ TARGET_GLOBAL_QUEUE_INTERVAL / TARGET_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL as f64;
+
+ Stats {
+ batch: MetricsBatch::new(worker_metrics),
+ processing_scheduled_tasks_started_at: Instant::now(),
+ tasks_polled_in_batch: 0,
+ task_poll_time_ewma,
+ }
+ }
+
+ pub(crate) fn tuned_global_queue_interval(&self, config: &Config) -> u32 {
+ // If an interval is explicitly set, don't tune.
+ if let Some(configured) = config.global_queue_interval {
+ return configured;
+ }
+
+ // As of Rust 1.45, casts from f64 -> u32 are saturating, which is fine here.
+ let tasks_per_interval = (TARGET_GLOBAL_QUEUE_INTERVAL / self.task_poll_time_ewma) as u32;
+
+ cmp::max(
+ // We don't want to return less than 2 as that would result in the
+ // global queue always getting checked first.
+ 2,
+ cmp::min(
+ MAX_TASKS_POLLED_PER_GLOBAL_QUEUE_INTERVAL,
+ tasks_per_interval,
+ ),
+ )
+ }
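+
+ // Worked example of the tuning above (illustrative numbers): with the
+ // seeded EWMA of 200us / 61 (~3.28us per poll), tasks_per_interval is
+ // 200us / 3.28us ~= 61, which falls inside the [2, 127] clamp. If polls
+ // slow down to ~10us each, the interval drops to ~20, so the global queue
+ // is checked more often.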
+
+ pub(crate) fn submit(&mut self, to: &WorkerMetrics) {
+ self.batch.submit(to);
+ }
+
+ pub(crate) fn about_to_park(&mut self) {
+ self.batch.about_to_park();
+ }
+
+ pub(crate) fn inc_local_schedule_count(&mut self) {
+ self.batch.inc_local_schedule_count();
+ }
+
+ pub(crate) fn start_processing_scheduled_tasks(&mut self) {
+ self.batch.start_processing_scheduled_tasks();
+
+ self.processing_scheduled_tasks_started_at = Instant::now();
+ self.tasks_polled_in_batch = 0;
+ }
+
+ pub(crate) fn end_processing_scheduled_tasks(&mut self) {
+ self.batch.end_processing_scheduled_tasks();
+
+ // Update the EWMA task poll time
+ if self.tasks_polled_in_batch > 0 {
+ let now = Instant::now();
+
+ // If we "overflow" this conversion, we have bigger problems than
+ // slightly off stats.
+ let elapsed = (now - self.processing_scheduled_tasks_started_at).as_nanos() as f64;
+ let num_polls = self.tasks_polled_in_batch as f64;
+
+ // Calculate the mean poll duration for a single task in the batch
+ let mean_poll_duration = elapsed / num_polls;
+
+ // Compute the alpha weighted by the number of tasks polled this batch.
+ let weighted_alpha = 1.0 - (1.0 - TASK_POLL_TIME_EWMA_ALPHA).powf(num_polls);
+
+ // Now compute the new weighted average task poll time.
+ self.task_poll_time_ewma = weighted_alpha * mean_poll_duration
+ + (1.0 - weighted_alpha) * self.task_poll_time_ewma;
+ }
+ }
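+
+ // Example of the batch-weighted update above (illustrative numbers): with
+ // TASK_POLL_TIME_EWMA_ALPHA = 0.1 and 10 polls in the batch,
+ // weighted_alpha = 1 - 0.9^10 ~= 0.65, so the mean poll time of this batch
+ // contributes about 65% of the new EWMA.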
+
+ pub(crate) fn start_poll(&mut self) {
+ self.batch.start_poll();
+
+ self.tasks_polled_in_batch += 1;
+ }
+
+ pub(crate) fn end_poll(&mut self) {
+ self.batch.end_poll();
+ }
+
+ pub(crate) fn incr_steal_count(&mut self, by: u16) {
+ self.batch.incr_steal_count(by);
+ }
+
+ pub(crate) fn incr_steal_operations(&mut self) {
+ self.batch.incr_steal_operations();
+ }
+
+ pub(crate) fn incr_overflow_count(&mut self) {
+ self.batch.incr_overflow_count();
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/scheduler/multi_thread/trace.rs b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/trace.rs
new file mode 100644
index 0000000000..7b4aeb5c1d
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/trace.rs
@@ -0,0 +1,61 @@
+use crate::loom::sync::atomic::{AtomicBool, Ordering};
+use crate::loom::sync::{Barrier, Mutex};
+use crate::runtime::dump::Dump;
+use crate::runtime::scheduler::multi_thread::Handle;
+use crate::sync::notify::Notify;
+
+/// Tracing status of the worker.
+pub(super) struct TraceStatus {
+ pub(super) trace_requested: AtomicBool,
+ pub(super) trace_start: Barrier,
+ pub(super) trace_end: Barrier,
+ pub(super) result_ready: Notify,
+ pub(super) trace_result: Mutex<Option<Dump>>,
+}
+
+impl TraceStatus {
+ pub(super) fn new(remotes_len: usize) -> Self {
+ Self {
+ trace_requested: AtomicBool::new(false),
+ trace_start: Barrier::new(remotes_len),
+ trace_end: Barrier::new(remotes_len),
+ result_ready: Notify::new(),
+ trace_result: Mutex::new(None),
+ }
+ }
+
+ pub(super) fn trace_requested(&self) -> bool {
+ self.trace_requested.load(Ordering::Relaxed)
+ }
+
+ pub(super) async fn start_trace_request(&self, handle: &Handle) {
+ while self
+ .trace_requested
+ .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
+ .is_err()
+ {
+ handle.notify_all();
+ crate::task::yield_now().await;
+ }
+ }
+
+ pub(super) fn stash_result(&self, dump: Dump) {
+ let _ = self.trace_result.lock().insert(dump);
+ self.result_ready.notify_one();
+ }
+
+ pub(super) fn take_result(&self) -> Option<Dump> {
+ self.trace_result.lock().take()
+ }
+
+ pub(super) async fn end_trace_request(&self, handle: &Handle) {
+ while self
+ .trace_requested
+ .compare_exchange(true, false, Ordering::Acquire, Ordering::Relaxed)
+ .is_err()
+ {
+ handle.notify_all();
+ crate::task::yield_now().await;
+ }
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/scheduler/multi_thread/trace_mock.rs b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/trace_mock.rs
new file mode 100644
index 0000000000..2c17a4e38b
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/trace_mock.rs
@@ -0,0 +1,11 @@
+pub(super) struct TraceStatus {}
+
+impl TraceStatus {
+ pub(super) fn new(_: usize) -> Self {
+ Self {}
+ }
+
+ pub(super) fn trace_requested(&self) -> bool {
+ false
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/scheduler/multi_thread/worker.rs b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/worker.rs
new file mode 100644
index 0000000000..6ae1146337
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -0,0 +1,1216 @@
+//! A scheduler is initialized with a fixed number of workers. Each worker is
+//! driven by a thread. Each worker has a "core" which contains data such as the
+//! run queue and other state. When `block_in_place` is called, the worker's
+//! "core" is handed off to a new thread allowing the scheduler to continue to
+//! make progress while the originating thread blocks.
+//!
+//! # Shutdown
+//!
+//! Shutting down the runtime involves the following steps:
+//!
+//! 1. The Shared::close method is called. This closes the inject queue and
+//! OwnedTasks instance and wakes up all worker threads.
+//!
+//! 2. Each worker thread observes the close signal next time it runs
+//! Core::maintenance by checking whether the inject queue is closed.
+//! The Core::is_shutdown flag is set to true.
+//!
+//! 3. The worker thread calls `pre_shutdown` in parallel. Here, the worker
+//! will keep removing tasks from OwnedTasks until it is empty. No new
+//! tasks can be pushed to the OwnedTasks during or after this step as it
+//! was closed in step 1.
+//!
+//! 4. The workers call Shared::shutdown to enter the single-threaded phase of
+//! shutdown. These calls will push their core to Shared::shutdown_cores,
+//! and the last thread to push its core will finish the shutdown procedure.
+//!
+//! 5. The local run queue of each core is emptied, then the inject queue is
+//! emptied.
+//!
+//! At this point, shutdown has completed. It is not possible for any of the
+//! collections to contain any tasks at this point, as each collection was
+//! closed first, then emptied afterwards.
+//!
+//! ## Spawns during shutdown
+//!
+//! When spawning tasks during shutdown, there are two cases:
+//!
+//! * The spawner observes the OwnedTasks being open, and the inject queue is
+//! closed.
+//! * The spawner observes the OwnedTasks being closed and doesn't check the
+//! inject queue.
+//!
+//! The first case can only happen if the OwnedTasks::bind call happens before
+//! or during step 1 of shutdown. In this case, the runtime will clean up the
+//! task in step 3 of shutdown.
+//!
+//! In the latter case, the task was not spawned and the task is immediately
+//! cancelled by the spawner.
+//!
+//! The correctness of shutdown requires both the inject queue and OwnedTasks
+//! collection to have a closed bit. With a close bit on only the inject queue,
+//! spawning could run into a situation where a task is successfully bound long
+//! after the runtime has shut down. With a close bit on only the OwnedTasks,
+//! the first spawning situation could result in the notification being pushed
+//! to the inject queue after step 5 of shutdown, which would leave a task in
+//! the inject queue indefinitely. This would be a ref-count cycle and a memory
+//! leak.
+
+use crate::loom::sync::{Arc, Mutex};
+use crate::runtime;
+use crate::runtime::context;
+use crate::runtime::scheduler::multi_thread::{
+ idle, queue, Counters, Handle, Idle, Overflow, Parker, Stats, TraceStatus, Unparker,
+};
+use crate::runtime::scheduler::{inject, Defer, Lock};
+use crate::runtime::task::OwnedTasks;
+use crate::runtime::{
+ blocking, coop, driver, scheduler, task, Config, SchedulerMetrics, WorkerMetrics,
+};
+use crate::util::atomic_cell::AtomicCell;
+use crate::util::rand::{FastRand, RngSeedGenerator};
+
+use std::cell::RefCell;
+use std::task::Waker;
+use std::time::Duration;
+
+cfg_metrics! {
+ mod metrics;
+}
+
+cfg_taskdump! {
+ mod taskdump;
+}
+
+cfg_not_taskdump! {
+ mod taskdump_mock;
+}
+
+/// A scheduler worker
+pub(super) struct Worker {
+ /// Reference to scheduler's handle
+ handle: Arc<Handle>,
+
+ /// Index holding this worker's remote state
+ index: usize,
+
+ /// Used to hand-off a worker's core to another thread.
+ core: AtomicCell<Core>,
+}
+
+/// Core data
+struct Core {
+ /// Used to schedule bookkeeping tasks every so often.
+ tick: u32,
+
+ /// When a task is scheduled from a worker, it is stored in this slot. The
+ /// worker will check this slot for a task **before** checking the run
+ /// queue. This effectively results in the **last** scheduled task being run
+ /// next (LIFO). This is an optimization for improving locality, which
+ /// benefits message passing patterns and helps to reduce latency.
+ lifo_slot: Option<Notified>,
+
+ /// When `true`, locally scheduled tasks go to the LIFO slot. When `false`,
+ /// they go to the back of the `run_queue`.
+ lifo_enabled: bool,
+
+ /// The worker-local run queue.
+ run_queue: queue::Local<Arc<Handle>>,
+
+ /// True if the worker is currently searching for more work. Searching
+ /// involves attempting to steal from other workers.
+ is_searching: bool,
+
+ /// True if the scheduler is being shutdown
+ is_shutdown: bool,
+
+ /// True if the scheduler is being traced
+ is_traced: bool,
+
+ /// Parker
+ ///
+ /// Stored in an `Option` as the parker is added / removed to make the
+ /// borrow checker happy.
+ park: Option<Parker>,
+
+ /// Per-worker runtime stats
+ stats: Stats,
+
+ /// How often to check the global queue
+ global_queue_interval: u32,
+
+ /// Fast random number generator.
+ rand: FastRand,
+}
+
+/// State shared across all workers
+pub(crate) struct Shared {
+ /// Per-worker remote state. All other workers have access to this and it is
+ /// how they communicate with each other.
+ remotes: Box<[Remote]>,
+
+ /// Global task queue used for:
+ /// 1. Submit work to the scheduler while **not** currently on a worker thread.
+ /// 2. Submit work to the scheduler when a worker run queue is saturated
+ pub(super) inject: inject::Shared<Arc<Handle>>,
+
+ /// Coordinates idle workers
+ idle: Idle,
+
+ /// Collection of all active tasks spawned onto this executor.
+ pub(super) owned: OwnedTasks<Arc<Handle>>,
+
+ /// Data synchronized by the scheduler mutex
+ pub(super) synced: Mutex<Synced>,
+
+ /// Cores that have observed the shutdown signal
+ ///
+ /// The core is **not** placed back in the worker to avoid it from being
+ /// stolen by a thread that was spawned as part of `block_in_place`.
+ #[allow(clippy::vec_box)] // we're moving an already-boxed value
+ shutdown_cores: Mutex<Vec<Box<Core>>>,
+
+ /// Coordinates the worker cores while a trace (task dump) is being captured.
+ pub(super) trace_status: TraceStatus,
+
+ /// Scheduler configuration options
+ config: Config,
+
+ /// Collects metrics from the runtime.
+ pub(super) scheduler_metrics: SchedulerMetrics,
+
+ pub(super) worker_metrics: Box<[WorkerMetrics]>,
+
+ /// Only held to trigger some code on drop. This is used to get internal
+ /// runtime metrics that can be useful when doing performance
+ /// investigations. This does nothing (empty struct, no drop impl) unless
+ /// the `tokio_internal_mt_counters` cfg flag is set.
+ _counters: Counters,
+}
+
+/// Data synchronized by the scheduler mutex
+pub(crate) struct Synced {
+ /// Synchronized state for `Idle`.
+ pub(super) idle: idle::Synced,
+
+ /// Synchronized state for `Inject`.
+ pub(crate) inject: inject::Synced,
+}
+
+/// Used to communicate with a worker from other threads.
+struct Remote {
+ /// Steals tasks from this worker.
+ pub(super) steal: queue::Steal<Arc<Handle>>,
+
+ /// Unparks the associated worker thread
+ unpark: Unparker,
+}
+
+/// Thread-local context
+pub(crate) struct Context {
+ /// Worker
+ worker: Arc<Worker>,
+
+ /// Core data
+ core: RefCell<Option<Box<Core>>>,
+
+ /// Tasks to wake after resource drivers are polled. This is mostly to
+ /// handle yielded tasks.
+ pub(crate) defer: Defer,
+}
+
+/// Starts the workers
+pub(crate) struct Launch(Vec<Arc<Worker>>);
+
+/// Running a task may consume the core. If the core is still available when
+/// running the task completes, it is returned. Otherwise, the worker will need
+/// to stop processing.
+type RunResult = Result<Box<Core>, ()>;
+
+/// A task handle
+type Task = task::Task<Arc<Handle>>;
+
+/// A notified task handle
+type Notified = task::Notified<Arc<Handle>>;
+
+/// Value picked out of thin air. Running the LIFO slot a handful of times
+/// seems sufficient to benefit from locality. More than 3 times is probably
+/// overdoing it. The value can be tuned in the future with data that shows
+/// improvements.
+const MAX_LIFO_POLLS_PER_TICK: usize = 3;
+
+pub(super) fn create(
+ size: usize,
+ park: Parker,
+ driver_handle: driver::Handle,
+ blocking_spawner: blocking::Spawner,
+ seed_generator: RngSeedGenerator,
+ config: Config,
+) -> (Arc<Handle>, Launch) {
+ let mut cores = Vec::with_capacity(size);
+ let mut remotes = Vec::with_capacity(size);
+ let mut worker_metrics = Vec::with_capacity(size);
+
+ // Create the local queues
+ for _ in 0..size {
+ let (steal, run_queue) = queue::local();
+
+ let park = park.clone();
+ let unpark = park.unpark();
+ let metrics = WorkerMetrics::from_config(&config);
+ let stats = Stats::new(&metrics);
+
+ cores.push(Box::new(Core {
+ tick: 0,
+ lifo_slot: None,
+ lifo_enabled: !config.disable_lifo_slot,
+ run_queue,
+ is_searching: false,
+ is_shutdown: false,
+ is_traced: false,
+ park: Some(park),
+ global_queue_interval: stats.tuned_global_queue_interval(&config),
+ stats,
+ rand: FastRand::from_seed(config.seed_generator.next_seed()),
+ }));
+
+ remotes.push(Remote { steal, unpark });
+ worker_metrics.push(metrics);
+ }
+
+ let (idle, idle_synced) = Idle::new(size);
+ let (inject, inject_synced) = inject::Shared::new();
+
+ let remotes_len = remotes.len();
+ let handle = Arc::new(Handle {
+ shared: Shared {
+ remotes: remotes.into_boxed_slice(),
+ inject,
+ idle,
+ owned: OwnedTasks::new(),
+ synced: Mutex::new(Synced {
+ idle: idle_synced,
+ inject: inject_synced,
+ }),
+ shutdown_cores: Mutex::new(vec![]),
+ trace_status: TraceStatus::new(remotes_len),
+ config,
+ scheduler_metrics: SchedulerMetrics::new(),
+ worker_metrics: worker_metrics.into_boxed_slice(),
+ _counters: Counters,
+ },
+ driver: driver_handle,
+ blocking_spawner,
+ seed_generator,
+ });
+
+ let mut launch = Launch(vec![]);
+
+ for (index, core) in cores.drain(..).enumerate() {
+ launch.0.push(Arc::new(Worker {
+ handle: handle.clone(),
+ index,
+ core: AtomicCell::new(Some(core)),
+ }));
+ }
+
+ (handle, launch)
+}
+
+#[track_caller]
+pub(crate) fn block_in_place<F, R>(f: F) -> R
+where
+ F: FnOnce() -> R,
+{
+ // Try to steal the worker core back
+ struct Reset {
+ take_core: bool,
+ budget: coop::Budget,
+ }
+
+ impl Drop for Reset {
+ fn drop(&mut self) {
+ with_current(|maybe_cx| {
+ if let Some(cx) = maybe_cx {
+ if self.take_core {
+ let core = cx.worker.core.take();
+ let mut cx_core = cx.core.borrow_mut();
+ assert!(cx_core.is_none());
+ *cx_core = core;
+ }
+
+ // Reset the task budget as we are re-entering the
+ // runtime.
+ coop::set(self.budget);
+ }
+ });
+ }
+ }
+
+ let mut had_entered = false;
+ let mut take_core = false;
+
+ let setup_result = with_current(|maybe_cx| {
+ match (
+ crate::runtime::context::current_enter_context(),
+ maybe_cx.is_some(),
+ ) {
+ (context::EnterRuntime::Entered { .. }, true) => {
+ // We are on a thread pool runtime thread, so we just need to
+ // set up blocking.
+ had_entered = true;
+ }
+ (
+ context::EnterRuntime::Entered {
+ allow_block_in_place,
+ },
+ false,
+ ) => {
+ // We are on an executor, but _not_ on the thread pool. That is
+ // _only_ okay if we are in a thread pool runtime's block_on
+ // method:
+ if allow_block_in_place {
+ had_entered = true;
+ return Ok(());
+ } else {
+ // This probably means we are on the current_thread runtime or in a
+ // LocalSet, where it is _not_ okay to block.
+ return Err(
+ "can call blocking only when running on the multi-threaded runtime",
+ );
+ }
+ }
+ (context::EnterRuntime::NotEntered, true) => {
+ // This is a nested call to block_in_place (we already exited).
+ // All the necessary setup has already been done.
+ return Ok(());
+ }
+ (context::EnterRuntime::NotEntered, false) => {
+ // We are outside of the tokio runtime, so blocking is fine.
+ // We can also skip all of the thread pool blocking setup steps.
+ return Ok(());
+ }
+ }
+
+ let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");
+
+ // Get the worker core. If none is set, then blocking is fine!
+ let core = match cx.core.borrow_mut().take() {
+ Some(core) => core,
+ None => return Ok(()),
+ };
+
+ // We are taking the core from the context and sending it to another
+ // thread.
+ take_core = true;
+
+ // The parker should be set here
+ assert!(core.park.is_some());
+
+ // In order to block, the core must be sent to another thread for
+ // execution.
+ //
+ // First, move the core back into the worker's shared core slot.
+ cx.worker.core.set(core);
+
+ // Next, clone the worker handle and send it to a new thread for
+ // processing.
+ //
+ // Once the blocking task is done executing, we will attempt to
+ // steal the core back.
+ let worker = cx.worker.clone();
+ runtime::spawn_blocking(move || run(worker));
+ Ok(())
+ });
+
+ if let Err(panic_message) = setup_result {
+ panic!("{}", panic_message);
+ }
+
+ if had_entered {
+ // Unset the current task's budget. Blocking sections are not
+ // constrained by task budgets.
+ let _reset = Reset {
+ take_core,
+ budget: coop::stop(),
+ };
+
+ crate::runtime::context::exit_runtime(f)
+ } else {
+ f()
+ }
+}
+
+impl Launch {
+ pub(crate) fn launch(mut self) {
+ for worker in self.0.drain(..) {
+ runtime::spawn_blocking(move || run(worker));
+ }
+ }
+}
+
+fn run(worker: Arc<Worker>) {
+ struct AbortOnPanic;
+
+ impl Drop for AbortOnPanic {
+ fn drop(&mut self) {
+ if std::thread::panicking() {
+ eprintln!("worker thread panicking; aborting process");
+ std::process::abort();
+ }
+ }
+ }
+
+ // Catching panics on worker threads in tests is quite tricky. Instead, when
+ // debug assertions are enabled, we just abort the process.
+ #[cfg(debug_assertions)]
+ let _abort_on_panic = AbortOnPanic;
+
+ // Acquire a core. If this fails, then another thread is running this
+ // worker and there is nothing further to do.
+ let core = match worker.core.take() {
+ Some(core) => core,
+ None => return,
+ };
+
+ let handle = scheduler::Handle::MultiThread(worker.handle.clone());
+
+ crate::runtime::context::enter_runtime(&handle, true, |_| {
+ // Set the worker context.
+ let cx = scheduler::Context::MultiThread(Context {
+ worker,
+ core: RefCell::new(None),
+ defer: Defer::new(),
+ });
+
+ context::set_scheduler(&cx, || {
+ let cx = cx.expect_multi_thread();
+
+ // This should always be an error. It only returns a `Result` to support
+ // using `?` to short circuit.
+ assert!(cx.run(core).is_err());
+
+ // Check if there are any deferred tasks to notify. This can happen when
+ // the worker core is lost due to `block_in_place()` being called from
+ // within the task.
+ cx.defer.wake();
+ });
+ });
+}
+
+impl Context {
+ fn run(&self, mut core: Box<Core>) -> RunResult {
+ // Reset `lifo_enabled` here in case the core was previously stolen from
+ // a task that had the LIFO slot disabled.
+ self.reset_lifo_enabled(&mut core);
+
+ // Start as "processing" tasks as polling tasks from the local queue
+ // will be one of the first things we do.
+ core.stats.start_processing_scheduled_tasks();
+
+ while !core.is_shutdown {
+ self.assert_lifo_enabled_is_correct(&core);
+
+ if core.is_traced {
+ core = self.worker.handle.trace_core(core);
+ }
+
+ // Increment the tick
+ core.tick();
+
+ // Run maintenance, if needed
+ core = self.maintenance(core);
+
+ // First, check work available to the current worker.
+ if let Some(task) = core.next_task(&self.worker) {
+ core = self.run_task(task, core)?;
+ continue;
+ }
+
+ // We consumed all work in the queues and will start searching for work.
+ core.stats.end_processing_scheduled_tasks();
+
+ // There is no more **local** work to process, try to steal work
+ // from other workers.
+ if let Some(task) = core.steal_work(&self.worker) {
+ // Found work, switch back to processing
+ core.stats.start_processing_scheduled_tasks();
+ core = self.run_task(task, core)?;
+ } else {
+ // Wait for work
+ core = if !self.defer.is_empty() {
+ self.park_timeout(core, Some(Duration::from_millis(0)))
+ } else {
+ self.park(core)
+ };
+ }
+ }
+
+ core.pre_shutdown(&self.worker);
+
+ // Signal shutdown
+ self.worker.handle.shutdown_core(core);
+ Err(())
+ }
+
+ fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
+ let task = self.worker.handle.shared.owned.assert_owner(task);
+
+ // Make sure the worker is not in the **searching** state. This enables
+ // another idle worker to try to steal work.
+ core.transition_from_searching(&self.worker);
+
+ self.assert_lifo_enabled_is_correct(&core);
+
+ // Measure the poll start time. Note that we may end up polling other
+ // tasks under this measurement. In this case, the tasks came from the
+ // LIFO slot and are considered part of the current task for scheduling
+ // purposes. These tasks inherit the "parent" task's limits.
+ core.stats.start_poll();
+
+ // Make the core available to the runtime context
+ *self.core.borrow_mut() = Some(core);
+
+ // Run the task
+ coop::budget(|| {
+ task.run();
+ let mut lifo_polls = 0;
+
+ // As long as there is budget remaining and a task exists in the
+ // `lifo_slot`, then keep running.
+ loop {
+ // Check if we still have the core. If not, the core was stolen
+ // by another worker.
+ let mut core = match self.core.borrow_mut().take() {
+ Some(core) => core,
+ None => {
+ // In this case, we cannot call `reset_lifo_enabled()`
+ // because the core was stolen. The stealer will handle
+ // that at the top of `Context::run`
+ return Err(());
+ }
+ };
+
+ // Check for a task in the LIFO slot
+ let task = match core.lifo_slot.take() {
+ Some(task) => task,
+ None => {
+ self.reset_lifo_enabled(&mut core);
+ core.stats.end_poll();
+ return Ok(core);
+ }
+ };
+
+ if !coop::has_budget_remaining() {
+ core.stats.end_poll();
+
+ // Not enough budget left to run the LIFO task, push it to
+ // the back of the queue and return.
+ core.run_queue.push_back_or_overflow(
+ task,
+ &*self.worker.handle,
+ &mut core.stats,
+ );
+ // If we hit this point, the LIFO slot should be enabled.
+ // There is no need to reset it.
+ debug_assert!(core.lifo_enabled);
+ return Ok(core);
+ }
+
+ // Track that we are about to run a task from the LIFO slot.
+ lifo_polls += 1;
+ super::counters::inc_lifo_schedules();
+
+ // Disable the LIFO slot if we reach our limit
+ //
+ // In ping-pong style workloads where task A notifies task B,
+ // which notifies task A again, continuously prioritizing the
+ // LIFO slot can cause starvation as these two tasks will
+ // repeatedly schedule the other. To mitigate this, we limit the
+ // number of times the LIFO slot is prioritized.
+ if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
+ core.lifo_enabled = false;
+ super::counters::inc_lifo_capped();
+ }
+
+ // Run the LIFO task, then loop
+ *self.core.borrow_mut() = Some(core);
+ let task = self.worker.handle.shared.owned.assert_owner(task);
+ task.run();
+ }
+ })
+ }
+
+ fn reset_lifo_enabled(&self, core: &mut Core) {
+ core.lifo_enabled = !self.worker.handle.shared.config.disable_lifo_slot;
+ }
+
+ fn assert_lifo_enabled_is_correct(&self, core: &Core) {
+ debug_assert_eq!(
+ core.lifo_enabled,
+ !self.worker.handle.shared.config.disable_lifo_slot
+ );
+ }
+
+ fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
+ if core.tick % self.worker.handle.shared.config.event_interval == 0 {
+ super::counters::inc_num_maintenance();
+
+ core.stats.end_processing_scheduled_tasks();
+
+ // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
+ // to run without actually putting the thread to sleep.
+ core = self.park_timeout(core, Some(Duration::from_millis(0)));
+
+ // Run regularly scheduled maintenance
+ core.maintenance(&self.worker);
+
+ core.stats.start_processing_scheduled_tasks();
+ }
+
+ core
+ }
+
+ /// Parks the worker thread while waiting for tasks to execute.
+ ///
+ /// This function checks that there is indeed no more work left to be done before parking.
+ /// Note that, before parking, the worker thread will try to take
+ /// ownership of the Driver (IO/Time) and dispatch any events that might have fired.
+ /// Whenever a worker thread executes the Driver loop, all woken tasks are scheduled
+ /// in its own local queue until the queue saturates (ntasks > LOCAL_QUEUE_CAPACITY).
+ /// When the local queue is saturated, the overflow tasks are added to the injection queue,
+ /// from where other workers can pick them up.
+ /// We also rely on the work-stealing algorithm to spread the tasks amongst workers
+ /// after all the I/O events get dispatched.
+ fn park(&self, mut core: Box<Core>) -> Box<Core> {
+ if let Some(f) = &self.worker.handle.shared.config.before_park {
+ f();
+ }
+
+ if core.transition_to_parked(&self.worker) {
+ while !core.is_shutdown && !core.is_traced {
+ core.stats.about_to_park();
+ core = self.park_timeout(core, None);
+
+ // Run regularly scheduled maintenance
+ core.maintenance(&self.worker);
+
+ if core.transition_from_parked(&self.worker) {
+ break;
+ }
+ }
+ }
+
+ if let Some(f) = &self.worker.handle.shared.config.after_unpark {
+ f();
+ }
+ core
+ }
+
+ fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
+ self.assert_lifo_enabled_is_correct(&core);
+
+ // Take the parker out of core
+ let mut park = core.park.take().expect("park missing");
+
+ // Store `core` in context
+ *self.core.borrow_mut() = Some(core);
+
+ // Park thread
+ if let Some(timeout) = duration {
+ park.park_timeout(&self.worker.handle.driver, timeout);
+ } else {
+ park.park(&self.worker.handle.driver);
+ }
+
+ self.defer.wake();
+
+ // Remove `core` from context
+ core = self.core.borrow_mut().take().expect("core missing");
+
+ // Place `park` back in `core`
+ core.park = Some(park);
+
+ if core.should_notify_others() {
+ self.worker.handle.notify_parked_local();
+ }
+
+ core
+ }
+
+ pub(crate) fn defer(&self, waker: &Waker) {
+ self.defer.defer(waker);
+ }
+}
+
+impl Core {
+ /// Increment the tick
+ fn tick(&mut self) {
+ self.tick = self.tick.wrapping_add(1);
+ }
+
+ /// Return the next notified task available to this worker.
+ fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
+ if self.tick % self.global_queue_interval == 0 {
+ // Update the global queue interval, if needed
+ self.tune_global_queue_interval(worker);
+
+ worker
+ .handle
+ .next_remote_task()
+ .or_else(|| self.next_local_task())
+ } else {
+ let maybe_task = self.next_local_task();
+
+ if maybe_task.is_some() {
+ return maybe_task;
+ }
+
+ if worker.inject().is_empty() {
+ return None;
+ }
+
+ // Other threads can only **remove** tasks from the current worker's
+ // `run_queue`. So, we can be confident that by the time we call
+ // `run_queue.push_back` below, there will be *at least* `cap`
+ // available slots in the queue.
+ let cap = usize::min(
+ self.run_queue.remaining_slots(),
+ self.run_queue.max_capacity() / 2,
+ );
+
+ // The worker is currently idle, pull a batch of work from the
+ // injection queue. We don't want to pull *all* the work so other
+ // workers can also get some.
+ let n = usize::min(
+ worker.inject().len() / worker.handle.shared.remotes.len() + 1,
+ cap,
+ );
+
+ let mut synced = worker.handle.shared.synced.lock();
+ // safety: passing in the correct `inject::Synced`.
+ let mut tasks = unsafe { worker.inject().pop_n(&mut synced.inject, n) };
+
+ // Pop the first task to return immediately.
+ let ret = tasks.next();
+
+ // Push the rest onto the run queue.
+ self.run_queue.push_back(tasks);
+
+ ret
+ }
+ }
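+
+ // Example of the batch size computed above (illustrative numbers): with 4
+ // workers and 10 tasks in the inject queue, n = min(10 / 4 + 1, cap) =
+ // min(3, cap), so an idle worker pulls at most 3 tasks and leaves the rest
+ // for the other workers.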
+
+ fn next_local_task(&mut self) -> Option<Notified> {
+ self.lifo_slot.take().or_else(|| self.run_queue.pop())
+ }
+
+ /// Function responsible for stealing tasks from another worker
+ ///
+ /// Note: a worker only starts searching (and stealing) if fewer than half of
+ /// the workers are already searching. The idea is to make sure not all
+ /// workers will be trying to steal at the same time.
+ fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
+ if !self.transition_to_searching(worker) {
+ return None;
+ }
+
+ let num = worker.handle.shared.remotes.len();
+ // Start from a random worker
+ let start = self.rand.fastrand_n(num as u32) as usize;
+
+ for i in 0..num {
+ let i = (start + i) % num;
+
+ // Don't steal from ourself! We know we don't have work.
+ if i == worker.index {
+ continue;
+ }
+
+ let target = &worker.handle.shared.remotes[i];
+ if let Some(task) = target
+ .steal
+ .steal_into(&mut self.run_queue, &mut self.stats)
+ {
+ return Some(task);
+ }
+ }
+
+ // Fallback on checking the global queue
+ worker.handle.next_remote_task()
+ }
+
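+ /// Attempts to move this worker into the "searching" state. Returns `true`
+ /// if the worker is now searching.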
+ fn transition_to_searching(&mut self, worker: &Worker) -> bool {
+ if !self.is_searching {
+ self.is_searching = worker.handle.shared.idle.transition_worker_to_searching();
+ }
+
+ self.is_searching
+ }
+
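+ /// Clears this worker's "searching" state and informs the scheduler, which
+ /// may notify another worker if this was the last searcher.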
+ fn transition_from_searching(&mut self, worker: &Worker) {
+ if !self.is_searching {
+ return;
+ }
+
+ self.is_searching = false;
+ worker.handle.transition_worker_from_searching();
+ }
+
+ fn has_tasks(&self) -> bool {
+ self.lifo_slot.is_some() || self.run_queue.has_tasks()
+ }
+
+ fn should_notify_others(&self) -> bool {
+ // If there are tasks available to steal, but this worker is not
+ // looking for tasks to steal, notify another worker.
+ if self.is_searching {
+ return false;
+ }
+ self.lifo_slot.is_some() as usize + self.run_queue.len() > 1
+ }
+
+ /// Prepares the worker state for parking.
+ ///
+ /// Returns true if the transition happened, false if there is work to do first.
+ fn transition_to_parked(&mut self, worker: &Worker) -> bool {
+ // Workers should not park if they have work to do
+ if self.has_tasks() || self.is_traced {
+ return false;
+ }
+
+ // When the final worker transitions **out** of searching to parked, it
+ // must check all the queues one last time in case work materialized
+ // between the last work scan and transitioning out of searching.
+ let is_last_searcher = worker.handle.shared.idle.transition_worker_to_parked(
+ &worker.handle.shared,
+ worker.index,
+ self.is_searching,
+ );
+
+ // The worker is no longer searching. Setting this only updates the
+ // local cache.
+ self.is_searching = false;
+
+ if is_last_searcher {
+ worker.handle.notify_if_work_pending();
+ }
+
+ true
+ }
+
+ /// Returns `true` if the transition happened.
+ fn transition_from_parked(&mut self, worker: &Worker) -> bool {
+ // If a task is in the LIFO slot or the run queue, then we must unpark
+ // regardless of whether we were notified.
+ if self.has_tasks() {
+ // When a worker wakes, it should only transition to the "searching"
+ // state when the wake originates from another worker *or* a new task
+ // is pushed. We do *not* want the worker to transition to "searching"
+ // when it wakes when the I/O driver receives new events.
+ self.is_searching = !worker
+ .handle
+ .shared
+ .idle
+ .unpark_worker_by_id(&worker.handle.shared, worker.index);
+ return true;
+ }
+
+ if worker
+ .handle
+ .shared
+ .idle
+ .is_parked(&worker.handle.shared, worker.index)
+ {
+ return false;
+ }
+
+ // When unparked, the worker is in the searching state.
+ self.is_searching = true;
+ true
+ }
+
+ /// Runs maintenance work such as checking the pool's state.
+ fn maintenance(&mut self, worker: &Worker) {
+ self.stats
+ .submit(&worker.handle.shared.worker_metrics[worker.index]);
+
+ if !self.is_shutdown {
+ // Check if the scheduler has been shutdown
+ let synced = worker.handle.shared.synced.lock();
+ self.is_shutdown = worker.inject().is_closed(&synced.inject);
+ }
+
+ if !self.is_traced {
+ // Check if the worker should be tracing.
+ self.is_traced = worker.handle.shared.trace_status.trace_requested();
+ }
+ }
+
+ /// Signals all tasks to shut down, and waits for them to complete. Must run
+ /// before we enter the single-threaded phase of shutdown processing.
+ fn pre_shutdown(&mut self, worker: &Worker) {
+ // Signal to all tasks to shut down.
+ worker.handle.shared.owned.close_and_shutdown_all();
+
+ self.stats
+ .submit(&worker.handle.shared.worker_metrics[worker.index]);
+ }
+
+ /// Shuts down the core.
+ fn shutdown(&mut self, handle: &Handle) {
+ // Take the parker out of the core
+ let mut park = self.park.take().expect("park missing");
+
+ // Drain the queue
+ while self.next_local_task().is_some() {}
+
+ park.shutdown(&handle.driver);
+ }
+
+ fn tune_global_queue_interval(&mut self, worker: &Worker) {
+ let next = self
+ .stats
+ .tuned_global_queue_interval(&worker.handle.shared.config);
+
+ debug_assert!(next > 1);
+
+ // Smooth out jitter
+ if abs_diff(self.global_queue_interval, next) > 2 {
+ self.global_queue_interval = next;
+ }
+ }
+}
+
+impl Worker {
+ /// Returns a reference to the scheduler's injection queue.
+ fn inject(&self) -> &inject::Shared<Arc<Handle>> {
+ &self.handle.shared.inject
+ }
+}
+
+// TODO: Move `Handle` impls into handle.rs
+impl task::Schedule for Arc<Handle> {
+ fn release(&self, task: &Task) -> Option<Task> {
+ self.shared.owned.remove(task)
+ }
+
+ fn schedule(&self, task: Notified) {
+ self.schedule_task(task, false);
+ }
+
+ fn yield_now(&self, task: Notified) {
+ self.schedule_task(task, true);
+ }
+}
+
+impl Handle {
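+ /// Schedules a task for execution. If the caller is on a worker thread of
+ /// this scheduler and still holds its core, the task is scheduled locally;
+ /// otherwise it is pushed onto the injection queue and a parked worker is
+ /// notified.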
+ pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) {
+ with_current(|maybe_cx| {
+ if let Some(cx) = maybe_cx {
+ // Make sure the task is part of the **current** scheduler.
+ if self.ptr_eq(&cx.worker.handle) {
+ // And the current thread still holds a core
+ if let Some(core) = cx.core.borrow_mut().as_mut() {
+ self.schedule_local(core, task, is_yield);
+ return;
+ }
+ }
+ }
+
+ // Otherwise, use the inject queue.
+ self.push_remote_task(task);
+ self.notify_parked_remote();
+ })
+ }
+
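+ /// Schedules a task onto this worker's core. Yields (and tasks scheduled
+ /// while the LIFO slot is disabled) always go to the back of the run queue;
+ /// otherwise the task takes the LIFO slot, displacing any previous occupant
+ /// onto the run queue.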
+ fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
+ core.stats.inc_local_schedule_count();
+
+ // Spawning from the worker thread. If scheduling a "yield" then the
+ // task must always be pushed to the back of the queue, enabling other
+ // tasks to be executed. If **not** a yield, then there is more
+ // flexibility and the task may go to the front of the queue.
+ let should_notify = if is_yield || !core.lifo_enabled {
+ core.run_queue
+ .push_back_or_overflow(task, self, &mut core.stats);
+ true
+ } else {
+ // Push to the LIFO slot
+ let prev = core.lifo_slot.take();
+ let ret = prev.is_some();
+
+ if let Some(prev) = prev {
+ core.run_queue
+ .push_back_or_overflow(prev, self, &mut core.stats);
+ }
+
+ core.lifo_slot = Some(task);
+
+ ret
+ };
+
+ // Only notify if not currently parked. If `park` is `None`, then the
+ // scheduling is from a resource driver. As notifications often come in
+ // batches, the notification is delayed until the park is complete.
+ if should_notify && core.park.is_some() {
+ self.notify_parked_local();
+ }
+ }
+
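+ /// Pops a task from the injection (global) queue, if one is available.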
+ fn next_remote_task(&self) -> Option<Notified> {
+ if self.shared.inject.is_empty() {
+ return None;
+ }
+
+ let mut synced = self.shared.synced.lock();
+ // safety: passing in the correct `inject::Synced`.
+ unsafe { self.shared.inject.pop(&mut synced.inject) }
+ }
+
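+ /// Pushes a task onto the injection (global) queue and records the remote
+ /// schedule in the scheduler metrics.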
+ fn push_remote_task(&self, task: Notified) {
+ self.shared.scheduler_metrics.inc_remote_schedule_count();
+
+ let mut synced = self.shared.synced.lock();
+ // safety: passing in the correct `inject::Synced`.
+ unsafe {
+ self.shared.inject.push(&mut synced.inject, task);
+ }
+ }
+
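+ /// Closes the injection queue and, if this call performed the close,
+ /// unparks all workers so they can observe the shutdown.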
+ pub(super) fn close(&self) {
+ if self
+ .shared
+ .inject
+ .close(&mut self.shared.synced.lock().inject)
+ {
+ self.notify_all();
+ }
+ }
+
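+ /// Unparks one idle worker, if any, to pick up newly available work.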
+ fn notify_parked_local(&self) {
+ super::counters::inc_num_inc_notify_local();
+
+ if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) {
+ super::counters::inc_num_unparks_local();
+ self.shared.remotes[index].unpark.unpark(&self.driver);
+ }
+ }
+
+ fn notify_parked_remote(&self) {
+ if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) {
+ self.shared.remotes[index].unpark.unpark(&self.driver);
+ }
+ }
+
+ pub(super) fn notify_all(&self) {
+ for remote in &self.shared.remotes[..] {
+ remote.unpark.unpark(&self.driver);
+ }
+ }
+
+ fn notify_if_work_pending(&self) {
+ for remote in &self.shared.remotes[..] {
+ if !remote.steal.is_empty() {
+ self.notify_parked_local();
+ return;
+ }
+ }
+
+ if !self.shared.inject.is_empty() {
+ self.notify_parked_local();
+ }
+ }
+
+ fn transition_worker_from_searching(&self) {
+ if self.shared.idle.transition_worker_from_searching() {
+ // We are the final searching worker. Because work was found, we
+ // need to notify another worker.
+ self.notify_parked_local();
+ }
+ }
+
+ /// Signals that a worker has observed the shutdown signal and has replaced
+ /// its core back into its handle.
+ ///
+ /// If all workers have reached this point, the final cleanup is performed.
+ fn shutdown_core(&self, core: Box<Core>) {
+ let mut cores = self.shared.shutdown_cores.lock();
+ cores.push(core);
+
+ if cores.len() != self.shared.remotes.len() {
+ return;
+ }
+
+ debug_assert!(self.shared.owned.is_empty());
+
+ for mut core in cores.drain(..) {
+ core.shutdown(self);
+ }
+
+ // Drain the injection queue
+ //
+ // We already shut down every task, so we can simply drop the tasks.
+ while let Some(task) = self.next_remote_task() {
+ drop(task);
+ }
+ }
+
+ fn ptr_eq(&self, other: &Handle) -> bool {
+ std::ptr::eq(self, other)
+ }
+}
+
+impl Overflow<Arc<Handle>> for Handle {
+ fn push(&self, task: task::Notified<Arc<Handle>>) {
+ self.push_remote_task(task);
+ }
+
+ fn push_batch<I>(&self, iter: I)
+ where
+ I: Iterator<Item = task::Notified<Arc<Handle>>>,
+ {
+ unsafe {
+ self.shared.inject.push_batch(self, iter);
+ }
+ }
+}
+
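+/// Guard that holds the scheduler's `Synced` lock and exposes mutable access
+/// to the injection queue's synchronized state.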
+pub(crate) struct InjectGuard<'a> {
+ lock: crate::loom::sync::MutexGuard<'a, Synced>,
+}
+
+impl<'a> AsMut<inject::Synced> for InjectGuard<'a> {
+ fn as_mut(&mut self) -> &mut inject::Synced {
+ &mut self.lock.inject
+ }
+}
+
+impl<'a> Lock<inject::Synced> for &'a Handle {
+ type Handle = InjectGuard<'a>;
+
+ fn lock(self) -> Self::Handle {
+ InjectGuard {
+ lock: self.shared.synced.lock(),
+ }
+ }
+}
+
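+/// Invokes `f` with the current thread's multi-thread scheduler context, or
+/// with `None` if the thread is not running inside this scheduler.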
+#[track_caller]
+fn with_current<R>(f: impl FnOnce(Option<&Context>) -> R) -> R {
+ use scheduler::Context::MultiThread;
+
+ context::with_scheduler(|ctx| match ctx {
+ Some(MultiThread(ctx)) => f(Some(ctx)),
+ _ => f(None),
+ })
+}
+
+// `u32::abs_diff` is not available on Tokio's MSRV.
+fn abs_diff(a: u32, b: u32) -> u32 {
+ if a > b {
+ a - b
+ } else {
+ b - a
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/scheduler/multi_thread/worker/metrics.rs b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/worker/metrics.rs
new file mode 100644
index 0000000000..a9a5ab3ed6
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/worker/metrics.rs
@@ -0,0 +1,11 @@
+use super::Shared;
+
+impl Shared {
+ pub(crate) fn injection_queue_depth(&self) -> usize {
+ self.inject.len()
+ }
+
+ pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
+ self.remotes[worker].steal.len()
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/scheduler/multi_thread/worker/taskdump.rs b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/worker/taskdump.rs
new file mode 100644
index 0000000000..d310d9f6d3
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/worker/taskdump.rs
@@ -0,0 +1,79 @@
+use super::{Core, Handle, Shared};
+
+use crate::loom::sync::Arc;
+use crate::runtime::scheduler::multi_thread::Stats;
+use crate::runtime::task::trace::trace_multi_thread;
+use crate::runtime::{dump, WorkerMetrics};
+
+use std::time::Duration;
+
+impl Handle {
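+ /// Participates in a requested task dump. Workers rendezvous on a barrier
+ /// (with a generous timeout); the leader traces all tasks and stashes the
+ /// resulting dump while the other workers wait for tracing to finish. The
+ /// core is always returned to the caller.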
+ pub(super) fn trace_core(&self, mut core: Box<Core>) -> Box<Core> {
+ core.is_traced = false;
+
+ if core.is_shutdown {
+ return core;
+ }
+
+ // wait for other workers, or timeout without tracing
+ let timeout = Duration::from_millis(250); // a _very_ generous timeout
+ let barrier =
+ if let Some(barrier) = self.shared.trace_status.trace_start.wait_timeout(timeout) {
+ barrier
+ } else {
+ // don't attempt to trace
+ return core;
+ };
+
+ if !barrier.is_leader() {
+ // wait for leader to finish tracing
+ self.shared.trace_status.trace_end.wait();
+ return core;
+ }
+
+ // trace
+
+ let owned = &self.shared.owned;
+ let mut local = self.shared.steal_all();
+ let synced = &self.shared.synced;
+ let injection = &self.shared.inject;
+
+ // safety: `trace_multi_thread` is invoked with the same `synced` that `injection`
+ // was created with.
+ let traces = unsafe { trace_multi_thread(owned, &mut local, synced, injection) }
+ .into_iter()
+ .map(dump::Task::new)
+ .collect();
+
+ let result = dump::Dump::new(traces);
+
+ // stash the result
+ self.shared.trace_status.stash_result(result);
+
+ // allow other workers to proceed
+ self.shared.trace_status.trace_end.wait();
+
+ core
+ }
+}
+
+impl Shared {
+ /// Steal all tasks from remotes into a single local queue.
+ pub(super) fn steal_all(&self) -> super::queue::Local<Arc<Handle>> {
+ let (_steal, mut local) = super::queue::local();
+
+ let worker_metrics = WorkerMetrics::new();
+ let mut stats = Stats::new(&worker_metrics);
+
+ for remote in self.remotes.iter() {
+ let steal = &remote.steal;
+ while !steal.is_empty() {
+ if let Some(task) = steal.steal_into(&mut local, &mut stats) {
+ local.push_back([task].into_iter());
+ }
+ }
+ }
+
+ local
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/scheduler/multi_thread/worker/taskdump_mock.rs b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/worker/taskdump_mock.rs
new file mode 100644
index 0000000000..24c5600ce2
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/scheduler/multi_thread/worker/taskdump_mock.rs
@@ -0,0 +1,7 @@
+use super::{Core, Handle};
+
+impl Handle {
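+ /// No-op stand-in for task-dump tracing, used when task dumps are not
+ /// supported; returns the core unchanged.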
+ pub(super) fn trace_core(&self, core: Box<Core>) -> Box<Core> {
+ core
+ }
+}