From dc0db358abe19481e475e10c32149b53370f1a1c Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Thu, 30 May 2024 05:57:31 +0200 Subject: Merging upstream version 1.72.1+dfsg1. Signed-off-by: Daniel Baumann --- vendor/tokio/src/runtime/context.rs | 218 +++++++++++++++++++++++++++--------- 1 file changed, 168 insertions(+), 50 deletions(-) (limited to 'vendor/tokio/src/runtime/context.rs') diff --git a/vendor/tokio/src/runtime/context.rs b/vendor/tokio/src/runtime/context.rs index a727ed497..5943e9aa9 100644 --- a/vendor/tokio/src/runtime/context.rs +++ b/vendor/tokio/src/runtime/context.rs @@ -1,73 +1,191 @@ -//! Thread local runtime context -use crate::runtime::Handle; +use crate::loom::thread::AccessError; +use crate::runtime::coop; -use std::cell::RefCell; +use std::cell::Cell; -thread_local! { - static CONTEXT: RefCell> = RefCell::new(None) -} +#[cfg(any(feature = "rt", feature = "macros"))] +use crate::util::rand::FastRand; -pub(crate) fn current() -> Option { - CONTEXT.with(|ctx| ctx.borrow().clone()) -} +cfg_rt! { + mod blocking; + pub(crate) use blocking::{disallow_block_in_place, try_enter_blocking_region, BlockingRegionGuard}; -cfg_io_driver! { - pub(crate) fn io_handle() -> crate::runtime::driver::IoHandle { - CONTEXT.with(|ctx| { - let ctx = ctx.borrow(); - ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).io_handle.clone() - }) + mod current; + pub(crate) use current::{with_current, try_set_current, SetCurrentGuard}; + + mod runtime; + pub(crate) use runtime::{EnterRuntime, enter_runtime}; + + mod scoped; + use scoped::Scoped; + + use crate::runtime::{scheduler, task::Id}; + + use std::task::Waker; + + cfg_taskdump! { + use crate::runtime::task::trace; } } -cfg_signal_internal! { - #[cfg(unix)] - pub(crate) fn signal_handle() -> crate::runtime::driver::SignalHandle { - CONTEXT.with(|ctx| { - let ctx = ctx.borrow(); - ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).signal_handle.clone() - }) - } +cfg_rt_multi_thread! { + mod runtime_mt; + pub(crate) use runtime_mt::{current_enter_context, exit_runtime}; } -cfg_time! { - pub(crate) fn time_handle() -> crate::runtime::driver::TimeHandle { - CONTEXT.with(|ctx| { - let ctx = ctx.borrow(); - ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).time_handle.clone() - }) - } +struct Context { + /// Uniquely identifies the current thread + #[cfg(feature = "rt")] + thread_id: Cell>, - cfg_test_util! { - pub(crate) fn clock() -> Option { - CONTEXT.with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.clock.clone())) - } - } + /// Handle to the runtime scheduler running on the current thread. + #[cfg(feature = "rt")] + current: current::HandleCell, + + /// Handle to the scheduler's internal "context" + #[cfg(feature = "rt")] + scheduler: Scoped, + + #[cfg(feature = "rt")] + current_task_id: Cell>, + + /// Tracks if the current thread is currently driving a runtime. + /// Note, that if this is set to "entered", the current scheduler + /// handle may not reference the runtime currently executing. This + /// is because other runtime handles may be set to current from + /// within a runtime. + #[cfg(feature = "rt")] + runtime: Cell, + + #[cfg(any(feature = "rt", feature = "macros"))] + rng: Cell>, + + /// Tracks the amount of "work" a task may still do before yielding back to + /// the sheduler + budget: Cell, + + #[cfg(all( + tokio_unstable, + tokio_taskdump, + feature = "rt", + target_os = "linux", + any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64") + ))] + trace: trace::Context, } -cfg_rt! { - pub(crate) fn spawn_handle() -> Option { - CONTEXT.with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.spawner.clone())) +tokio_thread_local! { + static CONTEXT: Context = const { + Context { + #[cfg(feature = "rt")] + thread_id: Cell::new(None), + + /// Tracks the current runtime handle to use when spawning, + /// accessing drivers, etc... + #[cfg(feature = "rt")] + current: current::HandleCell::new(), + + /// Tracks the current scheduler internal context + #[cfg(feature = "rt")] + scheduler: Scoped::new(), + + #[cfg(feature = "rt")] + current_task_id: Cell::new(None), + + /// Tracks if the current thread is currently driving a runtime. + /// Note, that if this is set to "entered", the current scheduler + /// handle may not reference the runtime currently executing. This + /// is because other runtime handles may be set to current from + /// within a runtime. + #[cfg(feature = "rt")] + runtime: Cell::new(EnterRuntime::NotEntered), + + #[cfg(any(feature = "rt", feature = "macros"))] + rng: Cell::new(None), + + budget: Cell::new(coop::Budget::unconstrained()), + + #[cfg(all( + tokio_unstable, + tokio_taskdump, + feature = "rt", + target_os = "linux", + any( + target_arch = "aarch64", + target_arch = "x86", + target_arch = "x86_64" + ) + ))] + trace: trace::Context::new(), + } } } -/// Set this [`Handle`] as the current active [`Handle`]. -/// -/// [`Handle`]: Handle -pub(crate) fn enter(new: Handle) -> EnterGuard { +#[cfg(any(feature = "macros", all(feature = "sync", feature = "rt")))] +pub(crate) fn thread_rng_n(n: u32) -> u32 { CONTEXT.with(|ctx| { - let old = ctx.borrow_mut().replace(new); - EnterGuard(old) + let mut rng = ctx.rng.get().unwrap_or_else(FastRand::new); + let ret = rng.fastrand_n(n); + ctx.rng.set(Some(rng)); + ret }) } -#[derive(Debug)] -pub(crate) struct EnterGuard(Option); +pub(super) fn budget(f: impl FnOnce(&Cell) -> R) -> Result { + CONTEXT.try_with(|ctx| f(&ctx.budget)) +} + +cfg_rt! { + use crate::runtime::ThreadId; + + pub(crate) fn thread_id() -> Result { + CONTEXT.try_with(|ctx| { + match ctx.thread_id.get() { + Some(id) => id, + None => { + let id = ThreadId::next(); + ctx.thread_id.set(Some(id)); + id + } + } + }) + } -impl Drop for EnterGuard { - fn drop(&mut self) { - CONTEXT.with(|ctx| { - *ctx.borrow_mut() = self.0.take(); + pub(crate) fn set_current_task_id(id: Option) -> Option { + CONTEXT.try_with(|ctx| ctx.current_task_id.replace(id)).unwrap_or(None) + } + + pub(crate) fn current_task_id() -> Option { + CONTEXT.try_with(|ctx| ctx.current_task_id.get()).unwrap_or(None) + } + + #[track_caller] + pub(crate) fn defer(waker: &Waker) { + with_scheduler(|maybe_scheduler| { + if let Some(scheduler) = maybe_scheduler { + scheduler.defer(waker); + } else { + // Called from outside of the runtime, immediately wake the + // task. + waker.wake_by_ref(); + } }); } + + pub(super) fn set_scheduler(v: &scheduler::Context, f: impl FnOnce() -> R) -> R { + CONTEXT.with(|c| c.scheduler.set(v, f)) + } + + #[track_caller] + pub(super) fn with_scheduler(f: impl FnOnce(Option<&scheduler::Context>) -> R) -> R { + CONTEXT.with(|c| c.scheduler.with(f)) + } + + cfg_taskdump! { + /// SAFETY: Callers of this function must ensure that trace frames always + /// form a valid linked list. + pub(crate) unsafe fn with_trace(f: impl FnOnce(&trace::Context) -> R) -> Option { + CONTEXT.try_with(|c| f(&c.trace)).ok() + } + } } -- cgit v1.2.3