Diffstat (limited to 'vendor/tokio/src/runtime/basic_scheduler.rs')
-rw-r--r--  vendor/tokio/src/runtime/basic_scheduler.rs  534
1 file changed, 534 insertions(+), 0 deletions(-)
diff --git a/vendor/tokio/src/runtime/basic_scheduler.rs b/vendor/tokio/src/runtime/basic_scheduler.rs
new file mode 100644
index 000000000..13dfb6973
--- /dev/null
+++ b/vendor/tokio/src/runtime/basic_scheduler.rs
@@ -0,0 +1,534 @@
+use crate::future::poll_fn;
+use crate::loom::sync::atomic::AtomicBool;
+use crate::loom::sync::Mutex;
+use crate::park::{Park, Unpark};
+use crate::runtime::task::{self, JoinHandle, Schedule, Task};
+use crate::sync::notify::Notify;
+use crate::util::linked_list::{Link, LinkedList};
+use crate::util::{waker_ref, Wake, WakerRef};
+
+use std::cell::RefCell;
+use std::collections::VecDeque;
+use std::fmt;
+use std::future::Future;
+use std::ptr::NonNull;
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
+use std::sync::Arc;
+use std::task::Poll::{Pending, Ready};
+use std::time::Duration;
+
+/// Executes tasks on the current thread
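+///
+/// A minimal usage sketch, assuming some `Park` implementation `driver`
+/// (illustrative; not defined in this file):
+///
+/// ```ignore
+/// let scheduler = BasicScheduler::new(driver);
+/// let handle = scheduler.spawner().spawn(async { 40 + 2 });
+/// assert_eq!(scheduler.block_on(handle).unwrap(), 42);
+/// ```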
+pub(crate) struct BasicScheduler<P: Park> {
+ /// Inner state guarded by a mutex that is shared
+ /// between all `block_on` calls.
+ inner: Mutex<Option<Inner<P>>>,
+
+ /// Notifier for waking up other threads to steal the
+ /// parker.
+ notify: Notify,
+
+ /// Sendable task spawner
+ spawner: Spawner,
+}
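+
+// Only one thread at a time owns `inner` (and with it the parker `P`).
+// Other callers of `block_on` wait on `notify`; `InnerGuard::drop` below
+// puts `inner` back and notifies a single waiter.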
+
+/// The inner scheduler that owns the task queue and the main parker P.
+struct Inner<P: Park> {
+ /// Scheduler run queue
+ ///
+ /// When the scheduler is executed, the queue is removed from `self` and
+ /// moved into `Context`.
+ ///
+ /// This indirection is to allow `BasicScheduler` to be `Send`.
+ tasks: Option<Tasks>,
+
+ /// Sendable task spawner
+ spawner: Spawner,
+
+ /// Current tick
+ tick: u8,
+
+ /// Thread park handle
+ park: P,
+}
+
+#[derive(Clone)]
+pub(crate) struct Spawner {
+ shared: Arc<Shared>,
+}
+
+struct Tasks {
+ /// Collection of all active tasks spawned onto this executor.
+ owned: LinkedList<Task<Arc<Shared>>, <Task<Arc<Shared>> as Link>::Target>,
+
+ /// Local run queue.
+ ///
+ /// Tasks notified from the current thread are pushed into this queue.
+ queue: VecDeque<task::Notified<Arc<Shared>>>,
+}
+
+/// A remote scheduler entry.
+///
+/// These are filled in by remote threads sending instructions to the scheduler.
+enum Entry {
+ /// A remote thread wants to spawn a task.
+ Schedule(task::Notified<Arc<Shared>>),
+ /// A remote thread wants a task to be released by the scheduler. We only
+ /// have access to its header.
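+    ///
+    /// Produced by `Schedule::release` when a task is released on a thread
+    /// with no scheduler `Context`; consumed by the `Entry::Release` arm in
+    /// `Inner::block_on`.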
+ Release(NonNull<task::Header>),
+}
+
+// Safety: Used correctly, the task header is "thread safe". Ultimately the
+// task is owned by the current-thread executor to which this instruction is
+// being sent.
+unsafe impl Send for Entry {}
+
+/// Scheduler state shared between threads.
+struct Shared {
+ /// Remote run queue. None if the `Runtime` has been dropped.
+ queue: Mutex<Option<VecDeque<Entry>>>,
+
+ /// Unpark the blocked thread.
+ unpark: Box<dyn Unpark>,
+
+    /// Indicates whether the blocked-on thread was woken.
+ woken: AtomicBool,
+}
+
+/// Thread-local context.
+struct Context {
+ /// Shared scheduler state
+ shared: Arc<Shared>,
+
+ /// Local queue
+ tasks: RefCell<Tasks>,
+}
+
+/// Initial queue capacity.
+const INITIAL_CAPACITY: usize = 64;
+
+/// Max number of tasks to poll per tick.
+#[cfg(loom)]
+const MAX_TASKS_PER_TICK: usize = 4;
+#[cfg(not(loom))]
+const MAX_TASKS_PER_TICK: usize = 61;
+
+/// How often to check the remote queue first.
+const REMOTE_FIRST_INTERVAL: u8 = 31;
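+
+// With `REMOTE_FIRST_INTERVAL = 31`, ticks 0, 31, 62, ... poll the remote
+// queue before the local one, so remotely scheduled tasks cannot be starved
+// by a steady stream of locally notified ones. The small loom value of
+// `MAX_TASKS_PER_TICK` keeps the model checker's state space tractable.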
+
+// Tracks the current BasicScheduler.
+scoped_thread_local!(static CURRENT: Context);
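+// `CURRENT.set(ctx, f)` installs `ctx` for the duration of `f`;
+// `CURRENT.with(|cx| ...)` observes it (or `None` when unset) from anywhere
+// on the same thread, e.g. in `Schedule::bind` below.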
+
+impl<P: Park> BasicScheduler<P> {
+ pub(crate) fn new(park: P) -> BasicScheduler<P> {
+ let unpark = Box::new(park.unpark());
+
+ let spawner = Spawner {
+ shared: Arc::new(Shared {
+ queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
+ unpark: unpark as Box<dyn Unpark>,
+ woken: AtomicBool::new(false),
+ }),
+ };
+
+ let inner = Mutex::new(Some(Inner {
+ tasks: Some(Tasks {
+ owned: LinkedList::new(),
+ queue: VecDeque::with_capacity(INITIAL_CAPACITY),
+ }),
+ spawner: spawner.clone(),
+ tick: 0,
+ park,
+ }));
+
+ BasicScheduler {
+ inner,
+ notify: Notify::new(),
+ spawner,
+ }
+ }
+
+ pub(crate) fn spawner(&self) -> &Spawner {
+ &self.spawner
+ }
+
+ pub(crate) fn block_on<F: Future>(&self, future: F) -> F::Output {
+ pin!(future);
+
+        // Attempt to steal the dedicated parker and block on the future
+        // there if we can; otherwise, select on a notification that the
+        // parker is available or on the future completing.
+ loop {
+ if let Some(inner) = &mut self.take_inner() {
+ return inner.block_on(future);
+ } else {
+ let mut enter = crate::runtime::enter(false);
+
+ let notified = self.notify.notified();
+ pin!(notified);
+
+ if let Some(out) = enter
+ .block_on(poll_fn(|cx| {
+ if notified.as_mut().poll(cx).is_ready() {
+ return Ready(None);
+ }
+
+ if let Ready(out) = future.as_mut().poll(cx) {
+ return Ready(Some(out));
+ }
+
+ Pending
+ }))
+ .expect("Failed to `Enter::block_on`")
+ {
+ return out;
+ }
+ }
+ }
+ }
+
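+    /// Attempts to take ownership of the `Inner` state (and with it the
+    /// parker). Returns `None` if another `block_on` call currently holds
+    /// it; the returned `InnerGuard` puts the state back on drop.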
+ fn take_inner(&self) -> Option<InnerGuard<'_, P>> {
+ let inner = self.inner.lock().take()?;
+
+ Some(InnerGuard {
+ inner: Some(inner),
+            basic_scheduler: self,
+ })
+ }
+}
+
+impl<P: Park> Inner<P> {
+    /// Blocks on the provided future and drives the runtime's driver.
+ fn block_on<F: Future>(&mut self, future: F) -> F::Output {
+ enter(self, |scheduler, context| {
+ let _enter = crate::runtime::enter(false);
+ let waker = scheduler.spawner.waker_ref();
+ let mut cx = std::task::Context::from_waker(&waker);
+ let mut polled = false;
+
+ pin!(future);
+
+ 'outer: loop {
+ if scheduler.spawner.was_woken() || !polled {
+ polled = true;
+ if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) {
+ return v;
+ }
+ }
+
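+            // Poll at most MAX_TASKS_PER_TICK tasks before giving the root
+            // future and the parker another turn, so spawned tasks cannot
+            // starve `block_on` or the driver.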
+ for _ in 0..MAX_TASKS_PER_TICK {
+ // Get and increment the current tick
+ let tick = scheduler.tick;
+ scheduler.tick = scheduler.tick.wrapping_add(1);
+
+ let entry = if tick % REMOTE_FIRST_INTERVAL == 0 {
+ scheduler.spawner.pop().or_else(|| {
+ context
+ .tasks
+ .borrow_mut()
+ .queue
+ .pop_front()
+ .map(Entry::Schedule)
+ })
+ } else {
+ context
+ .tasks
+ .borrow_mut()
+ .queue
+ .pop_front()
+ .map(Entry::Schedule)
+ .or_else(|| scheduler.spawner.pop())
+ };
+
+ let entry = match entry {
+ Some(entry) => entry,
+ None => {
+ // Park until the thread is signaled
+ scheduler.park.park().expect("failed to park");
+
+ // Try polling the `block_on` future next
+ continue 'outer;
+ }
+ };
+
+ match entry {
+ Entry::Schedule(task) => crate::coop::budget(|| task.run()),
+ Entry::Release(ptr) => {
+                        // Safety: this pointer was created from a live task
+                        // header in `Schedule::release`, so we know it is a
+                        // valid (and in particular *allocated*) header that
+                        // is still part of the owned linked list.
+ unsafe {
+ let removed = context.tasks.borrow_mut().owned.remove(ptr);
+
+ // TODO: This seems like it should hold, because
+ // there doesn't seem to be an avenue for anyone
+ // else to fiddle with the owned tasks
+ // collection *after* a remote thread has marked
+ // it as released, and at that point, the only
+ // location at which it can be removed is here
+ // or in the Drop implementation of the
+ // scheduler.
+ debug_assert!(removed.is_some());
+ }
+ }
+ }
+ }
+
+ // Yield to the park, this drives the timer and pulls any pending
+ // I/O events.
+ scheduler
+ .park
+ .park_timeout(Duration::from_millis(0))
+ .expect("failed to park");
+ }
+ })
+ }
+}
+
+/// Enter the scheduler context. This sets the queue and other necessary
+/// scheduler state in the thread-local `CURRENT`.
+fn enter<F, R, P>(scheduler: &mut Inner<P>, f: F) -> R
+where
+ F: FnOnce(&mut Inner<P>, &Context) -> R,
+ P: Park,
+{
+ // Ensures the run queue is placed back in the `BasicScheduler` instance
+    // once `block_on` returns.
+ struct Guard<'a, P: Park> {
+ context: Option<Context>,
+ scheduler: &'a mut Inner<P>,
+ }
+
+ impl<P: Park> Drop for Guard<'_, P> {
+ fn drop(&mut self) {
+ let Context { tasks, .. } = self.context.take().expect("context missing");
+ self.scheduler.tasks = Some(tasks.into_inner());
+ }
+ }
+
+ // Remove `tasks` from `self` and place it in a `Context`.
+ let tasks = scheduler.tasks.take().expect("invalid state");
+
+ let guard = Guard {
+ context: Some(Context {
+ shared: scheduler.spawner.shared.clone(),
+ tasks: RefCell::new(tasks),
+ }),
+ scheduler,
+ };
+
+ let context = guard.context.as_ref().unwrap();
+ let scheduler = &mut *guard.scheduler;
+
+ CURRENT.set(context, || f(scheduler, context))
+}
+
+impl<P: Park> Drop for BasicScheduler<P> {
+ fn drop(&mut self) {
+ // Avoid a double panic if we are currently panicking and
+ // the lock may be poisoned.
+
+ let mut inner = match self.inner.lock().take() {
+ Some(inner) => inner,
+ None if std::thread::panicking() => return,
+ None => panic!("Oh no! We never placed the Inner state back, this is a bug!"),
+ };
+
+ enter(&mut inner, |scheduler, context| {
+ // Loop required here to ensure borrow is dropped between iterations
+ #[allow(clippy::while_let_loop)]
+ loop {
+ let task = match context.tasks.borrow_mut().owned.pop_back() {
+ Some(task) => task,
+ None => break,
+ };
+
+ task.shutdown();
+ }
+
+ // Drain local queue
+ for task in context.tasks.borrow_mut().queue.drain(..) {
+ task.shutdown();
+ }
+
+ // Drain remote queue and set it to None
+ let mut remote_queue = scheduler.spawner.shared.queue.lock();
+
+ // Using `Option::take` to replace the shared queue with `None`.
+ if let Some(remote_queue) = remote_queue.take() {
+ for entry in remote_queue {
+ match entry {
+ Entry::Schedule(task) => {
+ task.shutdown();
+ }
+ Entry::Release(..) => {
+ // Do nothing, each entry in the linked list was *just*
+ // dropped by the scheduler above.
+ }
+ }
+ }
+ }
+ // By dropping the mutex lock after the full duration of the above loop,
+ // any thread that sees the queue in the `None` state is guaranteed that
+ // the runtime has fully shut down.
+ //
+ // The assert below is unrelated to this mutex.
+ drop(remote_queue);
+
+ assert!(context.tasks.borrow().owned.is_empty());
+ });
+ }
+}
+
+impl<P: Park> fmt::Debug for BasicScheduler<P> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("BasicScheduler").finish()
+ }
+}
+
+// ===== impl Spawner =====
+
+impl Spawner {
+    /// Spawns a future onto the basic scheduler
+ pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
+ where
+ F: crate::future::Future + Send + 'static,
+ F::Output: Send + 'static,
+ {
+ let (task, handle) = task::joinable(future);
+ self.shared.schedule(task);
+ handle
+ }
+
+ fn pop(&self) -> Option<Entry> {
+ match self.shared.queue.lock().as_mut() {
+ Some(queue) => queue.pop_front(),
+ None => None,
+ }
+ }
+
+ fn waker_ref(&self) -> WakerRef<'_> {
+ // clear the woken bit
+ self.shared.woken.swap(false, AcqRel);
+ waker_ref(&self.shared)
+ }
+
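+    /// Returns `true` if the root future's waker fired since `waker_ref`
+    /// last cleared the flag; `block_on` uses this to decide whether to
+    /// re-poll the `block_on` future.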
+ fn was_woken(&self) -> bool {
+ self.shared.woken.load(Acquire)
+ }
+}
+
+impl fmt::Debug for Spawner {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Spawner").finish()
+ }
+}
+
+// ===== impl Shared =====
+
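+// `Arc<Shared>` is the scheduler handle stored in each task. `bind` runs on
+// the scheduler thread (it requires the `Context` to be set) and links the
+// task into `Tasks::owned`; `release` and `schedule` may run on any thread
+// and fall back to the remote queue plus an `unpark` when no local
+// `Context` is available.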
+impl Schedule for Arc<Shared> {
+ fn bind(task: Task<Self>) -> Arc<Shared> {
+ CURRENT.with(|maybe_cx| {
+ let cx = maybe_cx.expect("scheduler context missing");
+ cx.tasks.borrow_mut().owned.push_front(task);
+ cx.shared.clone()
+ })
+ }
+
+ fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
+ CURRENT.with(|maybe_cx| {
+ let ptr = NonNull::from(task.header());
+
+ if let Some(cx) = maybe_cx {
+ // safety: the task is inserted in the list in `bind`.
+ unsafe { cx.tasks.borrow_mut().owned.remove(ptr) }
+ } else {
+ // By sending an `Entry::Release` to the runtime, we ask the
+ // runtime to remove this task from the linked list in
+ // `Tasks::owned`.
+ //
+ // If the queue is `None`, then the task was already removed
+ // from that list in the destructor of `BasicScheduler`. We do
+ // not do anything in this case for the same reason that
+ // `Entry::Release` messages are ignored in the remote queue
+ // drain loop of `BasicScheduler`'s destructor.
+ if let Some(queue) = self.queue.lock().as_mut() {
+ queue.push_back(Entry::Release(ptr));
+ }
+
+ self.unpark.unpark();
+            // Returning `None` here prevents the task from being freed
+            // right away; the scheduler frees it later, either via the
+            // queue entry we just pushed or in its `Drop` impl.
+ None
+ }
+ })
+ }
+
+ fn schedule(&self, task: task::Notified<Self>) {
+ CURRENT.with(|maybe_cx| match maybe_cx {
+ Some(cx) if Arc::ptr_eq(self, &cx.shared) => {
+ cx.tasks.borrow_mut().queue.push_back(task);
+ }
+ _ => {
+ let mut guard = self.queue.lock();
+ if let Some(queue) = guard.as_mut() {
+ queue.push_back(Entry::Schedule(task));
+ drop(guard);
+ self.unpark.unpark();
+ } else {
+ // The runtime has shut down. We drop the new task
+ // immediately.
+ drop(guard);
+ task.shutdown();
+ }
+ }
+ });
+ }
+}
+
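+// The `woken` flag pairs the `Release` store in `wake_by_ref` with the
+// `Acquire` ordering in `Spawner::waker_ref`/`was_woken`, so writes made
+// before a wake are visible to the scheduler thread once it observes the
+// flag.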
+impl Wake for Shared {
+ fn wake(self: Arc<Self>) {
+ Wake::wake_by_ref(&self)
+ }
+
+ /// Wake by reference
+ fn wake_by_ref(arc_self: &Arc<Self>) {
+ arc_self.woken.store(true, Release);
+ arc_self.unpark.unpark();
+ }
+}
+
+// ===== InnerGuard =====
+
+/// Used to ensure we always place the Inner value
+/// back into its slot in `BasicScheduler`, even if the
+/// future panics.
+struct InnerGuard<'a, P: Park> {
+ inner: Option<Inner<P>>,
+ basic_scheduler: &'a BasicScheduler<P>,
+}
+
+impl<P: Park> InnerGuard<'_, P> {
+ fn block_on<F: Future>(&mut self, future: F) -> F::Output {
+        // `inner` is only set to `None` in this guard's `Drop` impl, which
+        // cannot have run yet, so this unwrap is safe.
+ self.inner.as_mut().unwrap().block_on(future)
+ }
+}
+
+impl<P: Park> Drop for InnerGuard<'_, P> {
+ fn drop(&mut self) {
+ if let Some(scheduler) = self.inner.take() {
+ let mut lock = self.basic_scheduler.inner.lock();
+
+            // Place the scheduler back into `BasicScheduler`'s state so
+            // other threads can pick it up and drive it.
+ lock.replace(scheduler);
+
+            // Wake one of the threads (if any) waiting to steal the
+            // dedicated parker `P`.
+ self.basic_scheduler.notify.notify_one()
+ }
+ }
+}