use crate::future::poll_fn; use crate::loom::sync::atomic::AtomicBool; use crate::loom::sync::Mutex; use crate::park::{Park, Unpark}; use crate::runtime::task::{self, JoinHandle, Schedule, Task}; use crate::sync::notify::Notify; use crate::util::linked_list::{Link, LinkedList}; use crate::util::{waker_ref, Wake, WakerRef}; use std::cell::RefCell; use std::collections::VecDeque; use std::fmt; use std::future::Future; use std::ptr::NonNull; use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; use std::sync::Arc; use std::task::Poll::{Pending, Ready}; use std::time::Duration; /// Executes tasks on the current thread pub(crate) struct BasicScheduler { /// Inner state guarded by a mutex that is shared /// between all `block_on` calls. inner: Mutex>>, /// Notifier for waking up other threads to steal the /// parker. notify: Notify, /// Sendable task spawner spawner: Spawner, } /// The inner scheduler that owns the task queue and the main parker P. struct Inner { /// Scheduler run queue /// /// When the scheduler is executed, the queue is removed from `self` and /// moved into `Context`. /// /// This indirection is to allow `BasicScheduler` to be `Send`. tasks: Option, /// Sendable task spawner spawner: Spawner, /// Current tick tick: u8, /// Thread park handle park: P, } #[derive(Clone)] pub(crate) struct Spawner { shared: Arc, } struct Tasks { /// Collection of all active tasks spawned onto this executor. owned: LinkedList>, > as Link>::Target>, /// Local run queue. /// /// Tasks notified from the current thread are pushed into this queue. queue: VecDeque>>, } /// A remote scheduler entry. /// /// These are filled in by remote threads sending instructions to the scheduler. enum Entry { /// A remote thread wants to spawn a task. Schedule(task::Notified>), /// A remote thread wants a task to be released by the scheduler. We only /// have access to its header. Release(NonNull), } // Safety: Used correctly, the task header is "thread safe". Ultimately the task // is owned by the current thread executor, for which this instruction is being // sent. unsafe impl Send for Entry {} /// Scheduler state shared between threads. struct Shared { /// Remote run queue. None if the `Runtime` has been dropped. queue: Mutex>>, /// Unpark the blocked thread. unpark: Box, /// Indicates whether the blocked on thread was woken. woken: AtomicBool, } /// Thread-local context. struct Context { /// Shared scheduler state shared: Arc, /// Local queue tasks: RefCell, } /// Initial queue capacity. const INITIAL_CAPACITY: usize = 64; /// Max number of tasks to poll per tick. #[cfg(loom)] const MAX_TASKS_PER_TICK: usize = 4; #[cfg(not(loom))] const MAX_TASKS_PER_TICK: usize = 61; /// How often to check the remote queue first. const REMOTE_FIRST_INTERVAL: u8 = 31; // Tracks the current BasicScheduler. scoped_thread_local!(static CURRENT: Context); impl BasicScheduler

{ pub(crate) fn new(park: P) -> BasicScheduler

{ let unpark = Box::new(park.unpark()); let spawner = Spawner { shared: Arc::new(Shared { queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))), unpark: unpark as Box, woken: AtomicBool::new(false), }), }; let inner = Mutex::new(Some(Inner { tasks: Some(Tasks { owned: LinkedList::new(), queue: VecDeque::with_capacity(INITIAL_CAPACITY), }), spawner: spawner.clone(), tick: 0, park, })); BasicScheduler { inner, notify: Notify::new(), spawner, } } pub(crate) fn spawner(&self) -> &Spawner { &self.spawner } pub(crate) fn block_on(&self, future: F) -> F::Output { pin!(future); // Attempt to steal the dedicated parker and block_on the future if we can there, // otherwise, lets select on a notification that the parker is available // or the future is complete. loop { if let Some(inner) = &mut self.take_inner() { return inner.block_on(future); } else { let mut enter = crate::runtime::enter(false); let notified = self.notify.notified(); pin!(notified); if let Some(out) = enter .block_on(poll_fn(|cx| { if notified.as_mut().poll(cx).is_ready() { return Ready(None); } if let Ready(out) = future.as_mut().poll(cx) { return Ready(Some(out)); } Pending })) .expect("Failed to `Enter::block_on`") { return out; } } } } fn take_inner(&self) -> Option> { let inner = self.inner.lock().take()?; Some(InnerGuard { inner: Some(inner), basic_scheduler: &self, }) } } impl Inner

{ /// Block on the future provided and drive the runtime's driver. fn block_on(&mut self, future: F) -> F::Output { enter(self, |scheduler, context| { let _enter = crate::runtime::enter(false); let waker = scheduler.spawner.waker_ref(); let mut cx = std::task::Context::from_waker(&waker); let mut polled = false; pin!(future); 'outer: loop { if scheduler.spawner.was_woken() || !polled { polled = true; if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) { return v; } } for _ in 0..MAX_TASKS_PER_TICK { // Get and increment the current tick let tick = scheduler.tick; scheduler.tick = scheduler.tick.wrapping_add(1); let entry = if tick % REMOTE_FIRST_INTERVAL == 0 { scheduler.spawner.pop().or_else(|| { context .tasks .borrow_mut() .queue .pop_front() .map(Entry::Schedule) }) } else { context .tasks .borrow_mut() .queue .pop_front() .map(Entry::Schedule) .or_else(|| scheduler.spawner.pop()) }; let entry = match entry { Some(entry) => entry, None => { // Park until the thread is signaled scheduler.park.park().expect("failed to park"); // Try polling the `block_on` future next continue 'outer; } }; match entry { Entry::Schedule(task) => crate::coop::budget(|| task.run()), Entry::Release(ptr) => { // Safety: the task header is only legally provided // internally in the header, so we know that it is a // valid (or in particular *allocated*) header that // is part of the linked list. unsafe { let removed = context.tasks.borrow_mut().owned.remove(ptr); // TODO: This seems like it should hold, because // there doesn't seem to be an avenue for anyone // else to fiddle with the owned tasks // collection *after* a remote thread has marked // it as released, and at that point, the only // location at which it can be removed is here // or in the Drop implementation of the // scheduler. debug_assert!(removed.is_some()); } } } } // Yield to the park, this drives the timer and pulls any pending // I/O events. scheduler .park .park_timeout(Duration::from_millis(0)) .expect("failed to park"); } }) } } /// Enter the scheduler context. This sets the queue and other necessary /// scheduler state in the thread-local fn enter(scheduler: &mut Inner

, f: F) -> R where F: FnOnce(&mut Inner

, &Context) -> R, P: Park, { // Ensures the run queue is placed back in the `BasicScheduler` instance // once `block_on` returns.` struct Guard<'a, P: Park> { context: Option, scheduler: &'a mut Inner

, } impl Drop for Guard<'_, P> { fn drop(&mut self) { let Context { tasks, .. } = self.context.take().expect("context missing"); self.scheduler.tasks = Some(tasks.into_inner()); } } // Remove `tasks` from `self` and place it in a `Context`. let tasks = scheduler.tasks.take().expect("invalid state"); let guard = Guard { context: Some(Context { shared: scheduler.spawner.shared.clone(), tasks: RefCell::new(tasks), }), scheduler, }; let context = guard.context.as_ref().unwrap(); let scheduler = &mut *guard.scheduler; CURRENT.set(context, || f(scheduler, context)) } impl Drop for BasicScheduler

{ fn drop(&mut self) { // Avoid a double panic if we are currently panicking and // the lock may be poisoned. let mut inner = match self.inner.lock().take() { Some(inner) => inner, None if std::thread::panicking() => return, None => panic!("Oh no! We never placed the Inner state back, this is a bug!"), }; enter(&mut inner, |scheduler, context| { // Loop required here to ensure borrow is dropped between iterations #[allow(clippy::while_let_loop)] loop { let task = match context.tasks.borrow_mut().owned.pop_back() { Some(task) => task, None => break, }; task.shutdown(); } // Drain local queue for task in context.tasks.borrow_mut().queue.drain(..) { task.shutdown(); } // Drain remote queue and set it to None let mut remote_queue = scheduler.spawner.shared.queue.lock(); // Using `Option::take` to replace the shared queue with `None`. if let Some(remote_queue) = remote_queue.take() { for entry in remote_queue { match entry { Entry::Schedule(task) => { task.shutdown(); } Entry::Release(..) => { // Do nothing, each entry in the linked list was *just* // dropped by the scheduler above. } } } } // By dropping the mutex lock after the full duration of the above loop, // any thread that sees the queue in the `None` state is guaranteed that // the runtime has fully shut down. // // The assert below is unrelated to this mutex. drop(remote_queue); assert!(context.tasks.borrow().owned.is_empty()); }); } } impl fmt::Debug for BasicScheduler

{ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("BasicScheduler").finish() } } // ===== impl Spawner ===== impl Spawner { /// Spawns a future onto the thread pool pub(crate) fn spawn(&self, future: F) -> JoinHandle where F: crate::future::Future + Send + 'static, F::Output: Send + 'static, { let (task, handle) = task::joinable(future); self.shared.schedule(task); handle } fn pop(&self) -> Option { match self.shared.queue.lock().as_mut() { Some(queue) => queue.pop_front(), None => None, } } fn waker_ref(&self) -> WakerRef<'_> { // clear the woken bit self.shared.woken.swap(false, AcqRel); waker_ref(&self.shared) } fn was_woken(&self) -> bool { self.shared.woken.load(Acquire) } } impl fmt::Debug for Spawner { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Spawner").finish() } } // ===== impl Shared ===== impl Schedule for Arc { fn bind(task: Task) -> Arc { CURRENT.with(|maybe_cx| { let cx = maybe_cx.expect("scheduler context missing"); cx.tasks.borrow_mut().owned.push_front(task); cx.shared.clone() }) } fn release(&self, task: &Task) -> Option> { CURRENT.with(|maybe_cx| { let ptr = NonNull::from(task.header()); if let Some(cx) = maybe_cx { // safety: the task is inserted in the list in `bind`. unsafe { cx.tasks.borrow_mut().owned.remove(ptr) } } else { // By sending an `Entry::Release` to the runtime, we ask the // runtime to remove this task from the linked list in // `Tasks::owned`. // // If the queue is `None`, then the task was already removed // from that list in the destructor of `BasicScheduler`. We do // not do anything in this case for the same reason that // `Entry::Release` messages are ignored in the remote queue // drain loop of `BasicScheduler`'s destructor. if let Some(queue) = self.queue.lock().as_mut() { queue.push_back(Entry::Release(ptr)); } self.unpark.unpark(); // Returning `None` here prevents the task plumbing from being // freed. It is then up to the scheduler through the queue we // just added to, or its Drop impl to free the task. None } }) } fn schedule(&self, task: task::Notified) { CURRENT.with(|maybe_cx| match maybe_cx { Some(cx) if Arc::ptr_eq(self, &cx.shared) => { cx.tasks.borrow_mut().queue.push_back(task); } _ => { let mut guard = self.queue.lock(); if let Some(queue) = guard.as_mut() { queue.push_back(Entry::Schedule(task)); drop(guard); self.unpark.unpark(); } else { // The runtime has shut down. We drop the new task // immediately. drop(guard); task.shutdown(); } } }); } } impl Wake for Shared { fn wake(self: Arc) { Wake::wake_by_ref(&self) } /// Wake by reference fn wake_by_ref(arc_self: &Arc) { arc_self.woken.store(true, Release); arc_self.unpark.unpark(); } } // ===== InnerGuard ===== /// Used to ensure we always place the Inner value /// back into its slot in `BasicScheduler`, even if the /// future panics. struct InnerGuard<'a, P: Park> { inner: Option>, basic_scheduler: &'a BasicScheduler

, } impl InnerGuard<'_, P> { fn block_on(&mut self, future: F) -> F::Output { // The only time inner gets set to `None` is if we have dropped // already so this unwrap is safe. self.inner.as_mut().unwrap().block_on(future) } } impl Drop for InnerGuard<'_, P> { fn drop(&mut self) { if let Some(scheduler) = self.inner.take() { let mut lock = self.basic_scheduler.inner.lock(); // Replace old scheduler back into the state to allow // other threads to pick it up and drive it. lock.replace(scheduler); // Wake up other possible threads that could steal // the dedicated parker P. self.basic_scheduler.notify.notify_one() } } }