diff options
Diffstat (limited to 'third_party/rust/tokio/src/runtime/basic_scheduler.rs')
-rw-r--r-- | third_party/rust/tokio/src/runtime/basic_scheduler.rs | 326 |
1 files changed, 326 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/runtime/basic_scheduler.rs b/third_party/rust/tokio/src/runtime/basic_scheduler.rs new file mode 100644 index 0000000000..301554280f --- /dev/null +++ b/third_party/rust/tokio/src/runtime/basic_scheduler.rs @@ -0,0 +1,326 @@ +use crate::park::{Park, Unpark}; +use crate::runtime; +use crate::runtime::task::{self, JoinHandle, Schedule, Task}; +use crate::util::linked_list::LinkedList; +use crate::util::{waker_ref, Wake}; + +use std::cell::RefCell; +use std::collections::VecDeque; +use std::fmt; +use std::future::Future; +use std::sync::{Arc, Mutex}; +use std::task::Poll::Ready; +use std::time::Duration; + +/// Executes tasks on the current thread +pub(crate) struct BasicScheduler<P> +where + P: Park, +{ + /// Scheduler run queue + /// + /// When the scheduler is executed, the queue is removed from `self` and + /// moved into `Context`. + /// + /// This indirection is to allow `BasicScheduler` to be `Send`. + tasks: Option<Tasks>, + + /// Sendable task spawner + spawner: Spawner, + + /// Current tick + tick: u8, + + /// Thread park handle + park: P, +} + +#[derive(Clone)] +pub(crate) struct Spawner { + shared: Arc<Shared>, +} + +struct Tasks { + /// Collection of all active tasks spawned onto this executor. + owned: LinkedList<Task<Arc<Shared>>>, + + /// Local run queue. + /// + /// Tasks notified from the current thread are pushed into this queue. + queue: VecDeque<task::Notified<Arc<Shared>>>, +} + +/// Scheduler state shared between threads. +struct Shared { + /// Remote run queue + queue: Mutex<VecDeque<task::Notified<Arc<Shared>>>>, + + /// Unpark the blocked thread + unpark: Box<dyn Unpark>, +} + +/// Thread-local context +struct Context { + /// Shared scheduler state + shared: Arc<Shared>, + + /// Local queue + tasks: RefCell<Tasks>, +} + +/// Initial queue capacity +const INITIAL_CAPACITY: usize = 64; + +/// Max number of tasks to poll per tick. +const MAX_TASKS_PER_TICK: usize = 61; + +/// How often ot check the remote queue first +const REMOTE_FIRST_INTERVAL: u8 = 31; + +// Tracks the current BasicScheduler +scoped_thread_local!(static CURRENT: Context); + +impl<P> BasicScheduler<P> +where + P: Park, +{ + pub(crate) fn new(park: P) -> BasicScheduler<P> { + let unpark = Box::new(park.unpark()); + + BasicScheduler { + tasks: Some(Tasks { + owned: LinkedList::new(), + queue: VecDeque::with_capacity(INITIAL_CAPACITY), + }), + spawner: Spawner { + shared: Arc::new(Shared { + queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)), + unpark: unpark as Box<dyn Unpark>, + }), + }, + tick: 0, + park, + } + } + + pub(crate) fn spawner(&self) -> &Spawner { + &self.spawner + } + + /// Spawns a future onto the thread pool + pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.spawner.spawn(future) + } + + pub(crate) fn block_on<F>(&mut self, future: F) -> F::Output + where + F: Future, + { + enter(self, |scheduler, context| { + let _enter = runtime::enter(); + let waker = waker_ref(&scheduler.spawner.shared); + let mut cx = std::task::Context::from_waker(&waker); + + pin!(future); + + 'outer: loop { + if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) { + return v; + } + + for _ in 0..MAX_TASKS_PER_TICK { + // Get and increment the current tick + let tick = scheduler.tick; + scheduler.tick = scheduler.tick.wrapping_add(1); + + let next = if tick % REMOTE_FIRST_INTERVAL == 0 { + scheduler + .spawner + .pop() + .or_else(|| context.tasks.borrow_mut().queue.pop_front()) + } else { + context + .tasks + .borrow_mut() + .queue + .pop_front() + .or_else(|| scheduler.spawner.pop()) + }; + + match next { + Some(task) => crate::coop::budget(|| task.run()), + None => { + // Park until the thread is signaled + scheduler.park.park().ok().expect("failed to park"); + + // Try polling the `block_on` future next + continue 'outer; + } + } + } + + // Yield to the park, this drives the timer and pulls any pending + // I/O events. + scheduler + .park + .park_timeout(Duration::from_millis(0)) + .ok() + .expect("failed to park"); + } + }) + } +} + +/// Enter the scheduler context. This sets the queue and other necessary +/// scheduler state in the thread-local +fn enter<F, R, P>(scheduler: &mut BasicScheduler<P>, f: F) -> R +where + F: FnOnce(&mut BasicScheduler<P>, &Context) -> R, + P: Park, +{ + // Ensures the run queue is placed back in the `BasicScheduler` instance + // once `block_on` returns.` + struct Guard<'a, P: Park> { + context: Option<Context>, + scheduler: &'a mut BasicScheduler<P>, + } + + impl<P: Park> Drop for Guard<'_, P> { + fn drop(&mut self) { + let Context { tasks, .. } = self.context.take().expect("context missing"); + self.scheduler.tasks = Some(tasks.into_inner()); + } + } + + // Remove `tasks` from `self` and place it in a `Context`. + let tasks = scheduler.tasks.take().expect("invalid state"); + + let guard = Guard { + context: Some(Context { + shared: scheduler.spawner.shared.clone(), + tasks: RefCell::new(tasks), + }), + scheduler, + }; + + let context = guard.context.as_ref().unwrap(); + let scheduler = &mut *guard.scheduler; + + CURRENT.set(context, || f(scheduler, context)) +} + +impl<P> Drop for BasicScheduler<P> +where + P: Park, +{ + fn drop(&mut self) { + enter(self, |scheduler, context| { + // Loop required here to ensure borrow is dropped between iterations + #[allow(clippy::while_let_loop)] + loop { + let task = match context.tasks.borrow_mut().owned.pop_back() { + Some(task) => task, + None => break, + }; + + task.shutdown(); + } + + // Drain local queue + for task in context.tasks.borrow_mut().queue.drain(..) { + task.shutdown(); + } + + // Drain remote queue + for task in scheduler.spawner.shared.queue.lock().unwrap().drain(..) { + task.shutdown(); + } + + assert!(context.tasks.borrow().owned.is_empty()); + }); + } +} + +impl<P: Park> fmt::Debug for BasicScheduler<P> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("BasicScheduler").finish() + } +} + +// ===== impl Spawner ===== + +impl Spawner { + /// Spawns a future onto the thread pool + pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + let (task, handle) = task::joinable(future); + self.shared.schedule(task); + handle + } + + fn pop(&self) -> Option<task::Notified<Arc<Shared>>> { + self.shared.queue.lock().unwrap().pop_front() + } +} + +impl fmt::Debug for Spawner { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Spawner").finish() + } +} + +// ===== impl Shared ===== + +impl Schedule for Arc<Shared> { + fn bind(task: Task<Self>) -> Arc<Shared> { + CURRENT.with(|maybe_cx| { + let cx = maybe_cx.expect("scheduler context missing"); + cx.tasks.borrow_mut().owned.push_front(task); + cx.shared.clone() + }) + } + + fn release(&self, task: &Task<Self>) -> Option<Task<Self>> { + use std::ptr::NonNull; + + CURRENT.with(|maybe_cx| { + let cx = maybe_cx.expect("scheduler context missing"); + + // safety: the task is inserted in the list in `bind`. + unsafe { + let ptr = NonNull::from(task.header()); + cx.tasks.borrow_mut().owned.remove(ptr) + } + }) + } + + fn schedule(&self, task: task::Notified<Self>) { + CURRENT.with(|maybe_cx| match maybe_cx { + Some(cx) if Arc::ptr_eq(self, &cx.shared) => { + cx.tasks.borrow_mut().queue.push_back(task); + } + _ => { + self.queue.lock().unwrap().push_back(task); + self.unpark.unpark(); + } + }); + } +} + +impl Wake for Shared { + fn wake(self: Arc<Self>) { + Wake::wake_by_ref(&self) + } + + /// Wake by reference + fn wake_by_ref(arc_self: &Arc<Self>) { + arc_self.unpark.unpark(); + } +} |