diff options
Diffstat (limited to 'third_party/rust/tokio/src/runtime/basic_scheduler.rs')
-rw-r--r-- | third_party/rust/tokio/src/runtime/basic_scheduler.rs | 574 |
1 files changed, 574 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/runtime/basic_scheduler.rs b/third_party/rust/tokio/src/runtime/basic_scheduler.rs new file mode 100644 index 0000000000..401f55b3f2 --- /dev/null +++ b/third_party/rust/tokio/src/runtime/basic_scheduler.rs @@ -0,0 +1,574 @@ +use crate::future::poll_fn; +use crate::loom::sync::atomic::AtomicBool; +use crate::loom::sync::{Arc, Mutex}; +use crate::park::{Park, Unpark}; +use crate::runtime::context::EnterGuard; +use crate::runtime::driver::Driver; +use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task}; +use crate::runtime::Callback; +use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics}; +use crate::sync::notify::Notify; +use crate::util::atomic_cell::AtomicCell; +use crate::util::{waker_ref, Wake, WakerRef}; + +use std::cell::RefCell; +use std::collections::VecDeque; +use std::fmt; +use std::future::Future; +use std::sync::atomic::Ordering::{AcqRel, Release}; +use std::task::Poll::{Pending, Ready}; +use std::time::Duration; + +/// Executes tasks on the current thread +pub(crate) struct BasicScheduler { + /// Core scheduler data is acquired by a thread entering `block_on`. + core: AtomicCell<Core>, + + /// Notifier for waking up other threads to steal the + /// driver. + notify: Notify, + + /// Sendable task spawner + spawner: Spawner, + + /// This is usually None, but right before dropping the BasicScheduler, it + /// is changed to `Some` with the context being the runtime's own context. + /// This ensures that any tasks dropped in the `BasicScheduler`s destructor + /// run in that runtime's context. + context_guard: Option<EnterGuard>, +} + +/// Data required for executing the scheduler. The struct is passed around to +/// a function that will perform the scheduling work and acts as a capability token. +struct Core { + /// Scheduler run queue + tasks: VecDeque<task::Notified<Arc<Shared>>>, + + /// Sendable task spawner + spawner: Spawner, + + /// Current tick + tick: u8, + + /// Runtime driver + /// + /// The driver is removed before starting to park the thread + driver: Option<Driver>, + + /// Metrics batch + metrics: MetricsBatch, +} + +#[derive(Clone)] +pub(crate) struct Spawner { + shared: Arc<Shared>, +} + +/// Scheduler state shared between threads. +struct Shared { + /// Remote run queue. None if the `Runtime` has been dropped. + queue: Mutex<Option<VecDeque<task::Notified<Arc<Shared>>>>>, + + /// Collection of all active tasks spawned onto this executor. + owned: OwnedTasks<Arc<Shared>>, + + /// Unpark the blocked thread. + unpark: <Driver as Park>::Unpark, + + /// Indicates whether the blocked on thread was woken. + woken: AtomicBool, + + /// Callback for a worker parking itself + before_park: Option<Callback>, + + /// Callback for a worker unparking itself + after_unpark: Option<Callback>, + + /// Keeps track of various runtime metrics. + scheduler_metrics: SchedulerMetrics, + + /// This scheduler only has one worker. + worker_metrics: WorkerMetrics, +} + +/// Thread-local context. +struct Context { + /// Handle to the spawner + spawner: Spawner, + + /// Scheduler core, enabling the holder of `Context` to execute the + /// scheduler. + core: RefCell<Option<Box<Core>>>, +} + +/// Initial queue capacity. +const INITIAL_CAPACITY: usize = 64; + +/// Max number of tasks to poll per tick. +#[cfg(loom)] +const MAX_TASKS_PER_TICK: usize = 4; +#[cfg(not(loom))] +const MAX_TASKS_PER_TICK: usize = 61; + +/// How often to check the remote queue first. +const REMOTE_FIRST_INTERVAL: u8 = 31; + +// Tracks the current BasicScheduler. +scoped_thread_local!(static CURRENT: Context); + +impl BasicScheduler { + pub(crate) fn new( + driver: Driver, + before_park: Option<Callback>, + after_unpark: Option<Callback>, + ) -> BasicScheduler { + let unpark = driver.unpark(); + + let spawner = Spawner { + shared: Arc::new(Shared { + queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))), + owned: OwnedTasks::new(), + unpark, + woken: AtomicBool::new(false), + before_park, + after_unpark, + scheduler_metrics: SchedulerMetrics::new(), + worker_metrics: WorkerMetrics::new(), + }), + }; + + let core = AtomicCell::new(Some(Box::new(Core { + tasks: VecDeque::with_capacity(INITIAL_CAPACITY), + spawner: spawner.clone(), + tick: 0, + driver: Some(driver), + metrics: MetricsBatch::new(), + }))); + + BasicScheduler { + core, + notify: Notify::new(), + spawner, + context_guard: None, + } + } + + pub(crate) fn spawner(&self) -> &Spawner { + &self.spawner + } + + pub(crate) fn block_on<F: Future>(&self, future: F) -> F::Output { + pin!(future); + + // Attempt to steal the scheduler core and block_on the future if we can + // there, otherwise, lets select on a notification that the core is + // available or the future is complete. + loop { + if let Some(core) = self.take_core() { + return core.block_on(future); + } else { + let mut enter = crate::runtime::enter(false); + + let notified = self.notify.notified(); + pin!(notified); + + if let Some(out) = enter + .block_on(poll_fn(|cx| { + if notified.as_mut().poll(cx).is_ready() { + return Ready(None); + } + + if let Ready(out) = future.as_mut().poll(cx) { + return Ready(Some(out)); + } + + Pending + })) + .expect("Failed to `Enter::block_on`") + { + return out; + } + } + } + } + + fn take_core(&self) -> Option<CoreGuard<'_>> { + let core = self.core.take()?; + + Some(CoreGuard { + context: Context { + spawner: self.spawner.clone(), + core: RefCell::new(Some(core)), + }, + basic_scheduler: self, + }) + } + + pub(super) fn set_context_guard(&mut self, guard: EnterGuard) { + self.context_guard = Some(guard); + } +} + +impl Drop for BasicScheduler { + fn drop(&mut self) { + // Avoid a double panic if we are currently panicking and + // the lock may be poisoned. + + let core = match self.take_core() { + Some(core) => core, + None if std::thread::panicking() => return, + None => panic!("Oh no! We never placed the Core back, this is a bug!"), + }; + + core.enter(|mut core, context| { + // Drain the OwnedTasks collection. This call also closes the + // collection, ensuring that no tasks are ever pushed after this + // call returns. + context.spawner.shared.owned.close_and_shutdown_all(); + + // Drain local queue + // We already shut down every task, so we just need to drop the task. + while let Some(task) = core.pop_task() { + drop(task); + } + + // Drain remote queue and set it to None + let remote_queue = core.spawner.shared.queue.lock().take(); + + // Using `Option::take` to replace the shared queue with `None`. + // We already shut down every task, so we just need to drop the task. + if let Some(remote_queue) = remote_queue { + for task in remote_queue { + drop(task); + } + } + + assert!(context.spawner.shared.owned.is_empty()); + + // Submit metrics + core.metrics.submit(&core.spawner.shared.worker_metrics); + + (core, ()) + }); + } +} + +impl fmt::Debug for BasicScheduler { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("BasicScheduler").finish() + } +} + +// ===== impl Core ===== + +impl Core { + fn pop_task(&mut self) -> Option<task::Notified<Arc<Shared>>> { + let ret = self.tasks.pop_front(); + self.spawner + .shared + .worker_metrics + .set_queue_depth(self.tasks.len()); + ret + } + + fn push_task(&mut self, task: task::Notified<Arc<Shared>>) { + self.tasks.push_back(task); + self.metrics.inc_local_schedule_count(); + self.spawner + .shared + .worker_metrics + .set_queue_depth(self.tasks.len()); + } +} + +// ===== impl Context ===== + +impl Context { + /// Execute the closure with the given scheduler core stored in the + /// thread-local context. + fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) { + core.metrics.incr_poll_count(); + self.enter(core, || crate::coop::budget(f)) + } + + /// Blocks the current thread until an event is received by the driver, + /// including I/O events, timer events, ... + fn park(&self, mut core: Box<Core>) -> Box<Core> { + let mut driver = core.driver.take().expect("driver missing"); + + if let Some(f) = &self.spawner.shared.before_park { + // Incorrect lint, the closures are actually different types so `f` + // cannot be passed as an argument to `enter`. + #[allow(clippy::redundant_closure)] + let (c, _) = self.enter(core, || f()); + core = c; + } + + // This check will fail if `before_park` spawns a task for us to run + // instead of parking the thread + if core.tasks.is_empty() { + // Park until the thread is signaled + core.metrics.about_to_park(); + core.metrics.submit(&core.spawner.shared.worker_metrics); + + let (c, _) = self.enter(core, || { + driver.park().expect("failed to park"); + }); + + core = c; + core.metrics.returned_from_park(); + } + + if let Some(f) = &self.spawner.shared.after_unpark { + // Incorrect lint, the closures are actually different types so `f` + // cannot be passed as an argument to `enter`. + #[allow(clippy::redundant_closure)] + let (c, _) = self.enter(core, || f()); + core = c; + } + + core.driver = Some(driver); + core + } + + /// Checks the driver for new events without blocking the thread. + fn park_yield(&self, mut core: Box<Core>) -> Box<Core> { + let mut driver = core.driver.take().expect("driver missing"); + + core.metrics.submit(&core.spawner.shared.worker_metrics); + let (mut core, _) = self.enter(core, || { + driver + .park_timeout(Duration::from_millis(0)) + .expect("failed to park"); + }); + + core.driver = Some(driver); + core + } + + fn enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) { + // Store the scheduler core in the thread-local context + // + // A drop-guard is employed at a higher level. + *self.core.borrow_mut() = Some(core); + + // Execute the closure while tracking the execution budget + let ret = f(); + + // Take the scheduler core back + let core = self.core.borrow_mut().take().expect("core missing"); + (core, ret) + } +} + +// ===== impl Spawner ===== + +impl Spawner { + /// Spawns a future onto the basic scheduler + pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> + where + F: crate::future::Future + Send + 'static, + F::Output: Send + 'static, + { + let (handle, notified) = self.shared.owned.bind(future, self.shared.clone()); + + if let Some(notified) = notified { + self.shared.schedule(notified); + } + + handle + } + + fn pop(&self) -> Option<task::Notified<Arc<Shared>>> { + match self.shared.queue.lock().as_mut() { + Some(queue) => queue.pop_front(), + None => None, + } + } + + fn waker_ref(&self) -> WakerRef<'_> { + // Set woken to true when enter block_on, ensure outer future + // be polled for the first time when enter loop + self.shared.woken.store(true, Release); + waker_ref(&self.shared) + } + + // reset woken to false and return original value + pub(crate) fn reset_woken(&self) -> bool { + self.shared.woken.swap(false, AcqRel) + } +} + +cfg_metrics! { + impl Spawner { + pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { + &self.shared.scheduler_metrics + } + + pub(crate) fn injection_queue_depth(&self) -> usize { + // TODO: avoid having to lock. The multi-threaded injection queue + // could probably be used here. + self.shared.queue.lock() + .as_ref() + .map(|queue| queue.len()) + .unwrap_or(0) + } + + pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { + assert_eq!(0, worker); + &self.shared.worker_metrics + } + } +} + +impl fmt::Debug for Spawner { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Spawner").finish() + } +} + +// ===== impl Shared ===== + +impl Schedule for Arc<Shared> { + fn release(&self, task: &Task<Self>) -> Option<Task<Self>> { + self.owned.remove(task) + } + + fn schedule(&self, task: task::Notified<Self>) { + CURRENT.with(|maybe_cx| match maybe_cx { + Some(cx) if Arc::ptr_eq(self, &cx.spawner.shared) => { + let mut core = cx.core.borrow_mut(); + + // If `None`, the runtime is shutting down, so there is no need + // to schedule the task. + if let Some(core) = core.as_mut() { + core.push_task(task); + } + } + _ => { + // Track that a task was scheduled from **outside** of the runtime. + self.scheduler_metrics.inc_remote_schedule_count(); + + // If the queue is None, then the runtime has shut down. We + // don't need to do anything with the notification in that case. + let mut guard = self.queue.lock(); + if let Some(queue) = guard.as_mut() { + queue.push_back(task); + drop(guard); + self.unpark.unpark(); + } + } + }); + } +} + +impl Wake for Shared { + fn wake(arc_self: Arc<Self>) { + Wake::wake_by_ref(&arc_self) + } + + /// Wake by reference + fn wake_by_ref(arc_self: &Arc<Self>) { + arc_self.woken.store(true, Release); + arc_self.unpark.unpark(); + } +} + +// ===== CoreGuard ===== + +/// Used to ensure we always place the `Core` value back into its slot in +/// `BasicScheduler`, even if the future panics. +struct CoreGuard<'a> { + context: Context, + basic_scheduler: &'a BasicScheduler, +} + +impl CoreGuard<'_> { + fn block_on<F: Future>(self, future: F) -> F::Output { + self.enter(|mut core, context| { + let _enter = crate::runtime::enter(false); + let waker = context.spawner.waker_ref(); + let mut cx = std::task::Context::from_waker(&waker); + + pin!(future); + + 'outer: loop { + if core.spawner.reset_woken() { + let (c, res) = context.enter(core, || { + crate::coop::budget(|| future.as_mut().poll(&mut cx)) + }); + + core = c; + + if let Ready(v) = res { + return (core, v); + } + } + + for _ in 0..MAX_TASKS_PER_TICK { + // Get and increment the current tick + let tick = core.tick; + core.tick = core.tick.wrapping_add(1); + + let entry = if tick % REMOTE_FIRST_INTERVAL == 0 { + core.spawner.pop().or_else(|| core.tasks.pop_front()) + } else { + core.tasks.pop_front().or_else(|| core.spawner.pop()) + }; + + let task = match entry { + Some(entry) => entry, + None => { + core = context.park(core); + + // Try polling the `block_on` future next + continue 'outer; + } + }; + + let task = context.spawner.shared.owned.assert_owner(task); + + let (c, _) = context.run_task(core, || { + task.run(); + }); + + core = c; + } + + // Yield to the driver, this drives the timer and pulls any + // pending I/O events. + core = context.park_yield(core); + } + }) + } + + /// Enters the scheduler context. This sets the queue and other necessary + /// scheduler state in the thread-local. + fn enter<F, R>(self, f: F) -> R + where + F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R), + { + // Remove `core` from `context` to pass into the closure. + let core = self.context.core.borrow_mut().take().expect("core missing"); + + // Call the closure and place `core` back + let (core, ret) = CURRENT.set(&self.context, || f(core, &self.context)); + + *self.context.core.borrow_mut() = Some(core); + + ret + } +} + +impl Drop for CoreGuard<'_> { + fn drop(&mut self) { + if let Some(core) = self.context.core.borrow_mut().take() { + // Replace old scheduler back into the state to allow + // other threads to pick it up and drive it. + self.basic_scheduler.core.set(core); + + // Wake up other possible threads that could steal the driver. + self.basic_scheduler.notify.notify_one() + } + } +} |