author    Daniel Baumann <daniel.baumann@progress-linux.org> 2024-04-28 14:29:10 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org> 2024-04-28 14:29:10 +0000
commit    2aa4a82499d4becd2284cdb482213d541b8804dd (patch)
tree      b80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/tokio/src/runtime
parent    Initial commit. (diff)
Adding upstream version 86.0.1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio/src/runtime')
-rw-r--r--  third_party/rust/tokio/src/runtime/basic_scheduler.rs          326
-rw-r--r--  third_party/rust/tokio/src/runtime/blocking/mod.rs              43
-rw-r--r--  third_party/rust/tokio/src/runtime/blocking/pool.rs            307
-rw-r--r--  third_party/rust/tokio/src/runtime/blocking/schedule.rs         24
-rw-r--r--  third_party/rust/tokio/src/runtime/blocking/shutdown.rs         58
-rw-r--r--  third_party/rust/tokio/src/runtime/blocking/task.rs             40
-rw-r--r--  third_party/rust/tokio/src/runtime/builder.rs                  519
-rw-r--r--  third_party/rust/tokio/src/runtime/context.rs                   73
-rw-r--r--  third_party/rust/tokio/src/runtime/enter.rs                    162
-rw-r--r--  third_party/rust/tokio/src/runtime/handle.rs                   140
-rw-r--r--  third_party/rust/tokio/src/runtime/io.rs                        63
-rw-r--r--  third_party/rust/tokio/src/runtime/mod.rs                      494
-rw-r--r--  third_party/rust/tokio/src/runtime/park.rs                     245
-rw-r--r--  third_party/rust/tokio/src/runtime/queue.rs                    630
-rw-r--r--  third_party/rust/tokio/src/runtime/shell.rs                     62
-rw-r--r--  third_party/rust/tokio/src/runtime/spawner.rs                   37
-rw-r--r--  third_party/rust/tokio/src/runtime/task/core.rs                279
-rw-r--r--  third_party/rust/tokio/src/runtime/task/error.rs               163
-rw-r--r--  third_party/rust/tokio/src/runtime/task/harness.rs             372
-rw-r--r--  third_party/rust/tokio/src/runtime/task/join.rs                152
-rw-r--r--  third_party/rust/tokio/src/runtime/task/mod.rs                 220
-rw-r--r--  third_party/rust/tokio/src/runtime/task/raw.rs                 131
-rw-r--r--  third_party/rust/tokio/src/runtime/task/stack.rs                83
-rw-r--r--  third_party/rust/tokio/src/runtime/task/state.rs               446
-rw-r--r--  third_party/rust/tokio/src/runtime/task/waker.rs               101
-rw-r--r--  third_party/rust/tokio/src/runtime/tests/loom_blocking.rs       31
-rw-r--r--  third_party/rust/tokio/src/runtime/tests/loom_oneshot.rs        49
-rw-r--r--  third_party/rust/tokio/src/runtime/tests/loom_pool.rs          380
-rw-r--r--  third_party/rust/tokio/src/runtime/tests/loom_queue.rs         216
-rw-r--r--  third_party/rust/tokio/src/runtime/tests/mod.rs                 13
-rw-r--r--  third_party/rust/tokio/src/runtime/tests/queue.rs              202
-rw-r--r--  third_party/rust/tokio/src/runtime/tests/task.rs               159
-rw-r--r--  third_party/rust/tokio/src/runtime/thread_pool/atomic_cell.rs   52
-rw-r--r--  third_party/rust/tokio/src/runtime/thread_pool/idle.rs         222
-rw-r--r--  third_party/rust/tokio/src/runtime/thread_pool/mod.rs          117
-rw-r--r--  third_party/rust/tokio/src/runtime/thread_pool/worker.rs       761
-rw-r--r--  third_party/rust/tokio/src/runtime/time.rs                      59
37 files changed, 7431 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/runtime/basic_scheduler.rs b/third_party/rust/tokio/src/runtime/basic_scheduler.rs
new file mode 100644
index 0000000000..301554280f
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/basic_scheduler.rs
@@ -0,0 +1,326 @@
+use crate::park::{Park, Unpark};
+use crate::runtime;
+use crate::runtime::task::{self, JoinHandle, Schedule, Task};
+use crate::util::linked_list::LinkedList;
+use crate::util::{waker_ref, Wake};
+
+use std::cell::RefCell;
+use std::collections::VecDeque;
+use std::fmt;
+use std::future::Future;
+use std::sync::{Arc, Mutex};
+use std::task::Poll::Ready;
+use std::time::Duration;
+
+/// Executes tasks on the current thread
+pub(crate) struct BasicScheduler<P>
+where
+ P: Park,
+{
+ /// Scheduler run queue
+ ///
+ /// When the scheduler is executed, the queue is removed from `self` and
+ /// moved into `Context`.
+ ///
+ /// This indirection is to allow `BasicScheduler` to be `Send`.
+ tasks: Option<Tasks>,
+
+ /// Sendable task spawner
+ spawner: Spawner,
+
+ /// Current tick
+ tick: u8,
+
+ /// Thread park handle
+ park: P,
+}
+
+#[derive(Clone)]
+pub(crate) struct Spawner {
+ shared: Arc<Shared>,
+}
+
+struct Tasks {
+ /// Collection of all active tasks spawned onto this executor.
+ owned: LinkedList<Task<Arc<Shared>>>,
+
+ /// Local run queue.
+ ///
+ /// Tasks notified from the current thread are pushed into this queue.
+ queue: VecDeque<task::Notified<Arc<Shared>>>,
+}
+
+/// Scheduler state shared between threads.
+struct Shared {
+ /// Remote run queue
+ queue: Mutex<VecDeque<task::Notified<Arc<Shared>>>>,
+
+ /// Unpark the blocked thread
+ unpark: Box<dyn Unpark>,
+}
+
+/// Thread-local context
+struct Context {
+ /// Shared scheduler state
+ shared: Arc<Shared>,
+
+ /// Local queue
+ tasks: RefCell<Tasks>,
+}
+
+/// Initial queue capacity
+const INITIAL_CAPACITY: usize = 64;
+
+/// Max number of tasks to poll per tick.
+const MAX_TASKS_PER_TICK: usize = 61;
+
+/// How often to check the remote queue first
+const REMOTE_FIRST_INTERVAL: u8 = 31;
+
+// Tracks the current BasicScheduler
+scoped_thread_local!(static CURRENT: Context);
+
+impl<P> BasicScheduler<P>
+where
+ P: Park,
+{
+ pub(crate) fn new(park: P) -> BasicScheduler<P> {
+ let unpark = Box::new(park.unpark());
+
+ BasicScheduler {
+ tasks: Some(Tasks {
+ owned: LinkedList::new(),
+ queue: VecDeque::with_capacity(INITIAL_CAPACITY),
+ }),
+ spawner: Spawner {
+ shared: Arc::new(Shared {
+ queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
+ unpark: unpark as Box<dyn Unpark>,
+ }),
+ },
+ tick: 0,
+ park,
+ }
+ }
+
+ pub(crate) fn spawner(&self) -> &Spawner {
+ &self.spawner
+ }
+
+ /// Spawns a future onto the basic scheduler
+ pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
+ where
+ F: Future + Send + 'static,
+ F::Output: Send + 'static,
+ {
+ self.spawner.spawn(future)
+ }
+
+ pub(crate) fn block_on<F>(&mut self, future: F) -> F::Output
+ where
+ F: Future,
+ {
+ enter(self, |scheduler, context| {
+ let _enter = runtime::enter();
+ let waker = waker_ref(&scheduler.spawner.shared);
+ let mut cx = std::task::Context::from_waker(&waker);
+
+ pin!(future);
+
+ 'outer: loop {
+ if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) {
+ return v;
+ }
+
+ for _ in 0..MAX_TASKS_PER_TICK {
+ // Get and increment the current tick
+ let tick = scheduler.tick;
+ scheduler.tick = scheduler.tick.wrapping_add(1);
+
+ let next = if tick % REMOTE_FIRST_INTERVAL == 0 {
+ scheduler
+ .spawner
+ .pop()
+ .or_else(|| context.tasks.borrow_mut().queue.pop_front())
+ } else {
+ context
+ .tasks
+ .borrow_mut()
+ .queue
+ .pop_front()
+ .or_else(|| scheduler.spawner.pop())
+ };
+
+ match next {
+ Some(task) => crate::coop::budget(|| task.run()),
+ None => {
+ // Park until the thread is signaled
+ scheduler.park.park().ok().expect("failed to park");
+
+ // Try polling the `block_on` future next
+ continue 'outer;
+ }
+ }
+ }
+
+ // Yield to the park, this drives the timer and pulls any pending
+ // I/O events.
+ scheduler
+ .park
+ .park_timeout(Duration::from_millis(0))
+ .ok()
+ .expect("failed to park");
+ }
+ })
+ }
+}
+
+/// Enter the scheduler context. This sets the queue and other necessary
+/// scheduler state in the thread-local
+fn enter<F, R, P>(scheduler: &mut BasicScheduler<P>, f: F) -> R
+where
+ F: FnOnce(&mut BasicScheduler<P>, &Context) -> R,
+ P: Park,
+{
+ // Ensures the run queue is placed back in the `BasicScheduler` instance
+ // once `block_on` returns.
+ struct Guard<'a, P: Park> {
+ context: Option<Context>,
+ scheduler: &'a mut BasicScheduler<P>,
+ }
+
+ impl<P: Park> Drop for Guard<'_, P> {
+ fn drop(&mut self) {
+ let Context { tasks, .. } = self.context.take().expect("context missing");
+ self.scheduler.tasks = Some(tasks.into_inner());
+ }
+ }
+
+ // Remove `tasks` from `self` and place it in a `Context`.
+ let tasks = scheduler.tasks.take().expect("invalid state");
+
+ let guard = Guard {
+ context: Some(Context {
+ shared: scheduler.spawner.shared.clone(),
+ tasks: RefCell::new(tasks),
+ }),
+ scheduler,
+ };
+
+ let context = guard.context.as_ref().unwrap();
+ let scheduler = &mut *guard.scheduler;
+
+ CURRENT.set(context, || f(scheduler, context))
+}
+
+impl<P> Drop for BasicScheduler<P>
+where
+ P: Park,
+{
+ fn drop(&mut self) {
+ enter(self, |scheduler, context| {
+ // Loop required here to ensure borrow is dropped between iterations
+ #[allow(clippy::while_let_loop)]
+ loop {
+ let task = match context.tasks.borrow_mut().owned.pop_back() {
+ Some(task) => task,
+ None => break,
+ };
+
+ task.shutdown();
+ }
+
+ // Drain local queue
+ for task in context.tasks.borrow_mut().queue.drain(..) {
+ task.shutdown();
+ }
+
+ // Drain remote queue
+ for task in scheduler.spawner.shared.queue.lock().unwrap().drain(..) {
+ task.shutdown();
+ }
+
+ assert!(context.tasks.borrow().owned.is_empty());
+ });
+ }
+}
+
+impl<P: Park> fmt::Debug for BasicScheduler<P> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("BasicScheduler").finish()
+ }
+}
+
+// ===== impl Spawner =====
+
+impl Spawner {
+ /// Spawns a future onto the basic scheduler
+ pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
+ where
+ F: Future + Send + 'static,
+ F::Output: Send + 'static,
+ {
+ let (task, handle) = task::joinable(future);
+ self.shared.schedule(task);
+ handle
+ }
+
+ fn pop(&self) -> Option<task::Notified<Arc<Shared>>> {
+ self.shared.queue.lock().unwrap().pop_front()
+ }
+}
+
+impl fmt::Debug for Spawner {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Spawner").finish()
+ }
+}
+
+// ===== impl Shared =====
+
+impl Schedule for Arc<Shared> {
+ fn bind(task: Task<Self>) -> Arc<Shared> {
+ CURRENT.with(|maybe_cx| {
+ let cx = maybe_cx.expect("scheduler context missing");
+ cx.tasks.borrow_mut().owned.push_front(task);
+ cx.shared.clone()
+ })
+ }
+
+ fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
+ use std::ptr::NonNull;
+
+ CURRENT.with(|maybe_cx| {
+ let cx = maybe_cx.expect("scheduler context missing");
+
+ // safety: the task is inserted in the list in `bind`.
+ unsafe {
+ let ptr = NonNull::from(task.header());
+ cx.tasks.borrow_mut().owned.remove(ptr)
+ }
+ })
+ }
+
+ fn schedule(&self, task: task::Notified<Self>) {
+ CURRENT.with(|maybe_cx| match maybe_cx {
+ Some(cx) if Arc::ptr_eq(self, &cx.shared) => {
+ cx.tasks.borrow_mut().queue.push_back(task);
+ }
+ _ => {
+ self.queue.lock().unwrap().push_back(task);
+ self.unpark.unpark();
+ }
+ });
+ }
+}
+
+impl Wake for Shared {
+ fn wake(self: Arc<Self>) {
+ Wake::wake_by_ref(&self)
+ }
+
+ /// Wake by reference
+ fn wake_by_ref(arc_self: &Arc<Self>) {
+ arc_self.unpark.unpark();
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/blocking/mod.rs b/third_party/rust/tokio/src/runtime/blocking/mod.rs
new file mode 100644
index 0000000000..5c808335cc
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/blocking/mod.rs
@@ -0,0 +1,43 @@
+//! Abstracts out the APIs necessary to `Runtime` for integrating the blocking
+//! pool. When the `blocking` feature flag is **not** enabled, these APIs are
+//! shells. This isolates the complexity of dealing with conditional
+//! compilation.
+
+cfg_blocking_impl! {
+ mod pool;
+ pub(crate) use pool::{spawn_blocking, try_spawn_blocking, BlockingPool, Spawner};
+
+ mod schedule;
+ mod shutdown;
+ mod task;
+
+ use crate::runtime::Builder;
+
+ pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool {
+ BlockingPool::new(builder, thread_cap)
+ }
+}
+
+cfg_not_blocking_impl! {
+ use crate::runtime::Builder;
+ use std::time::Duration;
+
+ #[derive(Debug, Clone)]
+ pub(crate) struct BlockingPool {}
+
+ pub(crate) use BlockingPool as Spawner;
+
+ pub(crate) fn create_blocking_pool(_builder: &Builder, _thread_cap: usize) -> BlockingPool {
+ BlockingPool {}
+ }
+
+ impl BlockingPool {
+ pub(crate) fn spawner(&self) -> &BlockingPool {
+ self
+ }
+
+ pub(crate) fn shutdown(&mut self, _duration: Option<Duration>) {
+ }
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/blocking/pool.rs b/third_party/rust/tokio/src/runtime/blocking/pool.rs
new file mode 100644
index 0000000000..a3b208d171
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/blocking/pool.rs
@@ -0,0 +1,307 @@
+//! Thread pool for blocking operations
+
+use crate::loom::sync::{Arc, Condvar, Mutex};
+use crate::loom::thread;
+use crate::runtime::blocking::schedule::NoopSchedule;
+use crate::runtime::blocking::shutdown;
+use crate::runtime::blocking::task::BlockingTask;
+use crate::runtime::task::{self, JoinHandle};
+use crate::runtime::{Builder, Callback, Handle};
+
+use std::collections::VecDeque;
+use std::fmt;
+use std::time::Duration;
+
+pub(crate) struct BlockingPool {
+ spawner: Spawner,
+ shutdown_rx: shutdown::Receiver,
+}
+
+#[derive(Clone)]
+pub(crate) struct Spawner {
+ inner: Arc<Inner>,
+}
+
+struct Inner {
+ /// State shared between worker threads
+ shared: Mutex<Shared>,
+
+ /// Pool threads wait on this.
+ condvar: Condvar,
+
+ /// Spawned threads use this name
+ thread_name: String,
+
+ /// Spawned thread stack size
+ stack_size: Option<usize>,
+
+ /// Call after a thread starts
+ after_start: Option<Callback>,
+
+ /// Call before a thread stops
+ before_stop: Option<Callback>,
+
+ thread_cap: usize,
+}
+
+struct Shared {
+ queue: VecDeque<Task>,
+ num_th: usize,
+ num_idle: u32,
+ num_notify: u32,
+ shutdown: bool,
+ shutdown_tx: Option<shutdown::Sender>,
+}
+
+type Task = task::Notified<NoopSchedule>;
+
+const KEEP_ALIVE: Duration = Duration::from_secs(10);
+
+/// Run the provided function on an executor dedicated to blocking operations.
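+///
+/// A usage sketch via the public wrapper (this function backs
+/// `tokio::task::spawn_blocking`; the file name below is illustrative):
+///
+/// ```ignore
+/// let contents = tokio::task::spawn_blocking(|| {
+///     // Runs on a dedicated blocking thread, off the async workers.
+///     std::fs::read_to_string("config.toml")
+/// })
+/// .await??;
+/// ```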
+pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
+where
+ F: FnOnce() -> R + Send + 'static,
+{
+ let rt = Handle::current();
+
+ let (task, handle) = task::joinable(BlockingTask::new(func));
+ let _ = rt.blocking_spawner.spawn(task, &rt);
+ handle
+}
+
+#[allow(dead_code)]
+pub(crate) fn try_spawn_blocking<F, R>(func: F) -> Result<(), ()>
+where
+ F: FnOnce() -> R + Send + 'static,
+{
+ let rt = Handle::current();
+
+ let (task, _handle) = task::joinable(BlockingTask::new(func));
+ rt.blocking_spawner.spawn(task, &rt)
+}
+
+// ===== impl BlockingPool =====
+
+impl BlockingPool {
+ pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool {
+ let (shutdown_tx, shutdown_rx) = shutdown::channel();
+
+ BlockingPool {
+ spawner: Spawner {
+ inner: Arc::new(Inner {
+ shared: Mutex::new(Shared {
+ queue: VecDeque::new(),
+ num_th: 0,
+ num_idle: 0,
+ num_notify: 0,
+ shutdown: false,
+ shutdown_tx: Some(shutdown_tx),
+ }),
+ condvar: Condvar::new(),
+ thread_name: builder.thread_name.clone(),
+ stack_size: builder.thread_stack_size,
+ after_start: builder.after_start.clone(),
+ before_stop: builder.before_stop.clone(),
+ thread_cap,
+ }),
+ },
+ shutdown_rx,
+ }
+ }
+
+ pub(crate) fn spawner(&self) -> &Spawner {
+ &self.spawner
+ }
+
+ pub(crate) fn shutdown(&mut self, timeout: Option<Duration>) {
+ let mut shared = self.spawner.inner.shared.lock().unwrap();
+
+ // The function can be called multiple times. First, by explicitly
+ // calling `shutdown` then by the drop handler calling `shutdown`. This
+ // prevents shutting down twice.
+ if shared.shutdown {
+ return;
+ }
+
+ shared.shutdown = true;
+ shared.shutdown_tx = None;
+ self.spawner.inner.condvar.notify_all();
+
+ drop(shared);
+
+ self.shutdown_rx.wait(timeout);
+ }
+}
+
+impl Drop for BlockingPool {
+ fn drop(&mut self) {
+ self.shutdown(None);
+ }
+}
+
+impl fmt::Debug for BlockingPool {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("BlockingPool").finish()
+ }
+}
+
+// ===== impl Spawner =====
+
+impl Spawner {
+ fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> {
+ let shutdown_tx = {
+ let mut shared = self.inner.shared.lock().unwrap();
+
+ if shared.shutdown {
+ // Shutdown the task
+ task.shutdown();
+
+ // no need to even push this task; it would never get picked up
+ return Err(());
+ }
+
+ shared.queue.push_back(task);
+
+ if shared.num_idle == 0 {
+ // No threads are able to process the task.
+
+ if shared.num_th == self.inner.thread_cap {
+ // At max number of threads
+ None
+ } else {
+ shared.num_th += 1;
+ assert!(shared.shutdown_tx.is_some());
+ shared.shutdown_tx.clone()
+ }
+ } else {
+ // Notify an idle worker thread. The notification counter
+ // is used to count the needed number of notifications
+ // exactly. Thread libraries may generate spurious
+ // wakeups; this counter is used to keep us in a
+ // consistent state.
+ shared.num_idle -= 1;
+ shared.num_notify += 1;
+ self.inner.condvar.notify_one();
+ None
+ }
+ };
+
+ if let Some(shutdown_tx) = shutdown_tx {
+ self.spawn_thread(shutdown_tx, rt);
+ }
+
+ Ok(())
+ }
+
+ fn spawn_thread(&self, shutdown_tx: shutdown::Sender, rt: &Handle) {
+ let mut builder = thread::Builder::new().name(self.inner.thread_name.clone());
+
+ if let Some(stack_size) = self.inner.stack_size {
+ builder = builder.stack_size(stack_size);
+ }
+
+ let rt = rt.clone();
+
+ builder
+ .spawn(move || {
+ // Only the reference should be moved into the closure
+ let rt = &rt;
+ rt.enter(move || {
+ rt.blocking_spawner.inner.run();
+ drop(shutdown_tx);
+ })
+ })
+ .unwrap();
+ }
+}
+
+impl Inner {
+ fn run(&self) {
+ if let Some(f) = &self.after_start {
+ f()
+ }
+
+ let mut shared = self.shared.lock().unwrap();
+
+ 'main: loop {
+ // BUSY
+ while let Some(task) = shared.queue.pop_front() {
+ drop(shared);
+ task.run();
+
+ shared = self.shared.lock().unwrap();
+ }
+
+ // IDLE
+ shared.num_idle += 1;
+
+ while !shared.shutdown {
+ let lock_result = self.condvar.wait_timeout(shared, KEEP_ALIVE).unwrap();
+
+ shared = lock_result.0;
+ let timeout_result = lock_result.1;
+
+ if shared.num_notify != 0 {
+ // We have received a legitimate wakeup,
+ // acknowledge it by decrementing the counter
+ // and transition to the BUSY state.
+ shared.num_notify -= 1;
+ break;
+ }
+
+ // Even if the condvar "timed out", if the pool is entering the
+ // shutdown phase, we want to perform the cleanup logic.
+ if !shared.shutdown && timeout_result.timed_out() {
+ break 'main;
+ }
+
+ // Spurious wakeup detected, go back to sleep.
+ }
+
+ if shared.shutdown {
+ // Drain the queue
+ while let Some(task) = shared.queue.pop_front() {
+ drop(shared);
+ task.shutdown();
+
+ shared = self.shared.lock().unwrap();
+ }
+
+ // Work was produced, and we "took" it (by decrementing num_notify).
+ // This means that num_idle was decremented once for our wakeup.
+ // But, since we are exiting, we need to "undo" that, as we'll stay idle.
+ shared.num_idle += 1;
+ // NOTE: Technically we should also do num_notify++ and notify again,
+ // but since we're shutting down anyway, that won't be necessary.
+ break;
+ }
+ }
+
+ // Thread exit
+ shared.num_th -= 1;
+
+ // num_idle should now be tracked exactly, panic
+ // with a descriptive message if it is not the
+ // case.
+ shared.num_idle = shared
+ .num_idle
+ .checked_sub(1)
+ .expect("num_idle underflowed on thread exit");
+
+ if shared.shutdown && shared.num_th == 0 {
+ self.condvar.notify_one();
+ }
+
+ drop(shared);
+
+ if let Some(f) = &self.before_stop {
+ f()
+ }
+ }
+}
+
+impl fmt::Debug for Spawner {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("blocking::Spawner").finish()
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/blocking/schedule.rs b/third_party/rust/tokio/src/runtime/blocking/schedule.rs
new file mode 100644
index 0000000000..e10778d530
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/blocking/schedule.rs
@@ -0,0 +1,24 @@
+use crate::runtime::task::{self, Task};
+
+/// `task::Schedule` implementation that does nothing. This is unique to the
+/// blocking scheduler as tasks scheduled are not really futures but blocking
+/// operations.
+///
+/// We avoid storing the task by forgetting it in `bind` and re-materializing it
+/// in `release`.
+pub(super) struct NoopSchedule;
+
+impl task::Schedule for NoopSchedule {
+ fn bind(_task: Task<Self>) -> NoopSchedule {
+ // Do nothing w/ the task
+ NoopSchedule
+ }
+
+ fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
+ None
+ }
+
+ fn schedule(&self, _task: task::Notified<Self>) {
+ unreachable!();
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/blocking/shutdown.rs b/third_party/rust/tokio/src/runtime/blocking/shutdown.rs
new file mode 100644
index 0000000000..5ee8af0fbc
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/blocking/shutdown.rs
@@ -0,0 +1,58 @@
+//! A shutdown channel.
+//!
+//! Each worker holds the `Sender` half. When all the `Sender` halves are
+//! dropped, the `Receiver` receives a notification.
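+//!
+//! A minimal sketch of the flow, using this module's own API:
+//!
+//! ```ignore
+//! let (tx, mut rx) = channel();
+//! let worker_tx = tx.clone();   // one clone per worker thread
+//! drop(tx);
+//! drop(worker_tx);              // last `Sender` dropped...
+//! rx.wait(None);                // ...so this unblocks
+//! ```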
+
+use crate::loom::sync::Arc;
+use crate::sync::oneshot;
+
+use std::time::Duration;
+
+#[derive(Debug, Clone)]
+pub(super) struct Sender {
+ tx: Arc<oneshot::Sender<()>>,
+}
+
+#[derive(Debug)]
+pub(super) struct Receiver {
+ rx: oneshot::Receiver<()>,
+}
+
+pub(super) fn channel() -> (Sender, Receiver) {
+ let (tx, rx) = oneshot::channel();
+ let tx = Sender { tx: Arc::new(tx) };
+ let rx = Receiver { rx };
+
+ (tx, rx)
+}
+
+impl Receiver {
+ /// Blocks the current thread until all `Sender` handles drop.
+ ///
+ /// If `timeout` is `Some`, the thread is blocked for **at most** `timeout`
+ /// duration. If `timeout` is `None`, then the thread is blocked until the
+ /// shutdown signal is received.
+ pub(crate) fn wait(&mut self, timeout: Option<Duration>) {
+ use crate::runtime::enter::{enter, try_enter};
+
+ let mut e = if std::thread::panicking() {
+ match try_enter() {
+ Some(enter) => enter,
+ _ => return,
+ }
+ } else {
+ enter()
+ };
+
+ // The oneshot completes with an Err
+ //
+ // If blocking fails to wait, this indicates a problem parking the
+ // current thread (usually, shutting down a runtime stored in a
+ // thread-local).
+ if let Some(timeout) = timeout {
+ let _ = e.block_on_timeout(&mut self.rx, timeout);
+ } else {
+ let _ = e.block_on(&mut self.rx);
+ }
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/blocking/task.rs b/third_party/rust/tokio/src/runtime/blocking/task.rs
new file mode 100644
index 0000000000..f98b85494c
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/blocking/task.rs
@@ -0,0 +1,40 @@
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+/// Converts a function to a future that completes on poll
+pub(super) struct BlockingTask<T> {
+ func: Option<T>,
+}
+
+impl<T> BlockingTask<T> {
+ /// Initializes a new blocking task from the given function
+ pub(super) fn new(func: T) -> BlockingTask<T> {
+ BlockingTask { func: Some(func) }
+ }
+}
+
+impl<T, R> Future for BlockingTask<T>
+where
+ T: FnOnce() -> R,
+{
+ type Output = R;
+
+ fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<R> {
+ let me = unsafe { self.get_unchecked_mut() };
+ let func = me
+ .func
+ .take()
+ .expect("[internal exception] blocking task ran twice.");
+
+ // This is a little subtle:
+ // For convenience, we'd like _every_ call tokio ever makes to Task::poll() to be budgeted
+ // using coop. However, the way things are currently modeled, even running a blocking task
+ // currently goes through Task::poll(), and so is subject to budgeting. That isn't really
+ // what we want; a blocking task may itself want to run tasks (it might be a Worker!), so
+ // we want it to start without any budgeting.
+ crate::coop::stop();
+
+ Poll::Ready(func())
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/builder.rs b/third_party/rust/tokio/src/runtime/builder.rs
new file mode 100644
index 0000000000..cfde998251
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/builder.rs
@@ -0,0 +1,519 @@
+use crate::runtime::handle::Handle;
+use crate::runtime::shell::Shell;
+use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner};
+
+use std::fmt;
+#[cfg(not(loom))]
+use std::sync::Arc;
+
+/// Builds Tokio Runtime with custom configuration values.
+///
+/// Methods can be chained in order to set the configuration values. The
+/// Runtime is constructed by calling [`build`].
+///
+/// New instances of `Builder` are obtained via [`Builder::new`].
+///
+/// See function level documentation for details on the various configuration
+/// settings.
+///
+/// [`build`]: #method.build
+/// [`Builder::new`]: #method.new
+///
+/// # Examples
+///
+/// ```
+/// use tokio::runtime::Builder;
+///
+/// fn main() {
+/// // build runtime
+/// let runtime = Builder::new()
+/// .threaded_scheduler()
+/// .core_threads(4)
+/// .thread_name("my-custom-name")
+/// .thread_stack_size(3 * 1024 * 1024)
+/// .build()
+/// .unwrap();
+///
+/// // use runtime ...
+/// }
+/// ```
+pub struct Builder {
+ /// The task execution model to use.
+ kind: Kind,
+
+ /// Whether or not to enable the I/O driver
+ enable_io: bool,
+
+ /// Whether or not to enable the time driver
+ enable_time: bool,
+
+ /// The number of worker threads used by the Runtime.
+ ///
+ /// Only used when not using the current-thread executor.
+ core_threads: Option<usize>,
+
+ /// Cap on thread usage.
+ max_threads: usize,
+
+ /// Name used for threads spawned by the runtime.
+ pub(super) thread_name: String,
+
+ /// Stack size used for threads spawned by the runtime.
+ pub(super) thread_stack_size: Option<usize>,
+
+ /// Callback to run after each thread starts.
+ pub(super) after_start: Option<Callback>,
+
+ /// To run before each worker thread stops
+ pub(super) before_stop: Option<Callback>,
+}
+
+#[derive(Debug, Clone, Copy)]
+enum Kind {
+ Shell,
+ #[cfg(feature = "rt-core")]
+ Basic,
+ #[cfg(feature = "rt-threaded")]
+ ThreadPool,
+}
+
+impl Builder {
+ /// Returns a new runtime builder initialized with default configuration
+ /// values.
+ ///
+ /// Configuration methods can be chained on the return value.
+ pub fn new() -> Builder {
+ Builder {
+ // No task execution by default
+ kind: Kind::Shell,
+
+ // I/O defaults to "off"
+ enable_io: false,
+
+ // Time defaults to "off"
+ enable_time: false,
+
+ // Default to lazy auto-detection (one thread per CPU core)
+ core_threads: None,
+
+ max_threads: 512,
+
+ // Default thread name
+ thread_name: "tokio-runtime-worker".into(),
+
+ // Do not set a stack size by default
+ thread_stack_size: None,
+
+ // No worker thread callbacks
+ after_start: None,
+ before_stop: None,
+ }
+ }
+
+ /// Enables both I/O and time drivers.
+ ///
+ /// Doing this is a shorthand for calling `enable_io` and `enable_time`
+ /// individually. If additional components are added to Tokio in the future,
+ /// `enable_all` will include these future components.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime;
+ ///
+ /// let rt = runtime::Builder::new()
+ /// .threaded_scheduler()
+ /// .enable_all()
+ /// .build()
+ /// .unwrap();
+ /// ```
+ pub fn enable_all(&mut self) -> &mut Self {
+ #[cfg(feature = "io-driver")]
+ self.enable_io();
+ #[cfg(feature = "time")]
+ self.enable_time();
+
+ self
+ }
+
+ #[deprecated(note = "In the future, this will be replaced by the core_threads method")]
+ /// Sets the maximum number of worker threads for the `Runtime`'s thread pool.
+ ///
+ /// This must be a number between 1 and 32,768, though it is advised to keep
+ /// this value on the smaller side.
+ ///
+ /// The default value is the number of cores available to the system.
+ pub fn num_threads(&mut self, val: usize) -> &mut Self {
+ self.core_threads = Some(val);
+ self
+ }
+
+ /// Sets the core number of worker threads for the `Runtime`'s thread pool.
+ ///
+ /// This should be a number between 1 and 32,768, though it is advised to keep
+ /// this value on the smaller side.
+ ///
+ /// The default value is the number of cores available to the system.
+ ///
+ /// These threads will always be active and running.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime;
+ ///
+ /// let rt = runtime::Builder::new()
+ /// .threaded_scheduler()
+ /// .core_threads(4)
+ /// .build()
+ /// .unwrap();
+ /// ```
+ pub fn core_threads(&mut self, val: usize) -> &mut Self {
+ assert_ne!(val, 0, "Core threads cannot be zero");
+ self.core_threads = Some(val);
+ self
+ }
+
+ /// Specifies the limit for threads spawned by the Runtime.
+ ///
+ /// This is the total number of threads used by the Runtime, including
+ /// `core_threads`. Setting `max_threads` below `core_threads` is an invalid
+ /// configuration when building a multi-threaded `Runtime`, and causes a panic.
+ ///
+ /// Similarly to `core_threads`, this number should be between 1 and 32,768.
+ ///
+ /// The default value is 512.
+ ///
+ /// When the multi-threaded runtime is not used, this acts as a limit on
+ /// additional threads.
+ ///
+ /// Otherwise, as `core_threads` are always active, it limits additional
+ /// threads (e.g. for blocking annotations) to `max_threads - core_threads`.
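+ ///
+ /// # Examples
+ ///
+ /// A sketch combining `core_threads` and `max_threads`; the values are
+ /// illustrative:
+ ///
+ /// ```
+ /// use tokio::runtime;
+ ///
+ /// // 4 always-active workers, plus at most 4 extra (e.g. blocking) threads.
+ /// let rt = runtime::Builder::new()
+ ///     .threaded_scheduler()
+ ///     .core_threads(4)
+ ///     .max_threads(8)
+ ///     .build()
+ ///     .unwrap();
+ /// ```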
+ pub fn max_threads(&mut self, val: usize) -> &mut Self {
+ assert_ne!(val, 0, "Thread limit cannot be zero");
+ self.max_threads = val;
+ self
+ }
+
+ /// Sets name of threads spawned by the `Runtime`'s thread pool.
+ ///
+ /// The default name is "tokio-runtime-worker".
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::runtime;
+ ///
+ /// # pub fn main() {
+ /// let rt = runtime::Builder::new()
+ /// .thread_name("my-pool")
+ /// .build();
+ /// # }
+ /// ```
+ pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
+ self.thread_name = val.into();
+ self
+ }
+
+ /// Sets the stack size (in bytes) for worker threads.
+ ///
+ /// The actual stack size may be greater than this value if the platform
+ /// specifies a minimal stack size.
+ ///
+ /// The default stack size for spawned threads is 2 MiB, though this
+ /// particular stack size is subject to change in the future.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::runtime;
+ ///
+ /// # pub fn main() {
+ /// let rt = runtime::Builder::new()
+ /// .threaded_scheduler()
+ /// .thread_stack_size(32 * 1024)
+ /// .build();
+ /// # }
+ /// ```
+ pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
+ self.thread_stack_size = Some(val);
+ self
+ }
+
+ /// Executes function `f` after each thread is started but before it starts
+ /// doing work.
+ ///
+ /// This is intended for bookkeeping and monitoring use cases.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::runtime;
+ ///
+ /// # pub fn main() {
+ /// let runtime = runtime::Builder::new()
+ /// .threaded_scheduler()
+ /// .on_thread_start(|| {
+ /// println!("thread started");
+ /// })
+ /// .build();
+ /// # }
+ /// ```
+ #[cfg(not(loom))]
+ pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
+ where
+ F: Fn() + Send + Sync + 'static,
+ {
+ self.after_start = Some(Arc::new(f));
+ self
+ }
+
+ /// Executes function `f` before each thread stops.
+ ///
+ /// This is intended for bookkeeping and monitoring use cases.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::runtime;
+ ///
+ /// # pub fn main() {
+ /// let runtime = runtime::Builder::new()
+ /// .threaded_scheduler()
+ /// .on_thread_stop(|| {
+ /// println!("thread stopping");
+ /// })
+ /// .build();
+ /// # }
+ /// ```
+ #[cfg(not(loom))]
+ pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
+ where
+ F: Fn() + Send + Sync + 'static,
+ {
+ self.before_stop = Some(Arc::new(f));
+ self
+ }
+
+ /// Creates the configured `Runtime`.
+ ///
+ /// The returned `Runtime` instance is ready to spawn tasks.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime::Builder;
+ ///
+ /// let mut rt = Builder::new().build().unwrap();
+ ///
+ /// rt.block_on(async {
+ /// println!("Hello from the Tokio runtime");
+ /// });
+ /// ```
+ pub fn build(&mut self) -> io::Result<Runtime> {
+ match self.kind {
+ Kind::Shell => self.build_shell_runtime(),
+ #[cfg(feature = "rt-core")]
+ Kind::Basic => self.build_basic_runtime(),
+ #[cfg(feature = "rt-threaded")]
+ Kind::ThreadPool => self.build_threaded_runtime(),
+ }
+ }
+
+ fn build_shell_runtime(&mut self) -> io::Result<Runtime> {
+ use crate::runtime::Kind;
+
+ let clock = time::create_clock();
+
+ // Create I/O driver
+ let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
+ let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
+
+ let spawner = Spawner::Shell;
+
+ let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
+ let blocking_spawner = blocking_pool.spawner().clone();
+
+ Ok(Runtime {
+ kind: Kind::Shell(Shell::new(driver)),
+ handle: Handle {
+ spawner,
+ io_handle,
+ time_handle,
+ clock,
+ blocking_spawner,
+ },
+ blocking_pool,
+ })
+ }
+}
+
+cfg_io_driver! {
+ impl Builder {
+ /// Enables the I/O driver.
+ ///
+ /// Doing this enables using net, process, signal, and some I/O types on
+ /// the runtime.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime;
+ ///
+ /// let rt = runtime::Builder::new()
+ /// .enable_io()
+ /// .build()
+ /// .unwrap();
+ /// ```
+ pub fn enable_io(&mut self) -> &mut Self {
+ self.enable_io = true;
+ self
+ }
+ }
+}
+
+cfg_time! {
+ impl Builder {
+ /// Enables the time driver.
+ ///
+ /// Doing this enables using `tokio::time` on the runtime.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime;
+ ///
+ /// let rt = runtime::Builder::new()
+ /// .enable_time()
+ /// .build()
+ /// .unwrap();
+ /// ```
+ pub fn enable_time(&mut self) -> &mut Self {
+ self.enable_time = true;
+ self
+ }
+ }
+}
+
+cfg_rt_core! {
+ impl Builder {
+ /// Sets runtime to use a simpler scheduler that runs all tasks on the current-thread.
+ ///
+ /// The executor and all necessary drivers will all be run on the current
+ /// thread during `block_on` calls.
+ ///
+ /// See also [the module level documentation][1], which has a section on scheduler
+ /// types.
+ ///
+ /// [1]: index.html#runtime-configurations
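+ ///
+ /// # Examples
+ ///
+ /// A minimal sketch, mirroring the module level docs:
+ ///
+ /// ```
+ /// use tokio::runtime;
+ ///
+ /// let basic_rt = runtime::Builder::new()
+ ///     .basic_scheduler()
+ ///     .build()
+ ///     .unwrap();
+ /// ```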
+ pub fn basic_scheduler(&mut self) -> &mut Self {
+ self.kind = Kind::Basic;
+ self
+ }
+
+ fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
+ use crate::runtime::{BasicScheduler, Kind};
+
+ let clock = time::create_clock();
+
+ // Create I/O driver
+ let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
+
+ let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
+
+ // And now put a single-threaded scheduler on top of the timer. When
+ // there are no futures ready to do something, it'll let the timer or
+ // the reactor generate some new stimuli for the futures to continue
+ // their lives.
+ let scheduler = BasicScheduler::new(driver);
+ let spawner = Spawner::Basic(scheduler.spawner().clone());
+
+ // Blocking pool
+ let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
+ let blocking_spawner = blocking_pool.spawner().clone();
+
+ Ok(Runtime {
+ kind: Kind::Basic(scheduler),
+ handle: Handle {
+ spawner,
+ io_handle,
+ time_handle,
+ clock,
+ blocking_spawner,
+ },
+ blocking_pool,
+ })
+ }
+ }
+}
+
+cfg_rt_threaded! {
+ impl Builder {
+ /// Sets runtime to use a multi-threaded scheduler for executing tasks.
+ ///
+ /// See also [the module level documentation][1], which has a section on scheduler
+ /// types.
+ ///
+ /// [1]: index.html#runtime-configurations
+ pub fn threaded_scheduler(&mut self) -> &mut Self {
+ self.kind = Kind::ThreadPool;
+ self
+ }
+
+ fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
+ use crate::runtime::{Kind, ThreadPool};
+ use crate::runtime::park::Parker;
+
+ let core_threads = self.core_threads.unwrap_or_else(crate::loom::sys::num_cpus);
+ assert!(core_threads <= self.max_threads, "Core threads number cannot be above max limit");
+
+ let clock = time::create_clock();
+
+ let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
+ let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
+ let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver));
+ let spawner = Spawner::ThreadPool(scheduler.spawner().clone());
+
+ // Create the blocking pool
+ let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
+ let blocking_spawner = blocking_pool.spawner().clone();
+
+ // Create the runtime handle
+ let handle = Handle {
+ spawner,
+ io_handle,
+ time_handle,
+ clock,
+ blocking_spawner,
+ };
+
+ // Spawn the thread pool workers
+ handle.enter(|| launch.launch());
+
+ Ok(Runtime {
+ kind: Kind::ThreadPool(scheduler),
+ handle,
+ blocking_pool,
+ })
+ }
+ }
+}
+
+impl Default for Builder {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl fmt::Debug for Builder {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Builder")
+ .field("kind", &self.kind)
+ .field("core_threads", &self.core_threads)
+ .field("max_threads", &self.max_threads)
+ .field("thread_name", &self.thread_name)
+ .field("thread_stack_size", &self.thread_stack_size)
+ .field("after_start", &self.after_start.as_ref().map(|_| "..."))
+ .field("before_stop", &self.after_start.as_ref().map(|_| "..."))
+ .finish()
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/context.rs b/third_party/rust/tokio/src/runtime/context.rs
new file mode 100644
index 0000000000..4af2df23eb
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/context.rs
@@ -0,0 +1,73 @@
+//! Thread local runtime context
+use crate::runtime::Handle;
+
+use std::cell::RefCell;
+
+thread_local! {
+ static CONTEXT: RefCell<Option<Handle>> = RefCell::new(None)
+}
+
+pub(crate) fn current() -> Option<Handle> {
+ CONTEXT.with(|ctx| ctx.borrow().clone())
+}
+
+cfg_io_driver! {
+ pub(crate) fn io_handle() -> crate::runtime::io::Handle {
+ CONTEXT.with(|ctx| match *ctx.borrow() {
+ Some(ref ctx) => ctx.io_handle.clone(),
+ None => Default::default(),
+ })
+ }
+}
+
+cfg_time! {
+ pub(crate) fn time_handle() -> crate::runtime::time::Handle {
+ CONTEXT.with(|ctx| match *ctx.borrow() {
+ Some(ref ctx) => ctx.time_handle.clone(),
+ None => Default::default(),
+ })
+ }
+
+ cfg_test_util! {
+ pub(crate) fn clock() -> Option<crate::runtime::time::Clock> {
+ CONTEXT.with(|ctx| match *ctx.borrow() {
+ Some(ref ctx) => Some(ctx.clock.clone()),
+ None => None,
+ })
+ }
+ }
+}
+
+cfg_rt_core! {
+ pub(crate) fn spawn_handle() -> Option<crate::runtime::Spawner> {
+ CONTEXT.with(|ctx| match *ctx.borrow() {
+ Some(ref ctx) => Some(ctx.spawner.clone()),
+ None => None,
+ })
+ }
+}
+
+/// Sets the given [`Handle`] as the current, active runtime context while `f`
+/// runs, restoring the previous context afterwards (even on panic).
+///
+/// [`Handle`]: struct@Handle
+pub(crate) fn enter<F, R>(new: Handle, f: F) -> R
+where
+ F: FnOnce() -> R,
+{
+ struct DropGuard(Option<Handle>);
+
+ impl Drop for DropGuard {
+ fn drop(&mut self) {
+ CONTEXT.with(|ctx| {
+ *ctx.borrow_mut() = self.0.take();
+ });
+ }
+ }
+
+ let _guard = CONTEXT.with(|ctx| {
+ let old = ctx.borrow_mut().replace(new);
+ DropGuard(old)
+ });
+
+ f()
+}
diff --git a/third_party/rust/tokio/src/runtime/enter.rs b/third_party/rust/tokio/src/runtime/enter.rs
new file mode 100644
index 0000000000..afdb67a3b7
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/enter.rs
@@ -0,0 +1,162 @@
+use std::cell::{Cell, RefCell};
+use std::fmt;
+use std::marker::PhantomData;
+
+thread_local!(static ENTERED: Cell<bool> = Cell::new(false));
+
+/// Represents an executor context.
+pub(crate) struct Enter {
+ _p: PhantomData<RefCell<()>>,
+}
+
+/// Marks the current thread as being within the dynamic extent of an
+/// executor.
+pub(crate) fn enter() -> Enter {
+ if let Some(enter) = try_enter() {
+ return enter;
+ }
+
+ panic!(
+ "Cannot start a runtime from within a runtime. This happens \
+ because a function (like `block_on`) attempted to block the \
+ current thread while the thread is being used to drive \
+ asynchronous tasks."
+ );
+}
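+
+// For example (a sketch; the bindings are illustrative), this is the panic
+// users hit when nesting `block_on` calls:
+//
+//     let mut rt = tokio::runtime::Runtime::new().unwrap();
+//     rt.block_on(async {
+//         let mut inner = tokio::runtime::Runtime::new().unwrap();
+//         // Panics: "Cannot start a runtime from within a runtime."
+//         inner.block_on(async {});
+//     });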
+
+/// Tries to enter a runtime context, returns `None` if already in a runtime
+/// context.
+pub(crate) fn try_enter() -> Option<Enter> {
+ ENTERED.with(|c| {
+ if c.get() {
+ None
+ } else {
+ c.set(true);
+ Some(Enter { _p: PhantomData })
+ }
+ })
+}
+
+// Forces the current "entered" state to be cleared while the closure
+// is executed.
+//
+// # Warning
+//
+// This is hidden for a reason. Do not use without fully understanding
+// executors. Misusing it can easily cause your program to deadlock.
+#[cfg(all(feature = "rt-threaded", feature = "blocking"))]
+pub(crate) fn exit<F: FnOnce() -> R, R>(f: F) -> R {
+ // Reset in case the closure panics
+ struct Reset;
+ impl Drop for Reset {
+ fn drop(&mut self) {
+ ENTERED.with(|c| {
+ c.set(true);
+ });
+ }
+ }
+
+ ENTERED.with(|c| {
+ debug_assert!(c.get());
+ c.set(false);
+ });
+
+ let reset = Reset;
+ let ret = f();
+ std::mem::forget(reset);
+
+ ENTERED.with(|c| {
+ assert!(!c.get(), "closure claimed permanent executor");
+ c.set(true);
+ });
+
+ ret
+}
+
+cfg_blocking_impl! {
+ use crate::park::ParkError;
+ use std::time::Duration;
+
+ impl Enter {
+ /// Blocks the thread on the specified future, returning the value with
+ /// which that future completes.
+ pub(crate) fn block_on<F>(&mut self, mut f: F) -> Result<F::Output, ParkError>
+ where
+ F: std::future::Future,
+ {
+ use crate::park::{CachedParkThread, Park};
+ use std::pin::Pin;
+ use std::task::Context;
+ use std::task::Poll::Ready;
+
+ let mut park = CachedParkThread::new();
+ let waker = park.get_unpark()?.into_waker();
+ let mut cx = Context::from_waker(&waker);
+
+ // `block_on` takes ownership of `f`. Once it is pinned here, the original `f` binding can
+ // no longer be accessed, making the pinning safe.
+ let mut f = unsafe { Pin::new_unchecked(&mut f) };
+
+ loop {
+ if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
+ return Ok(v);
+ }
+
+ park.park()?;
+ }
+ }
+
+ /// Blocks the thread on the specified future for **at most** `timeout`.
+ ///
+ /// If the future completes before `timeout`, the result is returned. If
+ /// `timeout` elapses, then `Err` is returned.
+ pub(crate) fn block_on_timeout<F>(&mut self, mut f: F, timeout: Duration) -> Result<F::Output, ParkError>
+ where
+ F: std::future::Future,
+ {
+ use crate::park::{CachedParkThread, Park};
+ use std::pin::Pin;
+ use std::task::Context;
+ use std::task::Poll::Ready;
+ use std::time::Instant;
+
+ let mut park = CachedParkThread::new();
+ let waker = park.get_unpark()?.into_waker();
+ let mut cx = Context::from_waker(&waker);
+
+ // `block_on` takes ownership of `f`. Once it is pinned here, the original `f` binding can
+ // no longer be accessed, making the pinning safe.
+ let mut f = unsafe { Pin::new_unchecked(&mut f) };
+ let when = Instant::now() + timeout;
+
+ loop {
+ if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
+ return Ok(v);
+ }
+
+ let now = Instant::now();
+
+ if now >= when {
+ return Err(());
+ }
+
+ park.park_timeout(when - now)?;
+ }
+ }
+ }
+}
+
+impl fmt::Debug for Enter {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Enter").finish()
+ }
+}
+
+impl Drop for Enter {
+ fn drop(&mut self) {
+ ENTERED.with(|c| {
+ assert!(c.get());
+ c.set(false);
+ });
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/handle.rs b/third_party/rust/tokio/src/runtime/handle.rs
new file mode 100644
index 0000000000..db53543e85
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/handle.rs
@@ -0,0 +1,140 @@
+use crate::runtime::{blocking, context, io, time, Spawner};
+use std::{error, fmt};
+
+cfg_rt_core! {
+ use crate::task::JoinHandle;
+
+ use std::future::Future;
+}
+
+/// Handle to the runtime.
+///
+/// The handle is internally reference-counted and can be freely cloned. A handle can be
+/// obtained using the [`Runtime::handle`] method.
+///
+/// [`Runtime::handle`]: crate::runtime::Runtime::handle()
+#[derive(Debug, Clone)]
+pub struct Handle {
+ pub(super) spawner: Spawner,
+
+ /// Handles to the I/O drivers
+ pub(super) io_handle: io::Handle,
+
+ /// Handles to the time drivers
+ pub(super) time_handle: time::Handle,
+
+ /// Source of `Instant::now()`
+ pub(super) clock: time::Clock,
+
+ /// Blocking pool spawner
+ pub(super) blocking_spawner: blocking::Spawner,
+}
+
+impl Handle {
+ /// Enter the runtime context.
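+ ///
+ /// A minimal sketch (the closure body is illustrative):
+ ///
+ /// ```
+ /// use tokio::runtime::Runtime;
+ ///
+ /// let rt = Runtime::new().unwrap();
+ /// let handle = rt.handle().clone();
+ ///
+ /// handle.enter(|| {
+ ///     // Context-dependent calls such as `tokio::spawn` now target `rt`.
+ /// });
+ /// ```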
+ pub fn enter<F, R>(&self, f: F) -> R
+ where
+ F: FnOnce() -> R,
+ {
+ context::enter(self.clone(), f)
+ }
+
+ /// Returns a Handle view over the currently running Runtime
+ ///
+ /// # Panic
+ ///
+ /// This will panic if called outside the context of a Tokio runtime.
+ ///
+ /// # Examples
+ ///
+ /// This can be used to obtain the handle of the surrounding runtime from an async
+ /// block or function running on that runtime.
+ ///
+ /// ```
+ /// # use tokio::runtime::Runtime;
+ /// # fn dox() {
+ /// # let rt = Runtime::new().unwrap();
+ /// # rt.spawn(async {
+ /// use tokio::runtime::Handle;
+ ///
+ /// // Inside an async block or function.
+ /// let handle = Handle::current();
+ /// handle.spawn(async {
+ /// println!("now running in the existing Runtime");
+ /// })
+ /// # });
+ /// # }
+ /// ```
+ pub fn current() -> Self {
+ context::current().expect("not currently running on the Tokio runtime.")
+ }
+
+ /// Returns a Handle view over the currently running Runtime
+ ///
+ /// Returns an error if no Runtime has been started
+ ///
+ /// Contrary to `current`, this never panics
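+ ///
+ /// A sketch of both outcomes:
+ ///
+ /// ```
+ /// use tokio::runtime::Handle;
+ ///
+ /// match Handle::try_current() {
+ ///     Ok(_handle) => { /* running inside a runtime */ }
+ ///     Err(_) => { /* no runtime started on this thread */ }
+ /// }
+ /// ```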
+ pub fn try_current() -> Result<Self, TryCurrentError> {
+ context::current().ok_or(TryCurrentError(()))
+ }
+}
+
+cfg_rt_core! {
+ impl Handle {
+ /// Spawns a future onto the Tokio runtime.
+ ///
+ /// This spawns the given future onto the runtime's executor, usually a
+ /// thread pool. The thread pool is then responsible for polling the future
+ /// until it completes.
+ ///
+ /// See [module level][mod] documentation for more details.
+ ///
+ /// [mod]: index.html
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime::Runtime;
+ ///
+ /// # fn dox() {
+ /// // Create the runtime
+ /// let rt = Runtime::new().unwrap();
+ /// let handle = rt.handle();
+ ///
+ /// // Spawn a future onto the runtime
+ /// handle.spawn(async {
+ /// println!("now running on a worker thread");
+ /// });
+ /// # }
+ /// ```
+ ///
+ /// # Panics
+ ///
+ /// This function panics if the spawn fails. Failure occurs if the executor
+ /// is currently at capacity and is unable to spawn a new future.
+ pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
+ where
+ F: Future + Send + 'static,
+ F::Output: Send + 'static,
+ {
+ self.spawner.spawn(future)
+ }
+ }
+}
+
+/// Error returned by `try_current` when no Runtime has been started
+pub struct TryCurrentError(());
+
+impl fmt::Debug for TryCurrentError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("TryCurrentError").finish()
+ }
+}
+
+impl fmt::Display for TryCurrentError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.write_str("no tokio Runtime has been initialized")
+ }
+}
+
+impl error::Error for TryCurrentError {}
diff --git a/third_party/rust/tokio/src/runtime/io.rs b/third_party/rust/tokio/src/runtime/io.rs
new file mode 100644
index 0000000000..6a0953af85
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/io.rs
@@ -0,0 +1,63 @@
+//! Abstracts out the APIs necessary to `Runtime` for integrating the I/O
+//! driver. When the `io-driver` feature flag is **not** enabled, these APIs
+//! are shells. This isolates the complexity of dealing with conditional
+//! compilation.
+
+/// Re-exported for convenience.
+pub(crate) use std::io::Result;
+
+pub(crate) use variant::*;
+
+#[cfg(feature = "io-driver")]
+mod variant {
+ use crate::io::driver;
+ use crate::park::{Either, ParkThread};
+
+ use std::io;
+
+ /// The driver value the runtime passes to the `timer` layer.
+ ///
+ /// When the `io-driver` feature is enabled, this is the "real" I/O driver
+ /// backed by Mio. Without the `io-driver` feature, this is a thread parker
+ /// backed by a condition variable.
+ pub(crate) type Driver = Either<driver::Driver, ParkThread>;
+
+ /// The handle the runtime stores for future use.
+ ///
+ /// When the `io-driver` feature is **not** enabled, this is `()`.
+ pub(crate) type Handle = Option<driver::Handle>;
+
+ pub(crate) fn create_driver(enable: bool) -> io::Result<(Driver, Handle)> {
+ #[cfg(loom)]
+ assert!(!enable);
+
+ if enable {
+ let driver = driver::Driver::new()?;
+ let handle = driver.handle();
+
+ Ok((Either::A(driver), Some(handle)))
+ } else {
+ let driver = ParkThread::new();
+ Ok((Either::B(driver), None))
+ }
+ }
+}
+
+#[cfg(not(feature = "io-driver"))]
+mod variant {
+ use crate::park::ParkThread;
+
+ use std::io;
+
+ /// I/O is not enabled, use a condition variable based parker
+ pub(crate) type Driver = ParkThread;
+
+ /// There is no handle
+ pub(crate) type Handle = ();
+
+ pub(crate) fn create_driver(_enable: bool) -> io::Result<(Driver, Handle)> {
+ let driver = ParkThread::new();
+
+ Ok((driver, ()))
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/mod.rs b/third_party/rust/tokio/src/runtime/mod.rs
new file mode 100644
index 0000000000..36b2b442ee
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/mod.rs
@@ -0,0 +1,494 @@
+//! The Tokio runtime.
+//!
+//! Unlike other Rust programs, asynchronous applications require
+//! runtime support. In particular, the following runtime services are
+//! necessary:
+//!
+//! * An **I/O event loop**, called the driver, which drives I/O resources and
+//! dispatches I/O events to tasks that depend on them.
+//! * A **scheduler** to execute [tasks] that use these I/O resources.
+//! * A **timer** for scheduling work to run after a set period of time.
+//!
+//! Tokio's [`Runtime`] bundles all of these services as a single type, allowing
+//! them to be started, shut down, and configured together. However, most
+//! applications won't need to use [`Runtime`] directly. Instead, they can
+//! use the [`tokio::main`] attribute macro, which creates a [`Runtime`] under
+//! the hood.
+//!
+//! # Usage
+//!
+//! Most applications will use the [`tokio::main`] attribute macro.
+//!
+//! ```no_run
+//! use tokio::net::TcpListener;
+//! use tokio::prelude::*;
+//!
+//! #[tokio::main]
+//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
+//!
+//! loop {
+//! let (mut socket, _) = listener.accept().await?;
+//!
+//! tokio::spawn(async move {
+//! let mut buf = [0; 1024];
+//!
+//! // In a loop, read data from the socket and write the data back.
+//! loop {
+//! let n = match socket.read(&mut buf).await {
+//! // socket closed
+//! Ok(n) if n == 0 => return,
+//! Ok(n) => n,
+//! Err(e) => {
+//! println!("failed to read from socket; err = {:?}", e);
+//! return;
+//! }
+//! };
+//!
+//! // Write the data back
+//! if let Err(e) = socket.write_all(&buf[0..n]).await {
+//! println!("failed to write to socket; err = {:?}", e);
+//! return;
+//! }
+//! }
+//! });
+//! }
+//! }
+//! ```
+//!
+//! From within the context of the runtime, additional tasks are spawned using
+//! the [`tokio::spawn`] function. Futures spawned using this function will be
+//! executed on the same thread pool used by the [`Runtime`].
+//!
+//! A [`Runtime`] instance can also be used directly.
+//!
+//! ```no_run
+//! use tokio::net::TcpListener;
+//! use tokio::prelude::*;
+//! use tokio::runtime::Runtime;
+//!
+//! fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! // Create the runtime
+//! let mut rt = Runtime::new()?;
+//!
+//! // Spawn the root task
+//! rt.block_on(async {
+//! let mut listener = TcpListener::bind("127.0.0.1:8080").await?;
+//!
+//! loop {
+//! let (mut socket, _) = listener.accept().await?;
+//!
+//! tokio::spawn(async move {
+//! let mut buf = [0; 1024];
+//!
+//! // In a loop, read data from the socket and write the data back.
+//! loop {
+//! let n = match socket.read(&mut buf).await {
+//! // socket closed
+//! Ok(n) if n == 0 => return,
+//! Ok(n) => n,
+//! Err(e) => {
+//! println!("failed to read from socket; err = {:?}", e);
+//! return;
+//! }
+//! };
+//!
+//! // Write the data back
+//! if let Err(e) = socket.write_all(&buf[0..n]).await {
+//! println!("failed to write to socket; err = {:?}", e);
+//! return;
+//! }
+//! }
+//! });
+//! }
+//! })
+//! }
+//! ```
+//!
+//! ## Runtime Configurations
+//!
+//! Tokio provides multiple task scheduling strategies, suitable for different
+//! applications. The [runtime builder] or `#[tokio::main]` attribute may be
+//! used to select which scheduler to use.
+//!
+//! #### Basic Scheduler
+//!
+//! The basic scheduler provides a _single-threaded_ future executor. All tasks
+//! will be created and executed on the current thread. The basic scheduler
+//! requires the `rt-core` feature flag, and can be selected using the
+//! [`Builder::basic_scheduler`] method:
+//! ```
+//! use tokio::runtime;
+//!
+//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! let basic_rt = runtime::Builder::new()
+//! .basic_scheduler()
+//! .build()?;
+//! # Ok(()) }
+//! ```
+//!
+//! If the `rt-core` feature is enabled and `rt-threaded` is not,
+//! [`Runtime::new`] will return a basic scheduler runtime by default.
+//!
+//! #### Threaded Scheduler
+//!
+//! The threaded scheduler executes futures on a _thread pool_, using a
+//! work-stealing strategy. By default, it will start a worker thread for each
+//! CPU core available on the system. This tends to be the ideal configuration
+//! for most applications. The threaded scheduler requires the `rt-threaded` feature
+//! flag, and can be selected using the [`Builder::threaded_scheduler`] method:
+//! ```
+//! use tokio::runtime;
+//!
+//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! let threaded_rt = runtime::Builder::new()
+//! .threaded_scheduler()
+//! .build()?;
+//! # Ok(()) }
+//! ```
+//!
+//! If the `rt-threaded` feature flag is enabled, [`Runtime::new`] will return a
+//! threaded scheduler runtime by default.
+//!
+//! Most applications should use the threaded scheduler, except in some niche
+//! use-cases, such as when the application is required to run on only a single thread.
+//!
+//! #### Resource drivers
+//!
+//! When configuring a runtime by hand, no resource drivers are enabled by
+//! default. In this case, attempting to use networking types or time types will
+//! fail. In order to enable these types, the resource drivers must be enabled.
+//! This is done with [`Builder::enable_io`] and [`Builder::enable_time`]. As a
+//! shorthand, [`Builder::enable_all`] enables both resource drivers.
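+//!
+//! For example, enabling both drivers on the threaded scheduler (this mirrors
+//! what [`Runtime::new`] does):
+//!
+//! ```
+//! use tokio::runtime;
+//!
+//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
+//! let rt = runtime::Builder::new()
+//!     .threaded_scheduler()
+//!     .enable_all()
+//!     .build()?;
+//! # Ok(()) }
+//! ```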
+//!
+//! ## Lifetime of spawned threads
+//!
+//! The runtime may spawn threads depending on its configuration and usage. The
+//! threaded scheduler spawns threads to schedule tasks, and calls to
+//! `spawn_blocking` spawn threads to run blocking operations.
+//!
+//! While the `Runtime` is active, threads may shut down after periods of being
+//! idle. Once the `Runtime` is dropped, all runtime threads are forcibly shut down.
+//! Any tasks that have not yet completed will be dropped.
+//!
+//! [tasks]: crate::task
+//! [`Runtime`]: Runtime
+//! [`tokio::spawn`]: crate::spawn
+//! [`tokio::main`]: ../attr.main.html
+//! [runtime builder]: crate::runtime::Builder
+//! [`Runtime::new`]: crate::runtime::Runtime::new
+//! [`Builder::basic_scheduler`]: crate::runtime::Builder::basic_scheduler
+//! [`Builder::threaded_scheduler`]: crate::runtime::Builder::threaded_scheduler
+//! [`Builder::enable_io`]: crate::runtime::Builder::enable_io
+//! [`Builder::enable_time`]: crate::runtime::Builder::enable_time
+//! [`Builder::enable_all`]: crate::runtime::Builder::enable_all
+
+// At the top due to macros
+#[cfg(test)]
+#[macro_use]
+mod tests;
+
+pub(crate) mod context;
+
+cfg_rt_core! {
+ mod basic_scheduler;
+ use basic_scheduler::BasicScheduler;
+
+ pub(crate) mod task;
+}
+
+mod blocking;
+use blocking::BlockingPool;
+
+cfg_blocking_impl! {
+ #[allow(unused_imports)]
+ pub(crate) use blocking::{spawn_blocking, try_spawn_blocking};
+}
+
+mod builder;
+pub use self::builder::Builder;
+
+pub(crate) mod enter;
+use self::enter::enter;
+
+mod handle;
+pub use self::handle::{Handle, TryCurrentError};
+
+mod io;
+
+cfg_rt_threaded! {
+ mod park;
+ use park::Parker;
+}
+
+mod shell;
+use self::shell::Shell;
+
+mod spawner;
+use self::spawner::Spawner;
+
+mod time;
+
+cfg_rt_threaded! {
+ mod queue;
+
+ pub(crate) mod thread_pool;
+ use self::thread_pool::ThreadPool;
+}
+
+cfg_rt_core! {
+ use crate::task::JoinHandle;
+}
+
+use std::future::Future;
+use std::time::Duration;
+
+/// The Tokio runtime.
+///
+/// The runtime provides an I/O [driver], task scheduler, [timer], and blocking
+/// pool, necessary for running asynchronous tasks.
+///
+/// Instances of `Runtime` can be created using [`new`] or [`Builder`]. However,
+/// most users will use the `#[tokio::main]` annotation on their entry point instead.
+///
+/// See [module level][mod] documentation for more details.
+///
+/// # Shutdown
+///
+/// Shutting down the runtime is done by dropping the value. The current thread
+/// will block until the shutdown operation has completed.
+///
+/// * Drain any scheduled work queues.
+/// * Drop any futures that have not yet completed.
+/// * Drop the reactor.
+///
+/// Once the reactor has dropped, any outstanding I/O resources bound to
+/// that reactor will no longer function. Calling any method on them will
+/// result in an error.
+///
+/// [driver]: crate::io::driver
+/// [timer]: crate::time
+/// [mod]: index.html
+/// [`new`]: #method.new
+/// [`Builder`]: struct@Builder
+/// [`tokio::run`]: fn@run
+#[derive(Debug)]
+pub struct Runtime {
+ /// Task executor
+ kind: Kind,
+
+ /// Handle to runtime, also contains driver handles
+ handle: Handle,
+
+ /// Blocking pool handle, used to signal shutdown
+ blocking_pool: BlockingPool,
+}
+
+/// The runtime executor is either a thread-pool or a current-thread executor.
+#[derive(Debug)]
+enum Kind {
+ /// Not able to execute concurrent tasks. This variant is mostly used to get
+ /// access to the driver handles.
+ Shell(Shell),
+
+ /// Execute all tasks on the current-thread.
+ #[cfg(feature = "rt-core")]
+ Basic(BasicScheduler<time::Driver>),
+
+ /// Execute tasks across multiple threads.
+ #[cfg(feature = "rt-threaded")]
+ ThreadPool(ThreadPool),
+}
+
+/// Callback invoked after a runtime thread starts or before it stops.
+type Callback = std::sync::Arc<dyn Fn() + Send + Sync>;
+
+impl Runtime {
+ /// Create a new runtime instance with default configuration values.
+ ///
+ /// This results in a scheduler, I/O driver, and time driver being
+ /// initialized. The type of scheduler used depends on what feature flags
+ /// are enabled: if the `rt-threaded` feature is enabled, the [threaded
+ /// scheduler] is used, while if only the `rt-core` feature is enabled, the
+ /// [basic scheduler] is used instead.
+ ///
+ /// If the threaded scheduler is selected, it will not spawn
+ /// any worker threads until it needs to, i.e. when tasks are scheduled to run.
+ ///
+ /// Most applications will not need to call this function directly. Instead,
+ /// they will use the [`#[tokio::main]` attribute][main]. When more complex
+ /// configuration is necessary, the [runtime builder] may be used.
+ ///
+ /// See [module level][mod] documentation for more details.
+ ///
+ /// # Examples
+ ///
+ /// Creating a new `Runtime` with default configuration values.
+ ///
+ /// ```
+ /// use tokio::runtime::Runtime;
+ ///
+ /// let rt = Runtime::new()
+ /// .unwrap();
+ ///
+ /// // Use the runtime...
+ /// ```
+ ///
+ /// [mod]: index.html
+ /// [main]: ../../tokio_macros/attr.main.html
+ /// [threaded scheduler]: index.html#threaded-scheduler
+ /// [basic scheduler]: index.html#basic-scheduler
+ /// [runtime builder]: crate::runtime::Builder
+ pub fn new() -> io::Result<Runtime> {
+ #[cfg(feature = "rt-threaded")]
+ let ret = Builder::new().threaded_scheduler().enable_all().build();
+
+ #[cfg(all(not(feature = "rt-threaded"), feature = "rt-core"))]
+ let ret = Builder::new().basic_scheduler().enable_all().build();
+
+ #[cfg(not(feature = "rt-core"))]
+ let ret = Builder::new().enable_all().build();
+
+ ret
+ }
+
+ /// Spawn a future onto the Tokio runtime.
+ ///
+ /// This spawns the given future onto the runtime's executor, usually a
+ /// thread pool. The thread pool is then responsible for polling the future
+ /// until it completes.
+ ///
+ /// See [module level][mod] documentation for more details.
+ ///
+ /// [mod]: index.html
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime::Runtime;
+ ///
+ /// # fn dox() {
+ /// // Create the runtime
+ /// let rt = Runtime::new().unwrap();
+ ///
+ /// // Spawn a future onto the runtime
+ /// rt.spawn(async {
+ /// println!("now running on a worker thread");
+ /// });
+ /// # }
+ /// ```
+ ///
+ /// # Panics
+ ///
+ /// This function panics if the spawn fails. Failure occurs if the executor
+ /// is currently at capacity and is unable to spawn a new future.
+ #[cfg(feature = "rt-core")]
+ pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
+ where
+ F: Future + Send + 'static,
+ F::Output: Send + 'static,
+ {
+ match &self.kind {
+ Kind::Shell(_) => panic!("task execution disabled"),
+ #[cfg(feature = "rt-threaded")]
+ Kind::ThreadPool(exec) => exec.spawn(future),
+ Kind::Basic(exec) => exec.spawn(future),
+ }
+ }
+
+ /// Run a future to completion on the Tokio runtime. This is the runtime's
+ /// entry point.
+ ///
+ /// This runs the given future on the runtime, blocking until it is
+ /// complete, and yielding its resolved result. Any tasks or timers which
+ /// the future spawns internally will be executed on the runtime.
+ ///
+ /// This method should not be called from an asynchronous context.
+ ///
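+ /// # Examples
+ ///
+ /// A minimal sketch of blocking on a future (the future body here is
+ /// illustrative):
+ ///
+ /// ```
+ /// use tokio::runtime::Runtime;
+ ///
+ /// let mut rt = Runtime::new().unwrap();
+ ///
+ /// let out = rt.block_on(async { 1 + 2 });
+ /// assert_eq!(out, 3);
+ /// ```
+ ///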
+ /// # Panics
+ ///
+ /// This function panics if the executor is at capacity, if the provided
+ /// future panics, or if called within an asynchronous execution context.
+ pub fn block_on<F: Future>(&mut self, future: F) -> F::Output {
+ let kind = &mut self.kind;
+
+ self.handle.enter(|| match kind {
+ Kind::Shell(exec) => exec.block_on(future),
+ #[cfg(feature = "rt-core")]
+ Kind::Basic(exec) => exec.block_on(future),
+ #[cfg(feature = "rt-threaded")]
+ Kind::ThreadPool(exec) => exec.block_on(future),
+ })
+ }
+
+ /// Enter the runtime context.
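+ ///
+ /// This runs the given closure with this runtime's context entered, so that
+ /// context-dependent operations can be used from synchronous code. A minimal
+ /// sketch (the closure body is illustrative):
+ ///
+ /// ```
+ /// use tokio::runtime::Runtime;
+ ///
+ /// let rt = Runtime::new().unwrap();
+ ///
+ /// let answer = rt.enter(|| 40 + 2);
+ /// assert_eq!(answer, 42);
+ /// ```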
+ pub fn enter<F, R>(&self, f: F) -> R
+ where
+ F: FnOnce() -> R,
+ {
+ self.handle.enter(f)
+ }
+
+ /// Return a handle to the runtime's spawner.
+ ///
+ /// The returned handle can be used to spawn tasks that run on this runtime, and can
+ /// be cloned to allow moving the `Handle` to other threads.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime::Runtime;
+ ///
+ /// let rt = Runtime::new()
+ /// .unwrap();
+ ///
+ /// let handle = rt.handle();
+ ///
+ /// handle.spawn(async { println!("hello"); });
+ /// ```
+ pub fn handle(&self) -> &Handle {
+ &self.handle
+ }
+
+ /// Shutdown the runtime, waiting for at most `duration` for all spawned
+ /// tasks to shut down.
+ ///
+ /// Usually, dropping a `Runtime` handle is sufficient as tasks are able to
+ /// shut down in a timely fashion. However, dropping a `Runtime` will wait
+ /// indefinitely for all tasks to terminate, and there are cases where a long
+ /// blocking task has been spawned, which can prevent the `Runtime` from
+ /// dropping.
+ ///
+ /// In this case, calling `shutdown_timeout` with an explicit wait timeout
+ /// can work. `shutdown_timeout` will signal all tasks to shut down and
+ /// will wait for at most `duration` for all spawned tasks to terminate. If
+ /// `duration` elapses before all tasks are dropped, the function returns and
+ /// outstanding tasks are potentially leaked.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio::runtime::Runtime;
+ /// use tokio::task;
+ ///
+ /// use std::thread;
+ /// use std::time::Duration;
+ ///
+ /// fn main() {
+ /// let mut runtime = Runtime::new().unwrap();
+ ///
+ /// runtime.block_on(async move {
+ /// task::spawn_blocking(move || {
+ /// thread::sleep(Duration::from_secs(10_000));
+ /// });
+ /// });
+ ///
+ /// runtime.shutdown_timeout(Duration::from_millis(100));
+ /// }
+ /// ```
+ pub fn shutdown_timeout(self, duration: Duration) {
+ let Runtime {
+ mut blocking_pool, ..
+ } = self;
+ blocking_pool.shutdown(Some(duration));
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/park.rs b/third_party/rust/tokio/src/runtime/park.rs
new file mode 100644
index 0000000000..ee437d1d94
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/park.rs
@@ -0,0 +1,245 @@
+//! Parks the runtime.
+//!
+//! A combination of the various resource driver park handles.
+
+use crate::loom::sync::atomic::AtomicUsize;
+use crate::loom::sync::{Arc, Condvar, Mutex};
+use crate::loom::thread;
+use crate::park::{Park, Unpark};
+use crate::runtime::time;
+use crate::util::TryLock;
+
+use std::sync::atomic::Ordering::SeqCst;
+use std::time::Duration;
+
+pub(crate) struct Parker {
+ inner: Arc<Inner>,
+}
+
+pub(crate) struct Unparker {
+ inner: Arc<Inner>,
+}
+
+struct Inner {
+ /// Avoids entering the park if possible
+ state: AtomicUsize,
+
+ /// Used to coordinate access to the driver / condvar
+ mutex: Mutex<()>,
+
+ /// Condvar to block on if the driver is unavailable.
+ condvar: Condvar,
+
+ /// Resource (I/O, time, ...) driver
+ shared: Arc<Shared>,
+}
+
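+// Parker state machine values: `EMPTY` means not parked; `PARKED_CONDVAR`
+// means blocked on the condvar; `PARKED_DRIVER` means blocked on the resource
+// driver; `NOTIFIED` means an unpark arrived before (or while) parking.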
+const EMPTY: usize = 0;
+const PARKED_CONDVAR: usize = 1;
+const PARKED_DRIVER: usize = 2;
+const NOTIFIED: usize = 3;
+
+/// Shared across multiple Parker handles
+struct Shared {
+ /// Shared driver. Only one thread at a time can use this
+ driver: TryLock<time::Driver>,
+
+ /// Unpark handle
+ handle: <time::Driver as Park>::Unpark,
+}
+
+impl Parker {
+ pub(crate) fn new(driver: time::Driver) -> Parker {
+ let handle = driver.unpark();
+
+ Parker {
+ inner: Arc::new(Inner {
+ state: AtomicUsize::new(EMPTY),
+ mutex: Mutex::new(()),
+ condvar: Condvar::new(),
+ shared: Arc::new(Shared {
+ driver: TryLock::new(driver),
+ handle,
+ }),
+ }),
+ }
+ }
+}
+
+impl Clone for Parker {
+ fn clone(&self) -> Parker {
+ Parker {
+ inner: Arc::new(Inner {
+ state: AtomicUsize::new(EMPTY),
+ mutex: Mutex::new(()),
+ condvar: Condvar::new(),
+ shared: self.inner.shared.clone(),
+ }),
+ }
+ }
+}
+
+impl Park for Parker {
+ type Unpark = Unparker;
+ type Error = ();
+
+ fn unpark(&self) -> Unparker {
+ Unparker {
+ inner: self.inner.clone(),
+ }
+ }
+
+ fn park(&mut self) -> Result<(), Self::Error> {
+ self.inner.park();
+ Ok(())
+ }
+
+ fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+ // Only parking with a zero duration is supported...
+ assert_eq!(duration, Duration::from_millis(0));
+
+ if let Some(mut driver) = self.inner.shared.driver.try_lock() {
+ driver.park_timeout(duration).map_err(|_| ())
+ } else {
+ Ok(())
+ }
+ }
+}
+
+impl Unpark for Unparker {
+ fn unpark(&self) {
+ self.inner.unpark();
+ }
+}
+
+impl Inner {
+ /// Parks the current thread until unparked.
+ fn park(&self) {
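+ // Spin for a few iterations first: if a notification is already
+ // pending, consuming it here avoids the cost of blocking.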
+ for _ in 0..3 {
+ // If we were previously notified then we consume this notification and
+ // return quickly.
+ if self
+ .state
+ .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
+ .is_ok()
+ {
+ return;
+ }
+
+ thread::yield_now();
+ }
+
+ if let Some(mut driver) = self.shared.driver.try_lock() {
+ self.park_driver(&mut driver);
+ } else {
+ self.park_condvar();
+ }
+ }
+
+ fn park_condvar(&self) {
+ // Otherwise we need to coordinate going to sleep
+ let mut m = self.mutex.lock().unwrap();
+
+ match self
+ .state
+ .compare_exchange(EMPTY, PARKED_CONDVAR, SeqCst, SeqCst)
+ {
+ Ok(_) => {}
+ Err(NOTIFIED) => {
+ // We must read here, even though we know it will be `NOTIFIED`.
+ // This is because `unpark` may have been called again since we read
+ // `NOTIFIED` in the `compare_exchange` above. We must perform an
+ // acquire operation that synchronizes with that `unpark` to observe
+ // any writes it made before the call to unpark. To do that we must
+ // read from the write it made to `state`.
+ let old = self.state.swap(EMPTY, SeqCst);
+ debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
+
+ return;
+ }
+ Err(actual) => panic!("inconsistent park state; actual = {}", actual),
+ }
+
+ loop {
+ m = self.condvar.wait(m).unwrap();
+
+ if self
+ .state
+ .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
+ .is_ok()
+ {
+ // got a notification
+ return;
+ }
+
+ // spurious wakeup, go back to sleep
+ }
+ }
+
+ fn park_driver(&self, driver: &mut time::Driver) {
+ match self
+ .state
+ .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst)
+ {
+ Ok(_) => {}
+ Err(NOTIFIED) => {
+ // We must read here, even though we know it will be `NOTIFIED`.
+ // This is because `unpark` may have been called again since we read
+ // `NOTIFIED` in the `compare_exchange` above. We must perform an
+ // acquire operation that synchronizes with that `unpark` to observe
+ // any writes it made before the call to unpark. To do that we must
+ // read from the write it made to `state`.
+ let old = self.state.swap(EMPTY, SeqCst);
+ debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
+
+ return;
+ }
+ Err(actual) => panic!("inconsistent park state; actual = {}", actual),
+ }
+
+ // TODO: don't unwrap
+ driver.park().unwrap();
+
+ match self.state.swap(EMPTY, SeqCst) {
+ NOTIFIED => {} // got a notification, hurray!
+ PARKED_DRIVER => {} // no notification, alas
+ n => panic!("inconsistent park_timeout state: {}", n),
+ }
+ }
+
+ fn unpark(&self) {
+ // To ensure the unparked thread will observe any writes we made before
+ // this call, we must perform a release operation that `park` can
+ // synchronize with. To do that we must write `NOTIFIED` even if `state`
+ // is already `NOTIFIED`. That is why this must be a swap rather than a
+ // compare-and-swap that returns if it reads `NOTIFIED` on failure.
+ match self.state.swap(NOTIFIED, SeqCst) {
+ EMPTY => {} // no one was waiting
+ NOTIFIED => {} // already unparked
+ PARKED_CONDVAR => self.unpark_condvar(),
+ PARKED_DRIVER => self.unpark_driver(),
+ actual => panic!("inconsistent state in unpark; actual = {}", actual),
+ }
+ }
+
+ fn unpark_condvar(&self) {
+ // There is a period between when the parked thread sets `state` to
+ // `PARKED` (or last checked `state` in the case of a spurious wake
+ // up) and when it actually waits on `cvar`. If we were to notify
+ // during this period it would be ignored and then when the parked
+ // thread went to sleep it would never wake up. Fortunately, it has
+ // `lock` locked at this stage so we can acquire `lock` to wait until
+ // it is ready to receive the notification.
+ //
+ // Releasing `lock` before the call to `notify_one` means that when the
+ // parked thread wakes it doesn't get woken only to have to wait for us
+ // to release `lock`.
+ drop(self.mutex.lock().unwrap());
+
+ self.condvar.notify_one()
+ }
+
+ fn unpark_driver(&self) {
+ self.shared.handle.unpark();
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/queue.rs b/third_party/rust/tokio/src/runtime/queue.rs
new file mode 100644
index 0000000000..c654514bbc
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/queue.rs
@@ -0,0 +1,630 @@
+//! Run-queue structures to support a work-stealing scheduler
+
+use crate::loom::cell::UnsafeCell;
+use crate::loom::sync::atomic::{AtomicU16, AtomicU32, AtomicUsize};
+use crate::loom::sync::{Arc, Mutex};
+use crate::runtime::task;
+
+use std::marker::PhantomData;
+use std::mem::MaybeUninit;
+use std::ptr::{self, NonNull};
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
+
+/// Producer handle. May only be used from a single thread.
+pub(super) struct Local<T: 'static> {
+ inner: Arc<Inner<T>>,
+}
+
+/// Consumer handle. May be used from many threads.
+pub(super) struct Steal<T: 'static>(Arc<Inner<T>>);
+
+/// Growable, MPMC queue used to inject new tasks into the scheduler and as an
+/// overflow queue when the local, fixed-size, array queue overflows.
+pub(super) struct Inject<T: 'static> {
+ /// Pointers to the head and tail of the queue
+ pointers: Mutex<Pointers>,
+
+ /// Number of pending tasks in the queue. This helps prevent unnecessary
+ /// locking in the hot path.
+ len: AtomicUsize,
+
+ _p: PhantomData<T>,
+}
+
+pub(super) struct Inner<T: 'static> {
+ /// Concurrently updated by many threads.
+ ///
+ /// Contains two `u16` values. The lower `u16` is the "real" head of the
+ /// queue. The upper `u16` is set by a stealer in the process of stealing
+ /// values. It represents the first value being stolen in the batch. The
+ /// `u16` width is used in order to distinguish between `head == tail` and
+ /// `head == tail - capacity`.
+ ///
+ /// When both `u16` values are the same, there is no active stealer.
+ ///
+ /// Tracking an in-progress stealer prevents a wrapping scenario.
+ head: AtomicU32,
+
+ /// Only updated by producer thread but read by many threads.
+ tail: AtomicU16,
+
+ /// Elements
+ buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>]>,
+}
+
+struct Pointers {
+ /// True if the queue is closed
+ is_closed: bool,
+
+ /// Linked-list head
+ head: Option<NonNull<task::Header>>,
+
+ /// Linked-list tail
+ tail: Option<NonNull<task::Header>>,
+}
+
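+// Safety note: access to `Inner`'s buffer is coordinated through the atomic
+// `head` and `tail` indices, and `Inject`'s linked list is guarded by its
+// mutex.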
+unsafe impl<T> Send for Inner<T> {}
+unsafe impl<T> Sync for Inner<T> {}
+unsafe impl<T> Send for Inject<T> {}
+unsafe impl<T> Sync for Inject<T> {}
+
+#[cfg(not(loom))]
+const LOCAL_QUEUE_CAPACITY: usize = 256;
+
+// Shrink the size of the local queue when using loom. This shouldn't impact
+// logic, but allows loom to test more edge cases in a reasonable amount of
+// time.
+#[cfg(loom)]
+const LOCAL_QUEUE_CAPACITY: usize = 4;
+
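+// `MASK` relies on `LOCAL_QUEUE_CAPACITY` being a power of two, so that
+// `position & MASK` maps a wrapping position to a buffer slot.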
+const MASK: usize = LOCAL_QUEUE_CAPACITY - 1;
+
+/// Create a new local run-queue
+pub(super) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
+ let mut buffer = Vec::with_capacity(LOCAL_QUEUE_CAPACITY);
+
+ for _ in 0..LOCAL_QUEUE_CAPACITY {
+ buffer.push(UnsafeCell::new(MaybeUninit::uninit()));
+ }
+
+ let inner = Arc::new(Inner {
+ head: AtomicU32::new(0),
+ tail: AtomicU16::new(0),
+ buffer: buffer.into(),
+ });
+
+ let local = Local {
+ inner: inner.clone(),
+ };
+
+ let remote = Steal(inner);
+
+ (remote, local)
+}
+
+impl<T> Local<T> {
+ /// Returns true if the queue has entries that can be stolen.
+ pub(super) fn is_stealable(&self) -> bool {
+ !self.inner.is_empty()
+ }
+
+ /// Pushes a task to the back of the local queue, skipping the LIFO slot.
+ pub(super) fn push_back(&mut self, mut task: task::Notified<T>, inject: &Inject<T>) {
+ let tail = loop {
+ let head = self.inner.head.load(Acquire);
+ let (steal, real) = unpack(head);
+
+ // safety: this is the **only** thread that updates this cell.
+ let tail = unsafe { self.inner.tail.unsync_load() };
+
+ if tail.wrapping_sub(steal) < LOCAL_QUEUE_CAPACITY as u16 {
+ // There is capacity for the task
+ break tail;
+ } else if steal != real {
+ // Concurrently stealing, this will free up capacity, so
+ // only push the new task onto the inject queue
+ inject.push(task);
+ return;
+ } else {
+ // Push the current task and half of the queue into the
+ // inject queue.
+ match self.push_overflow(task, real, tail, inject) {
+ Ok(_) => return,
+ // Lost the race, try again
+ Err(v) => {
+ task = v;
+ }
+ }
+ }
+ };
+
+ // Map the position to a slot index.
+ let idx = tail as usize & MASK;
+
+ self.inner.buffer[idx].with_mut(|ptr| {
+ // Write the task to the slot
+ //
+ // Safety: There is only one producer and the above `if`
+ // condition ensures we don't touch a cell if there is a
+ // value, thus no consumer.
+ unsafe {
+ ptr::write((*ptr).as_mut_ptr(), task);
+ }
+ });
+
+ // Make the task available. Synchronizes with a load in
+ // `steal_into2`.
+ self.inner.tail.store(tail.wrapping_add(1), Release);
+ }
+
+ /// Moves a batch of tasks into the inject queue.
+ ///
+ /// This will temporarily make some of the tasks unavailable to stealers.
+ /// Once `push_overflow` is done, a notification is sent out, so if other
+ /// workers "missed" some of the tasks during a steal, they will get
+ /// another opportunity.
+ #[inline(never)]
+ fn push_overflow(
+ &mut self,
+ task: task::Notified<T>,
+ head: u16,
+ tail: u16,
+ inject: &Inject<T>,
+ ) -> Result<(), task::Notified<T>> {
+ const BATCH_LEN: usize = LOCAL_QUEUE_CAPACITY / 2 + 1;
+
+ let n = (LOCAL_QUEUE_CAPACITY / 2) as u16;
+ assert_eq!(
+ tail.wrapping_sub(head) as usize,
+ LOCAL_QUEUE_CAPACITY,
+ "queue is not full; tail = {}; head = {}",
+ tail,
+ head
+ );
+
+ let prev = pack(head, head);
+
+ // Claim a bunch of tasks
+ //
+ // We are claiming the tasks **before** reading them out of the buffer.
+ // This is safe because only the **current** thread is able to push new
+ // tasks.
+ //
+ // There isn't really any need for memory ordering... Relaxed would
+ // work. This is because all tasks are pushed into the queue from the
+ // current thread (or memory has been acquired if the local queue handle
+ // moved).
+ let actual = self.inner.head.compare_and_swap(
+ prev,
+ pack(head.wrapping_add(n), head.wrapping_add(n)),
+ Release,
+ );
+
+ if actual != prev {
+ // We failed to claim the tasks, losing the race. Return out of
+ // this function and try the full `push` routine again. The queue
+ // may not be full anymore.
+ return Err(task);
+ }
+
+ // link the tasks
+ for i in 0..n {
+ let j = i + 1;
+
+ let i_idx = i.wrapping_add(head) as usize & MASK;
+ let j_idx = j.wrapping_add(head) as usize & MASK;
+
+ // Get the next pointer
+ let next = if j == n {
+ // The last task in the local queue being moved
+ task.header().into()
+ } else {
+ // safety: The above CAS prevents a stealer from accessing these
+ // tasks and we are the only producer.
+ self.inner.buffer[j_idx].with(|ptr| unsafe {
+ let value = (*ptr).as_ptr();
+ (*value).header().into()
+ })
+ };
+
+ // safety: the above CAS prevents a stealer from accessing these
+ // tasks and we are the only producer.
+ self.inner.buffer[i_idx].with_mut(|ptr| unsafe {
+ let ptr = (*ptr).as_ptr();
+ (*ptr).header().queue_next.with_mut(|ptr| *ptr = Some(next));
+ });
+ }
+
+ // safety: the above CAS prevents a stealer from accessing these tasks
+ // and we are the only producer.
+ let head = self.inner.buffer[head as usize & MASK]
+ .with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });
+
+ // Push the tasks onto the inject queue
+ inject.push_batch(head, task, BATCH_LEN);
+
+ Ok(())
+ }
+
+ /// Pops a task from the local queue.
+ pub(super) fn pop(&mut self) -> Option<task::Notified<T>> {
+ let mut head = self.inner.head.load(Acquire);
+
+ let idx = loop {
+ let (steal, real) = unpack(head);
+
+ // safety: this is the **only** thread that updates this cell.
+ let tail = unsafe { self.inner.tail.unsync_load() };
+
+ if real == tail {
+ // queue is empty
+ return None;
+ }
+
+ let next_real = real.wrapping_add(1);
+
+ // If `steal == real` there are no concurrent stealers. Both `steal`
+ // and `real` are updated.
+ let next = if steal == real {
+ pack(next_real, next_real)
+ } else {
+ assert_ne!(steal, next_real);
+ pack(steal, next_real)
+ };
+
+ // Attempt to claim a task.
+ let res = self
+ .inner
+ .head
+ .compare_exchange(head, next, AcqRel, Acquire);
+
+ match res {
+ Ok(_) => break real as usize & MASK,
+ Err(actual) => head = actual,
+ }
+ };
+
+ Some(self.inner.buffer[idx].with(|ptr| unsafe { ptr::read(ptr).assume_init() }))
+ }
+}
+
+impl<T> Steal<T> {
+ pub(super) fn is_empty(&self) -> bool {
+ self.0.is_empty()
+ }
+
+ /// Steals half the tasks from `self` and places them into `dst`.
+ pub(super) fn steal_into(&self, dst: &mut Local<T>) -> Option<task::Notified<T>> {
+ // Safety: the caller is the only thread that mutates `dst.tail` and
+ // holds a mutable reference.
+ let dst_tail = unsafe { dst.inner.tail.unsync_load() };
+
+ // To the caller, `dst` may **look** empty but still have values
+ // contained in the buffer. If another thread is concurrently stealing
+ // from `dst` there may not be enough capacity to steal.
+ let (steal, _) = unpack(dst.inner.head.load(Acquire));
+
+ if dst_tail.wrapping_sub(steal) > LOCAL_QUEUE_CAPACITY as u16 / 2 {
+ // we *could* try to steal less here, but for simplicity, we're just
+ // going to abort.
+ return None;
+ }
+
+ // Steal the tasks into `dst`'s buffer. This does not yet expose the
+ // tasks in `dst`.
+ let mut n = self.steal_into2(dst, dst_tail);
+
+ if n == 0 {
+ // No tasks were stolen
+ return None;
+ }
+
+ // We are returning a task here
+ n -= 1;
+
+ let ret_pos = dst_tail.wrapping_add(n);
+ let ret_idx = ret_pos as usize & MASK;
+
+ // safety: the value was written as part of `steal_into2` and not
+ // exposed to stealers, so no other thread can access it.
+ let ret = dst.inner.buffer[ret_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });
+
+ if n == 0 {
+ // The `dst` queue is empty, but a single task was stolen
+ return Some(ret);
+ }
+
+ // Make the stolen items available to consumers
+ dst.inner.tail.store(dst_tail.wrapping_add(n), Release);
+
+ Some(ret)
+ }
+
+ // Steal tasks from `self`, placing them into `dst`. Returns the number of
+ // tasks that were stolen.
+ fn steal_into2(&self, dst: &mut Local<T>, dst_tail: u16) -> u16 {
+ let mut prev_packed = self.0.head.load(Acquire);
+ let mut next_packed;
+
+ let n = loop {
+ let (src_head_steal, src_head_real) = unpack(prev_packed);
+ let src_tail = self.0.tail.load(Acquire);
+
+ // If these two do not match, another thread is concurrently
+ // stealing from the queue.
+ if src_head_steal != src_head_real {
+ return 0;
+ }
+
+ // Number of available tasks to steal
+ let n = src_tail.wrapping_sub(src_head_real);
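+ // Steal half of them, rounding up: `n - n / 2` equals `ceil(n / 2)`.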
+ let n = n - n / 2;
+
+ if n == 0 {
+ // No tasks available to steal
+ return 0;
+ }
+
+ // Update the real head index to acquire the tasks.
+ let steal_to = src_head_real.wrapping_add(n);
+ assert_ne!(src_head_steal, steal_to);
+ next_packed = pack(src_head_steal, steal_to);
+
+ // Claim all those tasks. This is done by incrementing the "real"
+ // head but not the steal. By doing this, no other thread is able to
+ // steal from this queue until the current thread completes.
+ let res = self
+ .0
+ .head
+ .compare_exchange(prev_packed, next_packed, AcqRel, Acquire);
+
+ match res {
+ Ok(_) => break n,
+ Err(actual) => prev_packed = actual,
+ }
+ };
+
+ assert!(n <= LOCAL_QUEUE_CAPACITY as u16 / 2, "actual = {}", n);
+
+ let (first, _) = unpack(next_packed);
+
+ // Take all the tasks
+ for i in 0..n {
+ // Compute the positions
+ let src_pos = first.wrapping_add(i);
+ let dst_pos = dst_tail.wrapping_add(i);
+
+ // Map to slots
+ let src_idx = src_pos as usize & MASK;
+ let dst_idx = dst_pos as usize & MASK;
+
+ // Read the task
+ //
+ // safety: We acquired the task with the atomic exchange above.
+ let task = self.0.buffer[src_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });
+
+ // Write the task to the new slot
+ //
+ // safety: `dst` queue is empty and we are the only producer to
+ // this queue.
+ dst.inner.buffer[dst_idx]
+ .with_mut(|ptr| unsafe { ptr::write((*ptr).as_mut_ptr(), task) });
+ }
+
+ let mut prev_packed = next_packed;
+
+ // Update `src_head_steal` to match `src_head_real` signalling that the
+ // stealing routine is complete.
+ loop {
+ let head = unpack(prev_packed).1;
+ next_packed = pack(head, head);
+
+ let res = self
+ .0
+ .head
+ .compare_exchange(prev_packed, next_packed, AcqRel, Acquire);
+
+ match res {
+ Ok(_) => return n,
+ Err(actual) => {
+ let (actual_steal, actual_real) = unpack(actual);
+
+ assert_ne!(actual_steal, actual_real);
+
+ prev_packed = actual;
+ }
+ }
+ }
+ }
+}
+
+impl<T> Clone for Steal<T> {
+ fn clone(&self) -> Steal<T> {
+ Steal(self.0.clone())
+ }
+}
+
+impl<T> Drop for Local<T> {
+ fn drop(&mut self) {
+ if !std::thread::panicking() {
+ assert!(self.pop().is_none(), "queue not empty");
+ }
+ }
+}
+
+impl<T> Inner<T> {
+ fn is_empty(&self) -> bool {
+ let (_, head) = unpack(self.head.load(Acquire));
+ let tail = self.tail.load(Acquire);
+
+ head == tail
+ }
+}
+
+impl<T: 'static> Inject<T> {
+ pub(super) fn new() -> Inject<T> {
+ Inject {
+ pointers: Mutex::new(Pointers {
+ is_closed: false,
+ head: None,
+ tail: None,
+ }),
+ len: AtomicUsize::new(0),
+ _p: PhantomData,
+ }
+ }
+
+ pub(super) fn is_empty(&self) -> bool {
+ self.len() == 0
+ }
+
+ /// Closes the injection queue, returning `true` if the queue was open when
+ /// the transition was made.
+ pub(super) fn close(&self) -> bool {
+ let mut p = self.pointers.lock().unwrap();
+
+ if p.is_closed {
+ return false;
+ }
+
+ p.is_closed = true;
+ true
+ }
+
+ pub(super) fn is_closed(&self) -> bool {
+ self.pointers.lock().unwrap().is_closed
+ }
+
+ pub(super) fn len(&self) -> usize {
+ self.len.load(Acquire)
+ }
+
+ /// Pushes a value into the queue.
+ pub(super) fn push(&self, task: task::Notified<T>) {
+ // Acquire queue lock
+ let mut p = self.pointers.lock().unwrap();
+
+ if p.is_closed {
+ // Drop the mutex to avoid a potential deadlock when
+ // re-entering.
+ drop(p);
+ drop(task);
+ return;
+ }
+
+ // safety: only mutated with the lock held
+ let len = unsafe { self.len.unsync_load() };
+ let task = task.into_raw();
+
+ // The next pointer should already be null
+ debug_assert!(get_next(task).is_none());
+
+ if let Some(tail) = p.tail {
+ set_next(tail, Some(task));
+ } else {
+ p.head = Some(task);
+ }
+
+ p.tail = Some(task);
+
+ self.len.store(len + 1, Release);
+ }
+
+ pub(super) fn push_batch(
+ &self,
+ batch_head: task::Notified<T>,
+ batch_tail: task::Notified<T>,
+ num: usize,
+ ) {
+ let batch_head = batch_head.into_raw();
+ let batch_tail = batch_tail.into_raw();
+
+ debug_assert!(get_next(batch_tail).is_none());
+
+ let mut p = self.pointers.lock().unwrap();
+
+ if let Some(tail) = p.tail {
+ set_next(tail, Some(batch_head));
+ } else {
+ p.head = Some(batch_head);
+ }
+
+ p.tail = Some(batch_tail);
+
+ // Increment the count.
+ //
+ // safety: All updates to the len atomic are guarded by the mutex. As
+ // such, a non-atomic load followed by a store is safe.
+ let len = unsafe { self.len.unsync_load() };
+
+ self.len.store(len + num, Release);
+ }
+
+ pub(super) fn pop(&self) -> Option<task::Notified<T>> {
+ // Fast path, if len == 0, then there are no values
+ if self.is_empty() {
+ return None;
+ }
+
+ let mut p = self.pointers.lock().unwrap();
+
+ // It is possible to hit `None` here if another thread popped the last
+ // task between us checking `len` and acquiring the lock.
+ let task = p.head?;
+
+ p.head = get_next(task);
+
+ if p.head.is_none() {
+ p.tail = None;
+ }
+
+ set_next(task, None);
+
+ // Decrement the count.
+ //
+ // safety: All updates to the len atomic are guarded by the mutex. As
+ // such, a non-atomic load followed by a store is safe.
+ self.len
+ .store(unsafe { self.len.unsync_load() } - 1, Release);
+
+ // safety: a `Notified` is pushed into the queue and now it is popped!
+ Some(unsafe { task::Notified::from_raw(task) })
+ }
+}
+
+impl<T: 'static> Drop for Inject<T> {
+ fn drop(&mut self) {
+ if !std::thread::panicking() {
+ assert!(self.pop().is_none(), "queue not empty");
+ }
+ }
+}
+
+fn get_next(header: NonNull<task::Header>) -> Option<NonNull<task::Header>> {
+ unsafe { header.as_ref().queue_next.with(|ptr| *ptr) }
+}
+
+fn set_next(header: NonNull<task::Header>, val: Option<NonNull<task::Header>>) {
+ unsafe {
+ header.as_ref().queue_next.with_mut(|ptr| *ptr = val);
+ }
+}
+
+/// Split the head value into the real head and the index a stealer is working
+/// on.
+fn unpack(n: u32) -> (u16, u16) {
+ let real = n & u16::max_value() as u32;
+ let steal = n >> 16;
+
+ (steal as u16, real as u16)
+}
+
+/// Join the two head values
+fn pack(steal: u16, real: u16) -> u32 {
+ (real as u32) | ((steal as u32) << 16)
+}
+
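+// An added sanity check (illustrative): `unpack` inverts `pack`.
+#[test]
+fn test_pack_unpack_roundtrip() {
+ assert_eq!(unpack(pack(3, 5)), (3, 5));
+}
+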
+#[test]
+fn test_local_queue_capacity() {
+ assert!(LOCAL_QUEUE_CAPACITY - 1 <= u8::max_value() as usize);
+}
diff --git a/third_party/rust/tokio/src/runtime/shell.rs b/third_party/rust/tokio/src/runtime/shell.rs
new file mode 100644
index 0000000000..294f2a16d8
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/shell.rs
@@ -0,0 +1,62 @@
+#![allow(clippy::redundant_clone)]
+
+use crate::park::{Park, Unpark};
+use crate::runtime::enter;
+use crate::runtime::time;
+use crate::util::{waker_ref, Wake};
+
+use std::future::Future;
+use std::sync::Arc;
+use std::task::Context;
+use std::task::Poll::Ready;
+
+#[derive(Debug)]
+pub(super) struct Shell {
+ driver: time::Driver,
+
+ /// TODO: don't store this
+ unpark: Arc<Handle>,
+}
+
+#[derive(Debug)]
+struct Handle(<time::Driver as Park>::Unpark);
+
+impl Shell {
+ pub(super) fn new(driver: time::Driver) -> Shell {
+ let unpark = Arc::new(Handle(driver.unpark()));
+
+ Shell { driver, unpark }
+ }
+
+ pub(super) fn block_on<F>(&mut self, f: F) -> F::Output
+ where
+ F: Future,
+ {
+ let _e = enter();
+
+ pin!(f);
+
+ let waker = waker_ref(&self.unpark);
+ let mut cx = Context::from_waker(&waker);
+
+ loop {
+ if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
+ return v;
+ }
+
+ self.driver.park().unwrap();
+ }
+ }
+}
+
+impl Wake for Handle {
+ /// Wake by value
+ fn wake(self: Arc<Self>) {
+ Wake::wake_by_ref(&self);
+ }
+
+ /// Wake by reference
+ fn wake_by_ref(arc_self: &Arc<Self>) {
+ arc_self.0.unpark();
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/spawner.rs b/third_party/rust/tokio/src/runtime/spawner.rs
new file mode 100644
index 0000000000..d136945cdc
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/spawner.rs
@@ -0,0 +1,37 @@
+cfg_rt_core! {
+ use crate::runtime::basic_scheduler;
+ use crate::task::JoinHandle;
+
+ use std::future::Future;
+}
+
+cfg_rt_threaded! {
+ use crate::runtime::thread_pool;
+}
+
+#[derive(Debug, Clone)]
+pub(crate) enum Spawner {
+ Shell,
+ #[cfg(feature = "rt-core")]
+ Basic(basic_scheduler::Spawner),
+ #[cfg(feature = "rt-threaded")]
+ ThreadPool(thread_pool::Spawner),
+}
+
+cfg_rt_core! {
+ impl Spawner {
+ pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
+ where
+ F: Future + Send + 'static,
+ F::Output: Send + 'static,
+ {
+ match self {
+ Spawner::Shell => panic!("spawning not enabled for runtime"),
+ #[cfg(feature = "rt-core")]
+ Spawner::Basic(spawner) => spawner.spawn(future),
+ #[cfg(feature = "rt-threaded")]
+ Spawner::ThreadPool(spawner) => spawner.spawn(future),
+ }
+ }
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/task/core.rs b/third_party/rust/tokio/src/runtime/task/core.rs
new file mode 100644
index 0000000000..573b9f3c9c
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/task/core.rs
@@ -0,0 +1,279 @@
+use crate::loom::cell::UnsafeCell;
+use crate::runtime::task::raw::{self, Vtable};
+use crate::runtime::task::state::State;
+use crate::runtime::task::waker::waker_ref;
+use crate::runtime::task::{Notified, Schedule, Task};
+use crate::util::linked_list;
+
+use std::future::Future;
+use std::pin::Pin;
+use std::ptr::NonNull;
+use std::task::{Context, Poll, Waker};
+
+/// The task cell. Contains the components of the task.
+///
+/// It is critical for `Header` to be the first field as the task structure will
+/// be referenced by both `*mut Cell` and `*mut Header`.
+#[repr(C)]
+pub(super) struct Cell<T: Future, S> {
+ /// Hot task state data
+ pub(super) header: Header,
+
+ /// Either the future or output, depending on the execution stage.
+ pub(super) core: Core<T, S>,
+
+ /// Cold data
+ pub(super) trailer: Trailer,
+}
+
+/// The core of the task.
+///
+/// Holds the future or output, depending on the stage of execution.
+pub(super) struct Core<T: Future, S> {
+ /// Scheduler used to drive this future
+ pub(super) scheduler: UnsafeCell<Option<S>>,
+
+ /// Either the future or the output
+ pub(super) stage: UnsafeCell<Stage<T>>,
+}
+
+/// Crate public as this is also needed by the pool.
+#[repr(C)]
+pub(crate) struct Header {
+ /// Task state
+ pub(super) state: State,
+
+ pub(crate) owned: UnsafeCell<linked_list::Pointers<Header>>,
+
+ /// Pointer to next task, used with the injection queue
+ pub(crate) queue_next: UnsafeCell<Option<NonNull<Header>>>,
+
+ /// Pointer to the next task in the transfer stack
+ pub(super) stack_next: UnsafeCell<Option<NonNull<Header>>>,
+
+ /// Table of function pointers for executing actions on the task.
+ pub(super) vtable: &'static Vtable,
+}
+
+unsafe impl Send for Header {}
+unsafe impl Sync for Header {}
+
+/// Cold data is stored after the future.
+pub(super) struct Trailer {
+ /// Consumer task waiting on completion of this task.
+ pub(super) waker: UnsafeCell<Option<Waker>>,
+}
+
+/// Either the future or the output.
+pub(super) enum Stage<T: Future> {
+ Running(T),
+ Finished(super::Result<T::Output>),
+ Consumed,
+}
+
+impl<T: Future, S: Schedule> Cell<T, S> {
+ /// Allocates a new task cell, containing the header, trailer, and core
+ /// structures.
+ pub(super) fn new(future: T, state: State) -> Box<Cell<T, S>> {
+ Box::new(Cell {
+ header: Header {
+ state,
+ owned: UnsafeCell::new(linked_list::Pointers::new()),
+ queue_next: UnsafeCell::new(None),
+ stack_next: UnsafeCell::new(None),
+ vtable: raw::vtable::<T, S>(),
+ },
+ core: Core {
+ scheduler: UnsafeCell::new(None),
+ stage: UnsafeCell::new(Stage::Running(future)),
+ },
+ trailer: Trailer {
+ waker: UnsafeCell::new(None),
+ },
+ })
+ }
+}
+
+impl<T: Future, S: Schedule> Core<T, S> {
+ /// If needed, bind a scheduler to the task.
+ ///
+ /// This only happens on the first poll.
+ pub(super) fn bind_scheduler(&self, task: Task<S>) {
+ use std::mem::ManuallyDrop;
+
+ // TODO: it would be nice to not have to wrap with a ManuallyDrop
+ let task = ManuallyDrop::new(task);
+
+ // This function may be called concurrently, but the __first__ time it
+ // is called, the caller has unique access to this field. All subsequent
+ // concurrent calls will be via the `Waker`, which will "happens after"
+ // the first poll.
+ //
+ // In other words, it is always safe to read the field and it is safe to
+ // write to the field when it is `None`.
+ if self.is_bound() {
+ return;
+ }
+
+ // Bind the task to the scheduler
+ let scheduler = S::bind(ManuallyDrop::into_inner(task));
+
+ // Safety: As `scheduler` is not set, this is the first poll
+ self.scheduler.with_mut(|ptr| unsafe {
+ *ptr = Some(scheduler);
+ });
+ }
+
+ /// Returns true if the task is bound to a scheduler.
+ pub(super) fn is_bound(&self) -> bool {
+ // Safety: never called concurrently w/ a mutation.
+ self.scheduler.with(|ptr| unsafe { (*ptr).is_some() })
+ }
+
+ /// Poll the future
+ ///
+ /// # Safety
+ ///
+ /// The caller must ensure it is safe to mutate the `state` field. This
+ /// requires ensuring mutual exclusion between any concurrent thread that
+ /// might modify the future or output field.
+ ///
+ /// The mutual exclusion is implemented by `Harness` and the `Lifecycle`
+ /// component of the task state.
+ ///
+ /// `self` must also be pinned. This is handled by storing the task on the
+ /// heap.
+ pub(super) fn poll(&self, header: &Header) -> Poll<T::Output> {
+ let res = {
+ self.stage.with_mut(|ptr| {
+ // Safety: The caller ensures mutual exclusion to the field.
+ let future = match unsafe { &mut *ptr } {
+ Stage::Running(future) => future,
+ _ => unreachable!("unexpected stage"),
+ };
+
+ // Safety: The caller ensures the future is pinned.
+ let future = unsafe { Pin::new_unchecked(future) };
+
+ // The waker passed into the `poll` function does not require a ref
+ // count increment.
+ let waker_ref = waker_ref::<T, S>(header);
+ let mut cx = Context::from_waker(&*waker_ref);
+
+ future.poll(&mut cx)
+ })
+ };
+
+ if res.is_ready() {
+ self.drop_future_or_output();
+ }
+
+ res
+ }
+
+ /// Drop the future
+ ///
+ /// # Safety
+ ///
+ /// The caller must ensure it is safe to mutate the `stage` field.
+ pub(super) fn drop_future_or_output(&self) {
+ self.stage.with_mut(|ptr| {
+ // Safety: The caller ensures mutual exclusion to the field.
+ unsafe { *ptr = Stage::Consumed };
+ });
+ }
+
+ /// Store the task output
+ ///
+ /// # Safety
+ ///
+ /// The caller must ensure it is safe to mutate the `stage` field.
+ pub(super) fn store_output(&self, output: super::Result<T::Output>) {
+ self.stage.with_mut(|ptr| {
+ // Safety: the caller ensures mutual exclusion to the field.
+ unsafe { *ptr = Stage::Finished(output) };
+ });
+ }
+
+ /// Take the task output
+ ///
+ /// # Safety
+ ///
+ /// The caller must ensure it is safe to mutate the `stage` field.
+ pub(super) fn take_output(&self) -> super::Result<T::Output> {
+ use std::mem;
+
+ self.stage.with_mut(|ptr| {
+ // Safety: the caller ensures mutual exclusion to the field.
+ match mem::replace(unsafe { &mut *ptr }, Stage::Consumed) {
+ Stage::Finished(output) => output,
+ _ => panic!("unexpected task state"),
+ }
+ })
+ }
+
+ /// Schedule the future for execution
+ pub(super) fn schedule(&self, task: Notified<S>) {
+ self.scheduler.with(|ptr| {
+ // Safety: Can only be called after initial `poll`, which is the
+ // only time the field is mutated.
+ match unsafe { &*ptr } {
+ Some(scheduler) => scheduler.schedule(task),
+ None => panic!("no scheduler set"),
+ }
+ });
+ }
+
+ /// Schedule the future for execution in the near future, yielding the
+ /// thread to other tasks.
+ pub(super) fn yield_now(&self, task: Notified<S>) {
+ self.scheduler.with(|ptr| {
+ // Safety: Can only be called after initial `poll`, which is the
+ // only time the field is mutated.
+ match unsafe { &*ptr } {
+ Some(scheduler) => scheduler.yield_now(task),
+ None => panic!("no scheduler set"),
+ }
+ });
+ }
+
+ /// Release the task
+ ///
+ /// If the `Scheduler` implementation is able to, it returns the `Task`
+ /// handle immediately. The caller of this function will batch a ref-dec
+ /// with a state change.
+ pub(super) fn release(&self, task: Task<S>) -> Option<Task<S>> {
+ use std::mem::ManuallyDrop;
+
+ let task = ManuallyDrop::new(task);
+
+ self.scheduler.with(|ptr| {
+ // Safety: Can only be called after initial `poll`, which is the
+ // only time the field is mutated.
+ match unsafe { &*ptr } {
+ Some(scheduler) => scheduler.release(&*task),
+ // Task was never polled
+ None => None,
+ }
+ })
+ }
+}
+
+cfg_rt_threaded! {
+ impl Header {
+ pub(crate) fn shutdown(&self) {
+ use crate::runtime::task::RawTask;
+
+ let task = unsafe { RawTask::from_raw(self.into()) };
+ task.shutdown();
+ }
+ }
+}
+
+#[test]
+#[cfg(not(loom))]
+fn header_lte_cache_line() {
+ use std::mem::size_of;
+
+ assert!(size_of::<Header>() <= 8 * size_of::<*const ()>());
+}
diff --git a/third_party/rust/tokio/src/runtime/task/error.rs b/third_party/rust/tokio/src/runtime/task/error.rs
new file mode 100644
index 0000000000..d5f65a4981
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/task/error.rs
@@ -0,0 +1,163 @@
+use std::any::Any;
+use std::fmt;
+use std::io;
+use std::sync::Mutex;
+
+doc_rt_core! {
+ /// Task failed to execute to completion.
+ pub struct JoinError {
+ repr: Repr,
+ }
+}
+
+enum Repr {
+ Cancelled,
+ Panic(Mutex<Box<dyn Any + Send + 'static>>),
+}
+
+impl JoinError {
+ #[doc(hidden)]
+ #[deprecated]
+ pub fn cancelled() -> JoinError {
+ Self::cancelled2()
+ }
+
+ pub(crate) fn cancelled2() -> JoinError {
+ JoinError {
+ repr: Repr::Cancelled,
+ }
+ }
+
+ #[doc(hidden)]
+ #[deprecated]
+ pub fn panic(err: Box<dyn Any + Send + 'static>) -> JoinError {
+ Self::panic2(err)
+ }
+
+ pub(crate) fn panic2(err: Box<dyn Any + Send + 'static>) -> JoinError {
+ JoinError {
+ repr: Repr::Panic(Mutex::new(err)),
+ }
+ }
+
+ /// Returns true if the error was caused by the task being cancelled
+ pub fn is_cancelled(&self) -> bool {
+ match &self.repr {
+ Repr::Cancelled => true,
+ _ => false,
+ }
+ }
+
+ /// Returns true if the error was caused by the task panicking
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::panic;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let err = tokio::spawn(async {
+ /// panic!("boom");
+ /// }).await.unwrap_err();
+ ///
+ /// assert!(err.is_panic());
+ /// }
+ /// ```
+ pub fn is_panic(&self) -> bool {
+ match &self.repr {
+ Repr::Panic(_) => true,
+ _ => false,
+ }
+ }
+
+ /// Consumes the join error, returning the object with which the task panicked.
+ ///
+ /// # Panics
+ ///
+ /// `into_panic()` panics if the `Error` does not represent the underlying
+ /// task terminating with a panic. Use `is_panic` to check the error reason
+ /// or `try_into_panic` for a variant that does not panic.
+ ///
+ /// # Examples
+ ///
+ /// ```should_panic
+ /// use std::panic;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let err = tokio::spawn(async {
+ /// panic!("boom");
+ /// }).await.unwrap_err();
+ ///
+ /// if err.is_panic() {
+ /// // Resume the panic on the main task
+ /// panic::resume_unwind(err.into_panic());
+ /// }
+ /// }
+ /// ```
+ pub fn into_panic(self) -> Box<dyn Any + Send + 'static> {
+ self.try_into_panic()
+ .expect("`JoinError` reason is not a panic.")
+ }
+
+ /// Consumes the join error, returning the object with which the task
+ /// panicked if the task terminated due to a panic. Otherwise, `self` is
+ /// returned.
+ ///
+ /// # Examples
+ ///
+ /// ```should_panic
+ /// use std::panic;
+ ///
+ /// #[tokio::main]
+ /// async fn main() {
+ /// let err = tokio::spawn(async {
+ /// panic!("boom");
+ /// }).await.unwrap_err();
+ ///
+ /// if let Ok(reason) = err.try_into_panic() {
+ /// // Resume the panic on the main task
+ /// panic::resume_unwind(reason);
+ /// }
+ /// }
+ /// ```
+ pub fn try_into_panic(self) -> Result<Box<dyn Any + Send + 'static>, JoinError> {
+ match self.repr {
+ Repr::Panic(p) => Ok(p.into_inner().expect("Extracting panic from mutex")),
+ _ => Err(self),
+ }
+ }
+}
+
+impl fmt::Display for JoinError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match &self.repr {
+ Repr::Cancelled => write!(fmt, "cancelled"),
+ Repr::Panic(_) => write!(fmt, "panic"),
+ }
+ }
+}
+
+impl fmt::Debug for JoinError {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match &self.repr {
+ Repr::Cancelled => write!(fmt, "JoinError::Cancelled"),
+ Repr::Panic(_) => write!(fmt, "JoinError::Panic(...)"),
+ }
+ }
+}
+
+impl std::error::Error for JoinError {}
+
+impl From<JoinError> for io::Error {
+ fn from(src: JoinError) -> io::Error {
+ io::Error::new(
+ io::ErrorKind::Other,
+ match src.repr {
+ Repr::Cancelled => "task was cancelled",
+ Repr::Panic(_) => "task panicked",
+ },
+ )
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/task/harness.rs b/third_party/rust/tokio/src/runtime/task/harness.rs
new file mode 100644
index 0000000000..29b231ea88
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/task/harness.rs
@@ -0,0 +1,372 @@
+use crate::runtime::task::core::{Cell, Core, Header, Trailer};
+use crate::runtime::task::state::Snapshot;
+use crate::runtime::task::{JoinError, Notified, Schedule, Task};
+
+use std::future::Future;
+use std::mem;
+use std::panic;
+use std::ptr::NonNull;
+use std::task::{Poll, Waker};
+
+/// Typed raw task handle
+pub(super) struct Harness<T: Future, S: 'static> {
+ cell: NonNull<Cell<T, S>>,
+}
+
+impl<T, S> Harness<T, S>
+where
+ T: Future,
+ S: 'static,
+{
+ pub(super) unsafe fn from_raw(ptr: NonNull<Header>) -> Harness<T, S> {
+ Harness {
+ cell: ptr.cast::<Cell<T, S>>(),
+ }
+ }
+
+ fn header(&self) -> &Header {
+ unsafe { &self.cell.as_ref().header }
+ }
+
+ fn trailer(&self) -> &Trailer {
+ unsafe { &self.cell.as_ref().trailer }
+ }
+
+ fn core(&self) -> &Core<T, S> {
+ unsafe { &self.cell.as_ref().core }
+ }
+}
+
+impl<T, S> Harness<T, S>
+where
+ T: Future,
+ S: Schedule,
+{
+ /// Polls the inner future.
+ ///
+ /// All necessary state checks and transitions are performed.
+ ///
+ /// Panics raised while polling the future are handled.
+ pub(super) fn poll(self) {
+ // If this is the first time the task is polled, the task will be bound
+ // to the scheduler, in which case the task ref count must be
+ // incremented.
+ let ref_inc = !self.core().is_bound();
+
+ // Transition the task to the running state.
+ //
+ // A failure to transition here indicates the task has been cancelled
+ // while in the run queue pending execution.
+ let snapshot = match self.header().state.transition_to_running(ref_inc) {
+ Ok(snapshot) => snapshot,
+ Err(_) => {
+ // The task was shutdown while in the run queue. At this point,
+ // we just hold a ref counted reference. Drop it here.
+ self.drop_reference();
+ return;
+ }
+ };
+
+ // Ensure the task is bound to a scheduler instance. If this is the
+ // first time polling the task, a scheduler instance is pulled from the
+ // local context and assigned to the task.
+ //
+ // The scheduler maintains ownership of the task and responds to `wake`
+ // calls.
+ //
+ // The task reference count has been incremented.
+ self.core().bind_scheduler(self.to_task());
+
+ // The transition to `Running` done above ensures that a lock on the
+ // future has been obtained. This also ensures the `*mut T` pointer
+ // contains the future (as opposed to the output) and is initialized.
+
+ let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
+ struct Guard<'a, T: Future, S: Schedule> {
+ core: &'a Core<T, S>,
+ polled: bool,
+ }
+
+ impl<T: Future, S: Schedule> Drop for Guard<'_, T, S> {
+ fn drop(&mut self) {
+ if !self.polled {
+ self.core.drop_future_or_output();
+ }
+ }
+ }
+
+ let mut guard = Guard {
+ core: self.core(),
+ polled: false,
+ };
+
+ // If the task is cancelled, avoid polling it, instead signalling it
+ // is complete.
+ if snapshot.is_cancelled() {
+ Poll::Ready(Err(JoinError::cancelled2()))
+ } else {
+ let res = guard.core.poll(self.header());
+
+ // prevent the guard from dropping the future
+ guard.polled = true;
+
+ res.map(Ok)
+ }
+ }));
+
+ match res {
+ Ok(Poll::Ready(out)) => {
+ self.complete(out, snapshot.is_join_interested());
+ }
+ Ok(Poll::Pending) => {
+ match self.header().state.transition_to_idle() {
+ Ok(snapshot) => {
+ if snapshot.is_notified() {
+ // Signal yield
+ self.core().yield_now(Notified(self.to_task()));
+ // The ref-count was incremented as part of
+ // `transition_to_idle`.
+ self.drop_reference();
+ }
+ }
+ Err(_) => self.cancel_task(),
+ }
+ }
+ Err(err) => {
+ self.complete(Err(JoinError::panic2(err)), snapshot.is_join_interested());
+ }
+ }
+ }
+
+ pub(super) fn dealloc(self) {
+ // Release the join waker, if there is one.
+ self.trailer().waker.with_mut(|_| ());
+
+ // Check causality
+ self.core().stage.with_mut(|_| {});
+ self.core().scheduler.with_mut(|_| {});
+
+ unsafe {
+ drop(Box::from_raw(self.cell.as_ptr()));
+ }
+ }
+
+ // ===== join handle =====
+
+ /// Read the task output into `dst`.
+ pub(super) fn try_read_output(self, dst: &mut Poll<super::Result<T::Output>>, waker: &Waker) {
+ // Load a snapshot of the current task state
+ let snapshot = self.header().state.load();
+
+ debug_assert!(snapshot.is_join_interested());
+
+ if !snapshot.is_complete() {
+ // The waker must be stored in the task struct.
+ let res = if snapshot.has_join_waker() {
+ // There already is a waker stored in the struct. If it matches
+ // the provided waker, then there is no further work to do.
+ // Otherwise, the waker must be swapped.
+ let will_wake = unsafe {
+ // Safety: when `JOIN_INTEREST` is set, only `JOIN_HANDLE`
+ // may mutate the `waker` field.
+ self.trailer()
+ .waker
+ .with(|ptr| (*ptr).as_ref().unwrap().will_wake(waker))
+ };
+
+ if will_wake {
+ // The task is not complete **and** the waker is up to date,
+ // there is nothing further that needs to be done.
+ return;
+ }
+
+ // Unset the `JOIN_WAKER` to gain mutable access to the `waker`
+ // field, then update the field with the new join waker.
+ //
+ // This requires two atomic operations, unsetting the bit and
+ // then resetting it. If the task transitions to complete
+ // concurrently to either one of those operations, then setting
+ // the join waker fails and we proceed to reading the task
+ // output.
+ self.header()
+ .state
+ .unset_waker()
+ .and_then(|snapshot| self.set_join_waker(waker.clone(), snapshot))
+ } else {
+ self.set_join_waker(waker.clone(), snapshot)
+ };
+
+ match res {
+ Ok(_) => return,
+ Err(snapshot) => {
+ assert!(snapshot.is_complete());
+ }
+ }
+ }
+
+ *dst = Poll::Ready(self.core().take_output());
+ }
+
+ fn set_join_waker(&self, waker: Waker, snapshot: Snapshot) -> Result<Snapshot, Snapshot> {
+ assert!(snapshot.is_join_interested());
+ assert!(!snapshot.has_join_waker());
+
+ // Safety: Only the `JoinHandle` may set the `waker` field. When
+ // `JOIN_INTEREST` is **not** set, nothing else will touch the field.
+ unsafe {
+ self.trailer().waker.with_mut(|ptr| {
+ *ptr = Some(waker);
+ });
+ }
+
+ // Update the `JoinWaker` state accordingly
+ let res = self.header().state.set_join_waker();
+
+ // If the state could not be updated, then clear the join waker
+ if res.is_err() {
+ unsafe {
+ self.trailer().waker.with_mut(|ptr| {
+ *ptr = None;
+ });
+ }
+ }
+
+ res
+ }
+
+ pub(super) fn drop_join_handle_slow(self) {
+ // Try to unset `JOIN_INTEREST`. This must be done as a first step in
+ // case the task concurrently completed.
+ if self.header().state.unset_join_interested().is_err() {
+ // It is our responsibility to drop the output. This is critical as
+ // the task output may not be `Send` and as such must remain with
+ // the scheduler or `JoinHandle`. i.e. if the output remains in the
+ // task structure until the task is deallocated, it may be dropped
+ // by a Waker on any arbitrary thread.
+ self.core().drop_future_or_output();
+ }
+
+ // Drop the `JoinHandle` reference, possibly deallocating the task
+ self.drop_reference();
+ }
+
+ // ===== waker behavior =====
+
+ pub(super) fn wake_by_val(self) {
+ self.wake_by_ref();
+ self.drop_reference();
+ }
+
+ pub(super) fn wake_by_ref(&self) {
+ if self.header().state.transition_to_notified() {
+ self.core().schedule(Notified(self.to_task()));
+ }
+ }
+
+ pub(super) fn drop_reference(self) {
+ if self.header().state.ref_dec() {
+ self.dealloc();
+ }
+ }
+
+ /// Forcibly shutdown the task
+ ///
+ /// Attempt to transition to `Running` in order to forcibly shutdown the
+ /// task. If the task is currently running or in a state of completion, then
+ /// there is nothing further to do. When the task completes running, it will
+ /// notice the `CANCELLED` bit and finalize the task.
+ pub(super) fn shutdown(self) {
+ if !self.header().state.transition_to_shutdown() {
+ // The task is concurrently running. No further work needed.
+ return;
+ }
+
+ // By transitioning the lifecycle to `Running`, we have permission to
+ // drop the future.
+ self.cancel_task();
+ }
+
+ // ====== internal ======
+
+ fn cancel_task(self) {
+ // Drop the future from a panic guard.
+ let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
+ self.core().drop_future_or_output();
+ }));
+
+ if let Err(err) = res {
+ // Dropping the future panicked, complete the join
+ // handle with the panic to avoid dropping the panic
+ // on the ground.
+ self.complete(Err(JoinError::panic2(err)), true);
+ } else {
+ self.complete(Err(JoinError::cancelled2()), true);
+ }
+ }
+
+ fn complete(mut self, output: super::Result<T::Output>, is_join_interested: bool) {
+ if is_join_interested {
+ // Store the output. The future has already been dropped
+ //
+ // Safety: Mutual exclusion is obtained by having transitioned the task
+ // state -> Running
+ self.core().store_output(output);
+
+ // Transition to `Complete`, notifying the `JoinHandle` if necessary.
+ self.transition_to_complete();
+ }
+
+ // The task has completed execution and will no longer be scheduled.
+ //
+ // Attempts to batch a ref-dec with the state transition below.
+ let ref_dec = if self.core().is_bound() {
+ if let Some(task) = self.core().release(self.to_task()) {
+ mem::forget(task);
+ true
+ } else {
+ false
+ }
+ } else {
+ false
+ };
+
+ // This might deallocate
+ let snapshot = self
+ .header()
+ .state
+ .transition_to_terminal(!is_join_interested, ref_dec);
+
+ if snapshot.ref_count() == 0 {
+ self.dealloc()
+ }
+ }
+
+ /// Transitions the task's lifecycle to `Complete`. Notifies the
+ /// `JoinHandle` if it still has interest in the completion.
+ fn transition_to_complete(&mut self) {
+ // Transition the task's lifecycle to `Complete` and get a snapshot of
+ // the task's state.
+ let snapshot = self.header().state.transition_to_complete();
+
+ if !snapshot.is_join_interested() {
+ // The `JoinHandle` is not interested in the output of this task. It
+ // is our responsibility to drop the output.
+ self.core().drop_future_or_output();
+ } else if snapshot.has_join_waker() {
+ // Notify the join handle. The previous transition obtains the
+ // lock on the waker cell.
+ self.wake_join();
+ }
+ }
+
+ fn wake_join(&self) {
+ self.trailer().waker.with(|ptr| match unsafe { &*ptr } {
+ Some(waker) => waker.wake_by_ref(),
+ None => panic!("waker missing"),
+ });
+ }
+
+ fn to_task(&self) -> Task<S> {
+ unsafe { Task::from_raw(self.header().into()) }
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/task/join.rs b/third_party/rust/tokio/src/runtime/task/join.rs
new file mode 100644
index 0000000000..fdcc346e5c
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/task/join.rs
@@ -0,0 +1,152 @@
+use crate::runtime::task::RawTask;
+
+use std::fmt;
+use std::future::Future;
+use std::marker::PhantomData;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+doc_rt_core! {
+ /// An owned permission to join on a task (await its termination).
+ ///
+ /// This can be thought of as the equivalent of [`std::thread::JoinHandle`] for
+ /// a task rather than a thread.
+ ///
+ /// A `JoinHandle` *detaches* the associated task when it is dropped, which
+ /// means that there is no longer any handle to the task, and no way to `join`
+ /// on it.
+ ///
+ /// This `struct` is created by the [`task::spawn`] and [`task::spawn_blocking`]
+ /// functions.
+ ///
+ /// # Examples
+ ///
+ /// Creation from [`task::spawn`]:
+ ///
+ /// ```
+ /// use tokio::task;
+ ///
+ /// # async fn doc() {
+ /// let join_handle: task::JoinHandle<_> = task::spawn(async {
+ /// // some work here
+ /// });
+ /// # }
+ /// ```
+ ///
+ /// Creation from [`task::spawn_blocking`]:
+ ///
+ /// ```
+ /// use tokio::task;
+ ///
+ /// # async fn doc() {
+ /// let join_handle: task::JoinHandle<_> = task::spawn_blocking(|| {
+ /// // some blocking work here
+ /// });
+ /// # }
+ /// ```
+ ///
+ /// Child being detached and outliving its parent:
+ ///
+ /// ```no_run
+ /// use tokio::task;
+ /// use tokio::time;
+ /// use std::time::Duration;
+ ///
+ /// # #[tokio::main] async fn main() {
+ /// let original_task = task::spawn(async {
+ /// let _detached_task = task::spawn(async {
+ /// // Here we sleep to make sure that the original task returns first.
+ /// time::delay_for(Duration::from_millis(10)).await;
+ /// // This will be called, even though the JoinHandle is dropped.
+ /// println!("♫ Still alive ♫");
+ /// });
+ /// });
+ ///
+ /// original_task.await.expect("The task being joined has panicked");
+ /// println!("Original task is joined.");
+ ///
+ /// // We make sure that the new task has time to run, before the main
+ /// // task returns.
+ ///
+ /// time::delay_for(Duration::from_millis(1000)).await;
+ /// # }
+ /// ```
+ ///
+ /// [`task::spawn`]: crate::task::spawn()
+ /// [`task::spawn_blocking`]: crate::task::spawn_blocking
+ /// [`std::thread::JoinHandle`]: std::thread::JoinHandle
+ pub struct JoinHandle<T> {
+ raw: Option<RawTask>,
+ _p: PhantomData<T>,
+ }
+}
+
+unsafe impl<T: Send> Send for JoinHandle<T> {}
+unsafe impl<T: Send> Sync for JoinHandle<T> {}
+
+impl<T> JoinHandle<T> {
+ pub(super) fn new(raw: RawTask) -> JoinHandle<T> {
+ JoinHandle {
+ raw: Some(raw),
+ _p: PhantomData,
+ }
+ }
+}
+
+impl<T> Unpin for JoinHandle<T> {}
+
+impl<T> Future for JoinHandle<T> {
+ type Output = super::Result<T>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut ret = Poll::Pending;
+
+ // Keep track of task budget
+ ready!(crate::coop::poll_proceed(cx));
+
+ // Raw should always be set. If it is not, this is due to polling after
+ // completion
+ let raw = self
+ .raw
+ .as_ref()
+ .expect("polling after `JoinHandle` already completed");
+
+ // Try to read the task output. If the task is not yet complete, the
+ // waker is stored and is notified once the task does complete.
+ //
+ // The function must go via the vtable, which requires erasing generic
+ // types. To do this, the function "return" is placed on the stack
+ // **before** calling the function and is passed into the function using
+ // `*mut ()`.
+ //
+ // Safety:
+ //
+ // The type of `T` must match the task's output type.
+ unsafe {
+ raw.try_read_output(&mut ret as *mut _ as *mut (), cx.waker());
+ }
+
+ ret
+ }
+}
+
+impl<T> Drop for JoinHandle<T> {
+ fn drop(&mut self) {
+ if let Some(raw) = self.raw.take() {
+ if raw.header().state.drop_join_handle_fast().is_ok() {
+ return;
+ }
+
+ raw.drop_join_handle_slow();
+ }
+ }
+}
+
+impl<T> fmt::Debug for JoinHandle<T>
+where
+ T: fmt::Debug,
+{
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("JoinHandle").finish()
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/task/mod.rs b/third_party/rust/tokio/src/runtime/task/mod.rs
new file mode 100644
index 0000000000..17b5157e84
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/task/mod.rs
@@ -0,0 +1,220 @@
+mod core;
+use self::core::Cell;
+pub(crate) use self::core::Header;
+
+mod error;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::error::JoinError;
+
+mod harness;
+use self::harness::Harness;
+
+mod join;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::join::JoinHandle;
+
+mod raw;
+use self::raw::RawTask;
+
+mod state;
+use self::state::State;
+
+mod waker;
+
+cfg_rt_threaded! {
+ mod stack;
+ pub(crate) use self::stack::TransferStack;
+}
+
+use crate::util::linked_list;
+
+use std::future::Future;
+use std::marker::PhantomData;
+use std::ptr::NonNull;
+use std::{fmt, mem};
+
+/// An owned handle to the task, tracked by ref count
+#[repr(transparent)]
+pub(crate) struct Task<S: 'static> {
+ raw: RawTask,
+ _p: PhantomData<S>,
+}
+
+unsafe impl<S> Send for Task<S> {}
+unsafe impl<S> Sync for Task<S> {}
+
+/// A task was notified
+#[repr(transparent)]
+pub(crate) struct Notified<S: 'static>(Task<S>);
+
+unsafe impl<S: Schedule> Send for Notified<S> {}
+unsafe impl<S: Schedule> Sync for Notified<S> {}
+
+/// Task result sent back
+pub(crate) type Result<T> = std::result::Result<T, JoinError>;
+
+pub(crate) trait Schedule: Sync + Sized + 'static {
+ /// Bind a task to the executor.
+ ///
+ /// Guaranteed to be called from the thread that called `poll` on the task.
+ /// The returned `Schedule` instance is associated with the task and is used
+ /// as `&self` in the other methods on this trait.
+ fn bind(task: Task<Self>) -> Self;
+
+ /// The task has completed work and is ready to be released. The scheduler
+ /// is free to drop it whenever.
+ ///
+ /// If the scheduler can immediately release the task, it should return
+ /// it as part of the function. This enables the task module to batch
+ /// the ref-dec with other operations.
+ fn release(&self, task: &Task<Self>) -> Option<Task<Self>>;
+
+ /// Schedule the task
+ fn schedule(&self, task: Notified<Self>);
+
+ /// Schedule the task to run in the near future, yielding the thread to
+ /// other tasks.
+ fn yield_now(&self, task: Notified<Self>) {
+ self.schedule(task);
+ }
+}
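+
+// For orientation, a minimal sketch (illustration only; not used by the
+// runtime) of a no-op scheduler implementing `Schedule`. The loom and unit
+// tests later in this series use stubs very much like it.
+#[cfg(test)]
+struct NoopSchedule;
+
+#[cfg(test)]
+impl Schedule for NoopSchedule {
+ fn bind(task: Task<Self>) -> NoopSchedule {
+ // Keep the task alive; a real scheduler would track it for shutdown.
+ std::mem::forget(task);
+ NoopSchedule
+ }
+
+ fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
+ // Nothing was tracked, so there is nothing to hand back.
+ None
+ }
+
+ fn schedule(&self, _task: Notified<Self>) {
+ unreachable!("nothing in this sketch is ever rescheduled");
+ }
+}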
+
+/// Create a new task with an associated join handle
+pub(crate) fn joinable<T, S>(task: T) -> (Notified<S>, JoinHandle<T::Output>)
+where
+ T: Future + Send + 'static,
+ S: Schedule,
+{
+ let raw = RawTask::new::<_, S>(task);
+
+ let task = Task {
+ raw,
+ _p: PhantomData,
+ };
+
+ let join = JoinHandle::new(raw);
+
+ (Notified(task), join)
+}
+
+cfg_rt_util! {
+ /// Create a new `!Send` task with an associated join handle
+ pub(crate) unsafe fn joinable_local<T, S>(task: T) -> (Notified<S>, JoinHandle<T::Output>)
+ where
+ T: Future + 'static,
+ S: Schedule,
+ {
+ let raw = RawTask::new::<_, S>(task);
+
+ let task = Task {
+ raw,
+ _p: PhantomData,
+ };
+
+ let join = JoinHandle::new(raw);
+
+ (Notified(task), join)
+ }
+}
+
+impl<S: 'static> Task<S> {
+ pub(crate) unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> {
+ Task {
+ raw: RawTask::from_raw(ptr),
+ _p: PhantomData,
+ }
+ }
+
+ pub(crate) fn header(&self) -> &Header {
+ self.raw.header()
+ }
+}
+
+cfg_rt_threaded! {
+ impl<S: 'static> Notified<S> {
+ pub(crate) unsafe fn from_raw(ptr: NonNull<Header>) -> Notified<S> {
+ Notified(Task::from_raw(ptr))
+ }
+
+ pub(crate) fn header(&self) -> &Header {
+ self.0.header()
+ }
+ }
+
+ impl<S: 'static> Task<S> {
+ pub(crate) fn into_raw(self) -> NonNull<Header> {
+ let ret = self.header().into();
+ mem::forget(self);
+ ret
+ }
+ }
+
+ impl<S: 'static> Notified<S> {
+ pub(crate) fn into_raw(self) -> NonNull<Header> {
+ self.0.into_raw()
+ }
+ }
+}
+
+impl<S: Schedule> Task<S> {
+ /// Pre-emptively cancel the task as part of the shutdown process.
+ pub(crate) fn shutdown(&self) {
+ self.raw.shutdown();
+ }
+}
+
+impl<S: Schedule> Notified<S> {
+ /// Run the task
+ pub(crate) fn run(self) {
+ self.0.raw.poll();
+ mem::forget(self);
+ }
+
+ /// Pre-emptively cancel the task as part of the shutdown process.
+ pub(crate) fn shutdown(self) {
+ self.0.shutdown();
+ }
+}
+
+impl<S: 'static> Drop for Task<S> {
+ fn drop(&mut self) {
+ // Decrement the ref count
+ if self.header().state.ref_dec() {
+ // Deallocate if this is the final ref count
+ self.raw.dealloc();
+ }
+ }
+}
+
+impl<S> fmt::Debug for Task<S> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(fmt, "Task({:p})", self.header())
+ }
+}
+
+impl<S> fmt::Debug for Notified<S> {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(fmt, "task::Notified({:p})", self.0.header())
+ }
+}
+
+/// # Safety
+///
+/// Tasks are pinned
+unsafe impl<S> linked_list::Link for Task<S> {
+ type Handle = Task<S>;
+ type Target = Header;
+
+ fn as_raw(handle: &Task<S>) -> NonNull<Header> {
+ handle.header().into()
+ }
+
+ unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> {
+ Task::from_raw(ptr)
+ }
+
+ unsafe fn pointers(target: NonNull<Header>) -> NonNull<linked_list::Pointers<Header>> {
+ // Not ideal, as this bypasses some of loom's checking...
+ NonNull::from(target.as_ref().owned.with_mut(|ptr| &mut *ptr))
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/task/raw.rs b/third_party/rust/tokio/src/runtime/task/raw.rs
new file mode 100644
index 0000000000..cae56d037d
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/task/raw.rs
@@ -0,0 +1,131 @@
+use crate::runtime::task::{Cell, Harness, Header, Schedule, State};
+
+use std::future::Future;
+use std::ptr::NonNull;
+use std::task::{Poll, Waker};
+
+/// Raw task handle
+pub(super) struct RawTask {
+ ptr: NonNull<Header>,
+}
+
+pub(super) struct Vtable {
+ /// Poll the future
+ pub(super) poll: unsafe fn(NonNull<Header>),
+
+ /// Deallocate the memory
+ pub(super) dealloc: unsafe fn(NonNull<Header>),
+
+ /// Read the task output, if complete
+ pub(super) try_read_output: unsafe fn(NonNull<Header>, *mut (), &Waker),
+
+ /// The join handle has been dropped
+ pub(super) drop_join_handle_slow: unsafe fn(NonNull<Header>),
+
+ /// The scheduler is being shut down.
+ pub(super) shutdown: unsafe fn(NonNull<Header>),
+}
+
+/// Get the vtable for the requested `T` and `S` generics.
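+///
+/// The returned reference is `'static` thanks to rvalue static promotion:
+/// the `Vtable` literal below contains only function pointers, so the
+/// compiler promotes it to a constant with a `'static` lifetime.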
+pub(super) fn vtable<T: Future, S: Schedule>() -> &'static Vtable {
+ &Vtable {
+ poll: poll::<T, S>,
+ dealloc: dealloc::<T, S>,
+ try_read_output: try_read_output::<T, S>,
+ drop_join_handle_slow: drop_join_handle_slow::<T, S>,
+ shutdown: shutdown::<T, S>,
+ }
+}
+
+impl RawTask {
+ pub(super) fn new<T, S>(task: T) -> RawTask
+ where
+ T: Future,
+ S: Schedule,
+ {
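+ // `Cell` lays out the `Header` as its first field (`repr(C)` in
+ // `task/core.rs`), so a pointer to the cell is also a valid pointer
+ // to the header.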
+ let ptr = Box::into_raw(Cell::<_, S>::new(task, State::new()));
+ let ptr = unsafe { NonNull::new_unchecked(ptr as *mut Header) };
+
+ RawTask { ptr }
+ }
+
+ pub(super) unsafe fn from_raw(ptr: NonNull<Header>) -> RawTask {
+ RawTask { ptr }
+ }
+
+ /// Returns a reference to the task's meta structure.
+ ///
+ /// Safe as `Header` is `Sync`.
+ pub(super) fn header(&self) -> &Header {
+ unsafe { self.ptr.as_ref() }
+ }
+
+ /// Safety: mutual exclusion is required to call this function.
+ pub(super) fn poll(self) {
+ let vtable = self.header().vtable;
+ unsafe { (vtable.poll)(self.ptr) }
+ }
+
+ pub(super) fn dealloc(self) {
+ let vtable = self.header().vtable;
+ unsafe {
+ (vtable.dealloc)(self.ptr);
+ }
+ }
+
+ /// Safety: `dst` must be a `*mut Poll<super::Result<T::Output>>` where `T`
+ /// is the future stored by the task.
+ pub(super) unsafe fn try_read_output(self, dst: *mut (), waker: &Waker) {
+ let vtable = self.header().vtable;
+ (vtable.try_read_output)(self.ptr, dst, waker);
+ }
+
+ pub(super) fn drop_join_handle_slow(self) {
+ let vtable = self.header().vtable;
+ unsafe { (vtable.drop_join_handle_slow)(self.ptr) }
+ }
+
+ pub(super) fn shutdown(self) {
+ let vtable = self.header().vtable;
+ unsafe { (vtable.shutdown)(self.ptr) }
+ }
+}
+
+impl Clone for RawTask {
+ fn clone(&self) -> Self {
+ RawTask { ptr: self.ptr }
+ }
+}
+
+impl Copy for RawTask {}
+
+unsafe fn poll<T: Future, S: Schedule>(ptr: NonNull<Header>) {
+ let harness = Harness::<T, S>::from_raw(ptr);
+ harness.poll();
+}
+
+unsafe fn dealloc<T: Future, S: Schedule>(ptr: NonNull<Header>) {
+ let harness = Harness::<T, S>::from_raw(ptr);
+ harness.dealloc();
+}
+
+unsafe fn try_read_output<T: Future, S: Schedule>(
+ ptr: NonNull<Header>,
+ dst: *mut (),
+ waker: &Waker,
+) {
+ let out = &mut *(dst as *mut Poll<super::Result<T::Output>>);
+
+ let harness = Harness::<T, S>::from_raw(ptr);
+ harness.try_read_output(out, waker);
+}
+
+unsafe fn drop_join_handle_slow<T: Future, S: Schedule>(ptr: NonNull<Header>) {
+ let harness = Harness::<T, S>::from_raw(ptr);
+ harness.drop_join_handle_slow()
+}
+
+unsafe fn shutdown<T: Future, S: Schedule>(ptr: NonNull<Header>) {
+ let harness = Harness::<T, S>::from_raw(ptr);
+ harness.shutdown()
+}
diff --git a/third_party/rust/tokio/src/runtime/task/stack.rs b/third_party/rust/tokio/src/runtime/task/stack.rs
new file mode 100644
index 0000000000..9dd8d3f43f
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/task/stack.rs
@@ -0,0 +1,83 @@
+use crate::loom::sync::atomic::AtomicPtr;
+use crate::runtime::task::{Header, Task};
+
+use std::marker::PhantomData;
+use std::ptr::{self, NonNull};
+use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
+
+/// Concurrent stack of tasks, used to pass ownership of a task from one worker
+/// to another.
+pub(crate) struct TransferStack<T: 'static> {
+ head: AtomicPtr<Header>,
+ _p: PhantomData<T>,
+}
+
+impl<T: 'static> TransferStack<T> {
+ pub(crate) fn new() -> TransferStack<T> {
+ TransferStack {
+ head: AtomicPtr::new(ptr::null_mut()),
+ _p: PhantomData,
+ }
+ }
+
+ pub(crate) fn push(&self, task: Task<T>) {
+ let task = task.into_raw();
+
+ // We don't care about any memory associated w/ setting the `head`
+ // field, just the current value.
+ //
+ // The compare-exchange creates a release sequence.
+ let mut curr = self.head.load(Relaxed);
+
+ loop {
+ unsafe {
+ task.as_ref()
+ .stack_next
+ .with_mut(|ptr| *ptr = NonNull::new(curr))
+ };
+
+ let res = self
+ .head
+ .compare_exchange(curr, task.as_ptr() as *mut _, Release, Relaxed);
+
+ match res {
+ Ok(_) => return,
+ Err(actual) => {
+ curr = actual;
+ }
+ }
+ }
+ }
+
+ pub(crate) fn drain(&self) -> impl Iterator<Item = Task<T>> {
+ struct Iter<T: 'static>(Option<NonNull<Header>>, PhantomData<T>);
+
+ impl<T: 'static> Iterator for Iter<T> {
+ type Item = Task<T>;
+
+ fn next(&mut self) -> Option<Task<T>> {
+ let task = self.0?;
+
+ // Move the cursor forward
+ self.0 = unsafe { task.as_ref().stack_next.with(|ptr| *ptr) };
+
+ // Return the task
+ unsafe { Some(Task::from_raw(task)) }
+ }
+ }
+
+ impl<T: 'static> Drop for Iter<T> {
+ fn drop(&mut self) {
+ use std::process;
+
+ if self.0.is_some() {
+ // we have bugs
+ process::abort();
+ }
+ }
+ }
+
+ let ptr = self.head.swap(ptr::null_mut(), Acquire);
+ Iter(NonNull::new(ptr), PhantomData)
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/task/state.rs b/third_party/rust/tokio/src/runtime/task/state.rs
new file mode 100644
index 0000000000..21e90430db
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/task/state.rs
@@ -0,0 +1,446 @@
+use crate::loom::sync::atomic::AtomicUsize;
+
+use std::fmt;
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
+use std::usize;
+
+pub(super) struct State {
+ val: AtomicUsize,
+}
+
+/// Current state value
+#[derive(Copy, Clone)]
+pub(super) struct Snapshot(usize);
+
+type UpdateResult = Result<Snapshot, Snapshot>;
+
+/// The task is currently being run.
+const RUNNING: usize = 0b0001;
+
+/// The task is complete.
+///
+/// Once this bit is set, it is never unset
+const COMPLETE: usize = 0b0010;
+
+/// Extracts the task's lifecycle value from the state
+const LIFECYCLE_MASK: usize = 0b11;
+
+/// Flag tracking if the task has been pushed into a run queue.
+const NOTIFIED: usize = 0b100;
+
+/// The join handle is still around
+const JOIN_INTEREST: usize = 0b1_000;
+
+/// A join handle waker has been set
+const JOIN_WAKER: usize = 0b10_000;
+
+/// The task has been forcibly cancelled.
+const CANCELLED: usize = 0b100_000;
+
+/// All bits
+const STATE_MASK: usize = LIFECYCLE_MASK | NOTIFIED | JOIN_INTEREST | JOIN_WAKER | CANCELLED;
+
+/// Bits used by the ref count portion of the state.
+const REF_COUNT_MASK: usize = !STATE_MASK;
+
+/// Number of positions to shift the ref count
+const REF_COUNT_SHIFT: usize = REF_COUNT_MASK.count_zeros() as usize;
+
+/// One ref count
+const REF_ONE: usize = 1 << REF_COUNT_SHIFT;
+
+/// State a task is initialized with
+///
+/// A task is initialized with two references: one for the scheduler and one for
+/// the `JoinHandle`. As the task starts with a `JoinHandle`, `JOIN_INTEREST` is
+/// set. A new task is immediately pushed into the run queue for execution and
+/// starts with the `NOTIFIED` flag set.
+const INITIAL_STATE: usize = (REF_ONE * 2) | JOIN_INTEREST | NOTIFIED;
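+
+// A quick sanity sketch (added for illustration) of the bit layout: the flag
+// bits sit below the ref count, and the initial state holds two references
+// with `JOIN_INTEREST` and `NOTIFIED` set.
+#[cfg(all(test, not(loom)))]
+#[test]
+fn initial_state_layout_sketch() {
+ // Flag bits and ref-count bits are disjoint.
+ assert_eq!(STATE_MASK & REF_COUNT_MASK, 0);
+ // One reference is the lowest bit above the flag bits.
+ assert_eq!(REF_ONE, STATE_MASK + 1);
+ // The initial state carries two references plus the two flags.
+ assert_eq!(Snapshot(INITIAL_STATE).ref_count(), 2);
+ assert!(Snapshot(INITIAL_STATE).is_join_interested());
+ assert!(Snapshot(INITIAL_STATE).is_notified());
+}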
+
+/// All transitions are performed via RMW operations. This establishes an
+/// unambiguous modification order.
+impl State {
+ /// Return a task's initial state
+ pub(super) fn new() -> State {
+ // A task is initialized with two references: one for the scheduler and
+ // one for the `JoinHandle` (see `INITIAL_STATE` above). As the task
+ // starts with a `JoinHandle`, `JOIN_INTEREST` is set, and since it is
+ // immediately pushed into the run queue, `NOTIFIED` is set as well.
+ State {
+ val: AtomicUsize::new(INITIAL_STATE),
+ }
+ }
+
+ /// Loads the current state, establishes `Acquire` ordering.
+ pub(super) fn load(&self) -> Snapshot {
+ Snapshot(self.val.load(Acquire))
+ }
+
+ /// Attempt to transition the lifecycle to `Running`.
+ ///
+ /// If `ref_inc` is set, the reference count is also incremented.
+ ///
+ /// The `NOTIFIED` bit is always unset.
+ pub(super) fn transition_to_running(&self, ref_inc: bool) -> UpdateResult {
+ self.fetch_update(|curr| {
+ assert!(curr.is_notified());
+
+ let mut next = curr;
+
+ if !next.is_idle() {
+ return None;
+ }
+
+ if ref_inc {
+ next.ref_inc();
+ }
+
+ next.set_running();
+ next.unset_notified();
+ Some(next)
+ })
+ }
+
+ /// Transitions the task from `Running` -> `Idle`.
+ ///
+ /// Returns `Ok` with a snapshot of the state after the transition if it
+ /// succeeds, or `Err` with the state that caused the transition to fail.
+ ///
+ /// The transition to `Idle` fails if the task has been flagged to be
+ /// cancelled.
+ pub(super) fn transition_to_idle(&self) -> UpdateResult {
+ self.fetch_update(|curr| {
+ assert!(curr.is_running());
+
+ if curr.is_cancelled() {
+ return None;
+ }
+
+ let mut next = curr;
+ next.unset_running();
+
+ if next.is_notified() {
+ // The caller needs to schedule the task. To do this, it needs a
+ // waker. The waker requires a ref count.
+ next.ref_inc();
+ }
+
+ Some(next)
+ })
+ }
+
+ /// Transitions the task from `Running` -> `Complete`.
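+ ///
+ /// Since the task is known to be `Running` and not yet `Complete`,
+ /// XOR-ing both bits in a single RMW clears `RUNNING` and sets
+ /// `COMPLETE` atomically; the assertions below check that precondition.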
+ pub(super) fn transition_to_complete(&self) -> Snapshot {
+ const DELTA: usize = RUNNING | COMPLETE;
+
+ let prev = Snapshot(self.val.fetch_xor(DELTA, AcqRel));
+ assert!(prev.is_running());
+ assert!(!prev.is_complete());
+
+ Snapshot(prev.0 ^ DELTA)
+ }
+
+ /// Transition from `Complete` -> `Terminal`, decrementing the reference
+ /// count by 1.
+ ///
+ /// When `ref_dec` is set, an additional ref count decrement is performed.
+ /// This is used to batch atomic ops when possible.
+ pub(super) fn transition_to_terminal(&self, complete: bool, ref_dec: bool) -> Snapshot {
+ self.fetch_update(|mut snapshot| {
+ if complete {
+ snapshot.set_complete();
+ } else {
+ assert!(snapshot.is_complete());
+ }
+
+ // Decrement the primary handle
+ snapshot.ref_dec();
+
+ if ref_dec {
+ // Decrement a second time
+ snapshot.ref_dec();
+ }
+
+ Some(snapshot)
+ })
+ .unwrap()
+ }
+
+ /// Transitions the state to `NOTIFIED`.
+ ///
+ /// Returns `true` if the task needs to be submitted to the pool for
+ /// execution
+ pub(super) fn transition_to_notified(&self) -> bool {
+ let prev = Snapshot(self.val.fetch_or(NOTIFIED, AcqRel));
+ prev.will_need_queueing()
+ }
+
+ /// Set the `CANCELLED` bit and attempt to transition to `Running`.
+ ///
+ /// Returns `true` if the transition to `Running` succeeded.
+ pub(super) fn transition_to_shutdown(&self) -> bool {
+ let mut prev = Snapshot(0);
+
+ let _ = self.fetch_update(|mut snapshot| {
+ prev = snapshot;
+
+ if snapshot.is_idle() {
+ snapshot.set_running();
+
+ if snapshot.is_notified() {
+ // If the task is idle and notified, this indicates the task is
+ // in the run queue and is considered owned by the scheduler.
+ // The shutdown operation claims ownership of the task, which
+ // means we need to assign an additional ref-count to the task
+ // in the queue.
+ snapshot.ref_inc();
+ }
+ }
+
+ snapshot.set_cancelled();
+ Some(snapshot)
+ });
+
+ prev.is_idle()
+ }
+
+ /// Optimistically tries to swap the state assuming the join handle is
+ /// __immediately__ dropped on spawn
+ pub(super) fn drop_join_handle_fast(&self) -> Result<(), ()> {
+ use std::sync::atomic::Ordering::Relaxed;
+
+ // Relaxed is acceptable: if this function is called and succeeds,
+ // then nothing has been done with the join handle.
+ //
+ // The moment the join handle is used (polled), the `JOIN_WAKER` flag is
+ // set, at which point the CAS will fail.
+ //
+ // Given this, there is no risk if this operation is reordered.
+ self.val
+ .compare_exchange_weak(
+ INITIAL_STATE,
+ (INITIAL_STATE - REF_ONE) & !JOIN_INTEREST,
+ Release,
+ Relaxed,
+ )
+ .map(|_| ())
+ .map_err(|_| ())
+ }
+
+ /// Try to unset the JOIN_INTEREST flag.
+ ///
+ /// Returns `Ok` if the operation happens before the task transitions to a
+ /// completed state, `Err` otherwise.
+ pub(super) fn unset_join_interested(&self) -> UpdateResult {
+ self.fetch_update(|curr| {
+ assert!(curr.is_join_interested());
+
+ if curr.is_complete() {
+ return None;
+ }
+
+ let mut next = curr;
+ next.unset_join_interested();
+
+ Some(next)
+ })
+ }
+
+ /// Set the `JOIN_WAKER` bit.
+ ///
+ /// Returns `Ok` if the bit is set, `Err` otherwise. This operation fails if
+ /// the task has completed.
+ pub(super) fn set_join_waker(&self) -> UpdateResult {
+ self.fetch_update(|curr| {
+ assert!(curr.is_join_interested());
+ assert!(!curr.has_join_waker());
+
+ if curr.is_complete() {
+ return None;
+ }
+
+ let mut next = curr;
+ next.set_join_waker();
+
+ Some(next)
+ })
+ }
+
+ /// Unsets the `JOIN_WAKER` bit.
+ ///
+ /// Returns `Ok` if the bit has been unset, `Err` otherwise. This operation fails if
+ /// the task has completed.
+ pub(super) fn unset_waker(&self) -> UpdateResult {
+ self.fetch_update(|curr| {
+ assert!(curr.is_join_interested());
+ assert!(curr.has_join_waker());
+
+ if curr.is_complete() {
+ return None;
+ }
+
+ let mut next = curr;
+ next.unset_join_waker();
+
+ Some(next)
+ })
+ }
+
+ pub(super) fn ref_inc(&self) {
+ use std::process;
+ use std::sync::atomic::Ordering::Relaxed;
+
+ // Using a relaxed ordering is alright here, as knowledge of the
+ // original reference prevents other threads from erroneously deleting
+ // the object.
+ //
+ // As explained in the [Boost documentation][1], increasing the
+ // reference counter can always be done with memory_order_relaxed: new
+ // references to an object can only be formed from an existing
+ // reference, and passing an existing reference from one thread to
+ // another must already provide any required synchronization.
+ //
+ // [1]: https://www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html
+ let prev = self.val.fetch_add(REF_ONE, Relaxed);
+
+ // If the reference count overflowed, abort.
+ if prev > isize::max_value() as usize {
+ process::abort();
+ }
+ }
+
+ /// Returns `true` if the task should be released.
+ pub(super) fn ref_dec(&self) -> bool {
+ let prev = Snapshot(self.val.fetch_sub(REF_ONE, AcqRel));
+ prev.ref_count() == 1
+ }
+
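+ /// CAS loop in the style of `AtomicUsize::fetch_update`: repeatedly
+ /// applies `f` to the current value until the compare-exchange
+ /// succeeds, returning `Err` with the last observed state if `f`
+ /// returns `None`.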
+ fn fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot>
+ where
+ F: FnMut(Snapshot) -> Option<Snapshot>,
+ {
+ let mut curr = self.load();
+
+ loop {
+ let next = match f(curr) {
+ Some(next) => next,
+ None => return Err(curr),
+ };
+
+ let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire);
+
+ match res {
+ Ok(_) => return Ok(next),
+ Err(actual) => curr = Snapshot(actual),
+ }
+ }
+ }
+}
+
+// ===== impl Snapshot =====
+
+impl Snapshot {
+ /// Returns `true` if the task is in an idle state.
+ pub(super) fn is_idle(self) -> bool {
+ self.0 & (RUNNING | COMPLETE) == 0
+ }
+
+ /// Returns `true` if the task has been flagged as notified.
+ pub(super) fn is_notified(self) -> bool {
+ self.0 & NOTIFIED == NOTIFIED
+ }
+
+ fn unset_notified(&mut self) {
+ self.0 &= !NOTIFIED
+ }
+
+ pub(super) fn is_running(self) -> bool {
+ self.0 & RUNNING == RUNNING
+ }
+
+ fn set_running(&mut self) {
+ self.0 |= RUNNING;
+ }
+
+ fn unset_running(&mut self) {
+ self.0 &= !RUNNING;
+ }
+
+ pub(super) fn is_cancelled(self) -> bool {
+ self.0 & CANCELLED == CANCELLED
+ }
+
+ fn set_cancelled(&mut self) {
+ self.0 |= CANCELLED;
+ }
+
+ fn set_complete(&mut self) {
+ self.0 |= COMPLETE;
+ }
+
+ /// Returns `true` if the task's future has completed execution.
+ pub(super) fn is_complete(self) -> bool {
+ self.0 & COMPLETE == COMPLETE
+ }
+
+ pub(super) fn is_join_interested(self) -> bool {
+ self.0 & JOIN_INTEREST == JOIN_INTEREST
+ }
+
+ fn unset_join_interested(&mut self) {
+ self.0 &= !JOIN_INTEREST
+ }
+
+ pub(super) fn has_join_waker(self) -> bool {
+ self.0 & JOIN_WAKER == JOIN_WAKER
+ }
+
+ fn set_join_waker(&mut self) {
+ self.0 |= JOIN_WAKER;
+ }
+
+ fn unset_join_waker(&mut self) {
+ self.0 &= !JOIN_WAKER
+ }
+
+ pub(super) fn ref_count(self) -> usize {
+ (self.0 & REF_COUNT_MASK) >> REF_COUNT_SHIFT
+ }
+
+ fn ref_inc(&mut self) {
+ assert!(self.0 <= isize::max_value() as usize);
+ self.0 += REF_ONE;
+ }
+
+ pub(super) fn ref_dec(&mut self) {
+ assert!(self.ref_count() > 0);
+ self.0 -= REF_ONE
+ }
+
+ fn will_need_queueing(self) -> bool {
+ !self.is_notified() && self.is_idle()
+ }
+}
+
+impl fmt::Debug for State {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let snapshot = self.load();
+ snapshot.fmt(fmt)
+ }
+}
+
+impl fmt::Debug for Snapshot {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Snapshot")
+ .field("is_running", &self.is_running())
+ .field("is_complete", &self.is_complete())
+ .field("is_notified", &self.is_notified())
+ .field("is_cancelled", &self.is_cancelled())
+ .field("is_join_interested", &self.is_join_interested())
+ .field("has_join_waker", &self.has_join_waker())
+ .field("ref_count", &self.ref_count())
+ .finish()
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/task/waker.rs b/third_party/rust/tokio/src/runtime/task/waker.rs
new file mode 100644
index 0000000000..5c2d478fbb
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/task/waker.rs
@@ -0,0 +1,101 @@
+use crate::runtime::task::harness::Harness;
+use crate::runtime::task::{Header, Schedule};
+
+use std::future::Future;
+use std::marker::PhantomData;
+use std::mem::ManuallyDrop;
+use std::ops;
+use std::ptr::NonNull;
+use std::task::{RawWaker, RawWakerVTable, Waker};
+
+pub(super) struct WakerRef<'a, S: 'static> {
+ waker: ManuallyDrop<Waker>,
+ _p: PhantomData<(&'a Header, S)>,
+}
+
+/// Returns a `WakerRef` which avoids having to pre-emptively increase the
+/// refcount if there is no need to do so.
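+///
+/// The returned `WakerRef` borrows the task `Header` for `'a`, and the
+/// `ManuallyDrop` wrapper guarantees the ref count is never decremented
+/// through it.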
+pub(super) fn waker_ref<T, S>(header: &Header) -> WakerRef<'_, S>
+where
+ T: Future,
+ S: Schedule,
+{
+ // `Waker::will_wake` uses the VTABLE pointer as part of the check. This
+ // means that `will_wake` will always return false when using the current
+ // task's waker. (discussion at rust-lang/rust#66281).
+ //
+ // To fix this, we use a single vtable. Since we pass in a reference at this
+ // point and not an *owned* waker, we must ensure that `drop` is never
+ // called on this waker instance. This is done by wrapping it with
+ // `ManuallyDrop` and then never calling drop.
+ let waker = unsafe { ManuallyDrop::new(Waker::from_raw(raw_waker::<T, S>(header))) };
+
+ WakerRef {
+ waker,
+ _p: PhantomData,
+ }
+}
+
+impl<S> ops::Deref for WakerRef<'_, S> {
+ type Target = Waker;
+
+ fn deref(&self) -> &Waker {
+ &self.waker
+ }
+}
+
+unsafe fn clone_waker<T, S>(ptr: *const ()) -> RawWaker
+where
+ T: Future,
+ S: Schedule,
+{
+ let header = ptr as *const Header;
+ (*header).state.ref_inc();
+ raw_waker::<T, S>(header)
+}
+
+unsafe fn drop_waker<T, S>(ptr: *const ())
+where
+ T: Future,
+ S: Schedule,
+{
+ let ptr = NonNull::new_unchecked(ptr as *mut Header);
+ let harness = Harness::<T, S>::from_raw(ptr);
+ harness.drop_reference();
+}
+
+unsafe fn wake_by_val<T, S>(ptr: *const ())
+where
+ T: Future,
+ S: Schedule,
+{
+ let ptr = NonNull::new_unchecked(ptr as *mut Header);
+ let harness = Harness::<T, S>::from_raw(ptr);
+ harness.wake_by_val();
+}
+
+// Wake without consuming the waker
+unsafe fn wake_by_ref<T, S>(ptr: *const ())
+where
+ T: Future,
+ S: Schedule,
+{
+ let ptr = NonNull::new_unchecked(ptr as *mut Header);
+ let harness = Harness::<T, S>::from_raw(ptr);
+ harness.wake_by_ref();
+}
+
+fn raw_waker<T, S>(header: *const Header) -> RawWaker
+where
+ T: Future,
+ S: Schedule,
+{
+ let ptr = header as *const ();
+ let vtable = &RawWakerVTable::new(
+ clone_waker::<T, S>,
+ wake_by_val::<T, S>,
+ wake_by_ref::<T, S>,
+ drop_waker::<T, S>,
+ );
+ RawWaker::new(ptr, vtable)
+}
diff --git a/third_party/rust/tokio/src/runtime/tests/loom_blocking.rs b/third_party/rust/tokio/src/runtime/tests/loom_blocking.rs
new file mode 100644
index 0000000000..db7048e3f9
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/tests/loom_blocking.rs
@@ -0,0 +1,31 @@
+use crate::runtime::{self, Runtime};
+
+use std::sync::Arc;
+
+#[test]
+fn blocking_shutdown() {
+ loom::model(|| {
+ let v = Arc::new(());
+
+ let rt = mk_runtime(1);
+ rt.enter(|| {
+ for _ in 0..2 {
+ let v = v.clone();
+ crate::task::spawn_blocking(move || {
+ assert!(1 < Arc::strong_count(&v));
+ });
+ }
+ });
+
+ drop(rt);
+ assert_eq!(1, Arc::strong_count(&v));
+ });
+}
+
+fn mk_runtime(num_threads: usize) -> Runtime {
+ runtime::Builder::new()
+ .threaded_scheduler()
+ .core_threads(num_threads)
+ .build()
+ .unwrap()
+}
diff --git a/third_party/rust/tokio/src/runtime/tests/loom_oneshot.rs b/third_party/rust/tokio/src/runtime/tests/loom_oneshot.rs
new file mode 100644
index 0000000000..c126fe479a
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/tests/loom_oneshot.rs
@@ -0,0 +1,49 @@
+use loom::sync::Notify;
+
+use std::sync::{Arc, Mutex};
+
+pub(crate) fn channel<T>() -> (Sender<T>, Receiver<T>) {
+ let inner = Arc::new(Inner {
+ notify: Notify::new(),
+ value: Mutex::new(None),
+ });
+
+ let tx = Sender {
+ inner: inner.clone(),
+ };
+ let rx = Receiver { inner };
+
+ (tx, rx)
+}
+
+pub(crate) struct Sender<T> {
+ inner: Arc<Inner<T>>,
+}
+
+pub(crate) struct Receiver<T> {
+ inner: Arc<Inner<T>>,
+}
+
+struct Inner<T> {
+ notify: Notify,
+ value: Mutex<Option<T>>,
+}
+
+impl<T> Sender<T> {
+ pub(crate) fn send(self, value: T) {
+ *self.inner.value.lock().unwrap() = Some(value);
+ self.inner.notify.notify();
+ }
+}
+
+impl<T> Receiver<T> {
+ pub(crate) fn recv(self) -> T {
+ loop {
+ if let Some(v) = self.inner.value.lock().unwrap().take() {
+ return v;
+ }
+
+ self.inner.notify.wait();
+ }
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/tests/loom_pool.rs b/third_party/rust/tokio/src/runtime/tests/loom_pool.rs
new file mode 100644
index 0000000000..c08658cde8
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/tests/loom_pool.rs
@@ -0,0 +1,380 @@
+/// Full runtime loom tests. These are heavy tests and take significant time to
+/// run on CI.
+///
+/// Use `LOOM_MAX_PREEMPTIONS=1` to do a "quick" run as a smoke test.
+///
+/// In order to speed up the CI, the tests are split into groups.
+use crate::future::poll_fn;
+use crate::runtime::tests::loom_oneshot as oneshot;
+use crate::runtime::{self, Runtime};
+use crate::{spawn, task};
+use tokio_test::assert_ok;
+
+use loom::sync::atomic::{AtomicBool, AtomicUsize};
+use loom::sync::{Arc, Mutex};
+
+use pin_project_lite::pin_project;
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::atomic::Ordering::{Relaxed, SeqCst};
+use std::task::{Context, Poll};
+
+/// Tests are divided into groups to make the runs faster on CI.
+mod group_a {
+ use super::*;
+
+ #[test]
+ fn racy_shutdown() {
+ loom::model(|| {
+ let pool = mk_pool(1);
+
+ // here's the case we want to exercise:
+ //
+ // a worker that still has tasks in its local queue gets sent to the blocking pool (due to
+ // block_in_place). the blocking pool is shut down, so drops the worker. the worker's
+ // shutdown method never gets run.
+ //
+ // we do this by spawning two tasks on one worker, the first of which does block_in_place,
+ // and then immediately drop the pool.
+
+ pool.spawn(track(async {
+ crate::task::block_in_place(|| {});
+ }));
+ pool.spawn(track(async {}));
+ drop(pool);
+ });
+ }
+
+ #[test]
+ fn pool_multi_spawn() {
+ loom::model(|| {
+ let pool = mk_pool(2);
+ let c1 = Arc::new(AtomicUsize::new(0));
+
+ let (tx, rx) = oneshot::channel();
+ let tx1 = Arc::new(Mutex::new(Some(tx)));
+
+ // Spawn a task
+ let c2 = c1.clone();
+ let tx2 = tx1.clone();
+ pool.spawn(track(async move {
+ spawn(track(async move {
+ if 1 == c1.fetch_add(1, Relaxed) {
+ tx1.lock().unwrap().take().unwrap().send(());
+ }
+ }));
+ }));
+
+ // Spawn a second task
+ pool.spawn(track(async move {
+ spawn(track(async move {
+ if 1 == c2.fetch_add(1, Relaxed) {
+ tx2.lock().unwrap().take().unwrap().send(());
+ }
+ }));
+ }));
+
+ rx.recv();
+ });
+ }
+
+ fn only_blocking_inner(first_pending: bool) {
+ loom::model(move || {
+ let pool = mk_pool(1);
+ let (block_tx, block_rx) = oneshot::channel();
+
+ pool.spawn(track(async move {
+ crate::task::block_in_place(move || {
+ block_tx.send(());
+ });
+ if first_pending {
+ task::yield_now().await
+ }
+ }));
+
+ block_rx.recv();
+ drop(pool);
+ });
+ }
+
+ #[test]
+ fn only_blocking_without_pending() {
+ only_blocking_inner(false)
+ }
+
+ #[test]
+ fn only_blocking_with_pending() {
+ only_blocking_inner(true)
+ }
+}
+
+mod group_b {
+ use super::*;
+
+ fn blocking_and_regular_inner(first_pending: bool) {
+ const NUM: usize = 3;
+ loom::model(move || {
+ let pool = mk_pool(1);
+ let cnt = Arc::new(AtomicUsize::new(0));
+
+ let (block_tx, block_rx) = oneshot::channel();
+ let (done_tx, done_rx) = oneshot::channel();
+ let done_tx = Arc::new(Mutex::new(Some(done_tx)));
+
+ pool.spawn(track(async move {
+ crate::task::block_in_place(move || {
+ block_tx.send(());
+ });
+ if first_pending {
+ task::yield_now().await
+ }
+ }));
+
+ for _ in 0..NUM {
+ let cnt = cnt.clone();
+ let done_tx = done_tx.clone();
+
+ pool.spawn(track(async move {
+ if NUM == cnt.fetch_add(1, Relaxed) + 1 {
+ done_tx.lock().unwrap().take().unwrap().send(());
+ }
+ }));
+ }
+
+ done_rx.recv();
+ block_rx.recv();
+
+ drop(pool);
+ });
+ }
+
+ #[test]
+ fn blocking_and_regular() {
+ blocking_and_regular_inner(false);
+ }
+
+ #[test]
+ fn blocking_and_regular_with_pending() {
+ blocking_and_regular_inner(true);
+ }
+
+ #[test]
+ fn pool_shutdown() {
+ loom::model(|| {
+ let pool = mk_pool(2);
+
+ pool.spawn(track(async move {
+ gated2(true).await;
+ }));
+
+ pool.spawn(track(async move {
+ gated2(false).await;
+ }));
+
+ drop(pool);
+ });
+ }
+
+ #[test]
+ fn join_output() {
+ loom::model(|| {
+ let mut rt = mk_pool(1);
+
+ rt.block_on(async {
+ let t = crate::spawn(track(async { "hello" }));
+
+ let out = assert_ok!(t.await);
+ assert_eq!("hello", out.into_inner());
+ });
+ });
+ }
+
+ #[test]
+ fn poll_drop_handle_then_drop() {
+ loom::model(|| {
+ let mut rt = mk_pool(1);
+
+ rt.block_on(async move {
+ let mut t = crate::spawn(track(async { "hello" }));
+
+ poll_fn(|cx| {
+ let _ = Pin::new(&mut t).poll(cx);
+ Poll::Ready(())
+ })
+ .await;
+ });
+ })
+ }
+
+ #[test]
+ fn complete_block_on_under_load() {
+ loom::model(|| {
+ let mut pool = mk_pool(1);
+
+ pool.block_on(async {
+ // Trigger a re-schedule
+ crate::spawn(track(async {
+ for _ in 0..2 {
+ task::yield_now().await;
+ }
+ }));
+
+ gated2(true).await
+ });
+ });
+ }
+}
+
+mod group_c {
+ use super::*;
+
+ #[test]
+ fn shutdown_with_notification() {
+ use crate::sync::oneshot;
+
+ loom::model(|| {
+ let rt = mk_pool(2);
+ let (done_tx, done_rx) = oneshot::channel::<()>();
+
+ rt.spawn(track(async move {
+ let (tx, rx) = oneshot::channel::<()>();
+
+ crate::spawn(async move {
+ crate::task::spawn_blocking(move || {
+ let _ = tx.send(());
+ });
+
+ let _ = done_rx.await;
+ });
+
+ let _ = rx.await;
+
+ let _ = done_tx.send(());
+ }));
+ });
+ }
+}
+
+mod group_d {
+ use super::*;
+
+ #[test]
+ fn pool_multi_notify() {
+ loom::model(|| {
+ let pool = mk_pool(2);
+
+ let c1 = Arc::new(AtomicUsize::new(0));
+
+ let (done_tx, done_rx) = oneshot::channel();
+ let done_tx1 = Arc::new(Mutex::new(Some(done_tx)));
+
+ // Spawn a task
+ let c2 = c1.clone();
+ let done_tx2 = done_tx1.clone();
+ pool.spawn(track(async move {
+ gated().await;
+ gated().await;
+
+ if 1 == c1.fetch_add(1, Relaxed) {
+ done_tx1.lock().unwrap().take().unwrap().send(());
+ }
+ }));
+
+ // Spawn a second task
+ pool.spawn(track(async move {
+ gated().await;
+ gated().await;
+
+ if 1 == c2.fetch_add(1, Relaxed) {
+ done_tx2.lock().unwrap().take().unwrap().send(());
+ }
+ }));
+
+ done_rx.recv();
+ });
+ }
+}
+
+fn mk_pool(num_threads: usize) -> Runtime {
+ runtime::Builder::new()
+ .threaded_scheduler()
+ .core_threads(num_threads)
+ .build()
+ .unwrap()
+}
+
+fn gated() -> impl Future<Output = &'static str> {
+ gated2(false)
+}
+
+fn gated2(thread: bool) -> impl Future<Output = &'static str> {
+ use loom::thread;
+ use std::sync::Arc;
+
+ let gate = Arc::new(AtomicBool::new(false));
+ let mut fired = false;
+
+ poll_fn(move |cx| {
+ if !fired {
+ let gate = gate.clone();
+ let waker = cx.waker().clone();
+
+ if thread {
+ thread::spawn(move || {
+ gate.store(true, SeqCst);
+ waker.wake_by_ref();
+ });
+ } else {
+ spawn(track(async move {
+ gate.store(true, SeqCst);
+ waker.wake_by_ref();
+ }));
+ }
+
+ fired = true;
+
+ return Poll::Pending;
+ }
+
+ if gate.load(SeqCst) {
+ Poll::Ready("hello world")
+ } else {
+ Poll::Pending
+ }
+ })
+}
+
+fn track<T: Future>(f: T) -> Track<T> {
+ Track {
+ inner: f,
+ arc: Arc::new(()),
+ }
+}
+
+pin_project! {
+ struct Track<T> {
+ #[pin]
+ inner: T,
+ // Arc is used to hook into loom's leak tracking.
+ arc: Arc<()>,
+ }
+}
+
+impl<T> Track<T> {
+ fn into_inner(self) -> T {
+ self.inner
+ }
+}
+
+impl<T: Future> Future for Track<T> {
+ type Output = Track<T::Output>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let me = self.project();
+
+ Poll::Ready(Track {
+ inner: ready!(me.inner.poll(cx)),
+ arc: me.arc.clone(),
+ })
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/tests/loom_queue.rs b/third_party/rust/tokio/src/runtime/tests/loom_queue.rs
new file mode 100644
index 0000000000..de02610db0
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/tests/loom_queue.rs
@@ -0,0 +1,216 @@
+use crate::runtime::queue;
+use crate::runtime::task::{self, Schedule, Task};
+
+use loom::thread;
+
+#[test]
+fn basic() {
+ loom::model(|| {
+ let (steal, mut local) = queue::local();
+ let inject = queue::Inject::new();
+
+ let th = thread::spawn(move || {
+ let (_, mut local) = queue::local();
+ let mut n = 0;
+
+ for _ in 0..3 {
+ if steal.steal_into(&mut local).is_some() {
+ n += 1;
+ }
+
+ while local.pop().is_some() {
+ n += 1;
+ }
+ }
+
+ n
+ });
+
+ let mut n = 0;
+
+ for _ in 0..2 {
+ for _ in 0..2 {
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ local.push_back(task, &inject);
+ }
+
+ if local.pop().is_some() {
+ n += 1;
+ }
+
+ // Push another task
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ local.push_back(task, &inject);
+
+ while local.pop().is_some() {
+ n += 1;
+ }
+ }
+
+ while inject.pop().is_some() {
+ n += 1;
+ }
+
+ n += th.join().unwrap();
+
+ assert_eq!(6, n);
+ });
+}
+
+#[test]
+fn steal_overflow() {
+ loom::model(|| {
+ let (steal, mut local) = queue::local();
+ let inject = queue::Inject::new();
+
+ let th = thread::spawn(move || {
+ let (_, mut local) = queue::local();
+ let mut n = 0;
+
+ if steal.steal_into(&mut local).is_some() {
+ n += 1;
+ }
+
+ while local.pop().is_some() {
+ n += 1;
+ }
+
+ n
+ });
+
+ let mut n = 0;
+
+ // push a task, pop a task
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ local.push_back(task, &inject);
+
+ if local.pop().is_some() {
+ n += 1;
+ }
+
+ for _ in 0..6 {
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ local.push_back(task, &inject);
+ }
+
+ n += th.join().unwrap();
+
+ while local.pop().is_some() {
+ n += 1;
+ }
+
+ while inject.pop().is_some() {
+ n += 1;
+ }
+
+ assert_eq!(7, n);
+ });
+}
+
+#[test]
+fn multi_stealer() {
+ const NUM_TASKS: usize = 5;
+
+ fn steal_tasks(steal: queue::Steal<Runtime>) -> usize {
+ let (_, mut local) = queue::local();
+
+ if steal.steal_into(&mut local).is_none() {
+ return 0;
+ }
+
+ let mut n = 1;
+
+ while local.pop().is_some() {
+ n += 1;
+ }
+
+ n
+ }
+
+ loom::model(|| {
+ let (steal, mut local) = queue::local();
+ let inject = queue::Inject::new();
+
+ // Push work
+ for _ in 0..NUM_TASKS {
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ local.push_back(task, &inject);
+ }
+
+ let th1 = {
+ let steal = steal.clone();
+ thread::spawn(move || steal_tasks(steal))
+ };
+
+ let th2 = thread::spawn(move || steal_tasks(steal));
+
+ let mut n = 0;
+
+ while local.pop().is_some() {
+ n += 1;
+ }
+
+ while inject.pop().is_some() {
+ n += 1;
+ }
+
+ n += th1.join().unwrap();
+ n += th2.join().unwrap();
+
+ assert_eq!(n, NUM_TASKS);
+ });
+}
+
+#[test]
+fn chained_steal() {
+ loom::model(|| {
+ let (s1, mut l1) = queue::local();
+ let (s2, mut l2) = queue::local();
+ let inject = queue::Inject::new();
+
+ // Load up some tasks
+ for _ in 0..4 {
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ l1.push_back(task, &inject);
+
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ l2.push_back(task, &inject);
+ }
+
+ // Spawn a task to steal from **our** queue
+ let th = thread::spawn(move || {
+ let (_, mut local) = queue::local();
+ s1.steal_into(&mut local);
+
+ while local.pop().is_some() {}
+ });
+
+ // Drain our tasks, then attempt to steal
+ while l1.pop().is_some() {}
+
+ s2.steal_into(&mut l1);
+
+ th.join().unwrap();
+
+ while l1.pop().is_some() {}
+ while l2.pop().is_some() {}
+ while inject.pop().is_some() {}
+ });
+}
+
+struct Runtime;
+
+impl Schedule for Runtime {
+ fn bind(task: Task<Self>) -> Runtime {
+ std::mem::forget(task);
+ Runtime
+ }
+
+ fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
+ None
+ }
+
+ fn schedule(&self, _task: task::Notified<Self>) {
+ unreachable!();
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/tests/mod.rs b/third_party/rust/tokio/src/runtime/tests/mod.rs
new file mode 100644
index 0000000000..123a7e35a3
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/tests/mod.rs
@@ -0,0 +1,13 @@
+cfg_loom! {
+ mod loom_blocking;
+ mod loom_oneshot;
+ mod loom_pool;
+ mod loom_queue;
+}
+
+cfg_not_loom! {
+ mod queue;
+
+ #[cfg(miri)]
+ mod task;
+}
diff --git a/third_party/rust/tokio/src/runtime/tests/queue.rs b/third_party/rust/tokio/src/runtime/tests/queue.rs
new file mode 100644
index 0000000000..d228d5dcc7
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/tests/queue.rs
@@ -0,0 +1,202 @@
+use crate::runtime::queue;
+use crate::runtime::task::{self, Schedule, Task};
+
+use std::thread;
+use std::time::Duration;
+
+#[test]
+fn fits_256() {
+ let (_, mut local) = queue::local();
+ let inject = queue::Inject::new();
+
+ for _ in 0..256 {
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ local.push_back(task, &inject);
+ }
+
+ assert!(inject.pop().is_none());
+
+ while local.pop().is_some() {}
+}
+
+#[test]
+fn overflow() {
+ let (_, mut local) = queue::local();
+ let inject = queue::Inject::new();
+
+ for _ in 0..257 {
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ local.push_back(task, &inject);
+ }
+
+ let mut n = 0;
+
+ while inject.pop().is_some() {
+ n += 1;
+ }
+
+ while local.pop().is_some() {
+ n += 1;
+ }
+
+ assert_eq!(n, 257);
+}
+
+#[test]
+fn steal_batch() {
+ let (steal1, mut local1) = queue::local();
+ let (_, mut local2) = queue::local();
+ let inject = queue::Inject::new();
+
+ for _ in 0..4 {
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ local1.push_back(task, &inject);
+ }
+
+ assert!(steal1.steal_into(&mut local2).is_some());
+
+ for _ in 0..1 {
+ assert!(local2.pop().is_some());
+ }
+
+ assert!(local2.pop().is_none());
+
+ for _ in 0..2 {
+ assert!(local1.pop().is_some());
+ }
+
+ assert!(local1.pop().is_none());
+}
+
+#[test]
+fn stress1() {
+ const NUM_ITER: usize = 1;
+ const NUM_STEAL: usize = 1_000;
+ const NUM_LOCAL: usize = 1_000;
+ const NUM_PUSH: usize = 500;
+ const NUM_POP: usize = 250;
+
+ for _ in 0..NUM_ITER {
+ let (steal, mut local) = queue::local();
+ let inject = queue::Inject::new();
+
+ let th = thread::spawn(move || {
+ let (_, mut local) = queue::local();
+ let mut n = 0;
+
+ for _ in 0..NUM_STEAL {
+ if steal.steal_into(&mut local).is_some() {
+ n += 1;
+ }
+
+ while local.pop().is_some() {
+ n += 1;
+ }
+
+ thread::yield_now();
+ }
+
+ n
+ });
+
+ let mut n = 0;
+
+ for _ in 0..NUM_LOCAL {
+ for _ in 0..NUM_PUSH {
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ local.push_back(task, &inject);
+ }
+
+ for _ in 0..NUM_POP {
+ if local.pop().is_some() {
+ n += 1;
+ } else {
+ break;
+ }
+ }
+ }
+
+ while inject.pop().is_some() {
+ n += 1;
+ }
+
+ n += th.join().unwrap();
+
+ assert_eq!(n, NUM_LOCAL * NUM_PUSH);
+ }
+}
+
+#[test]
+fn stress2() {
+ const NUM_ITER: usize = 1;
+ const NUM_TASKS: usize = 1_000_000;
+ const NUM_STEAL: usize = 1_000;
+
+ for _ in 0..NUM_ITER {
+ let (steal, mut local) = queue::local();
+ let inject = queue::Inject::new();
+
+ let th = thread::spawn(move || {
+ let (_, mut local) = queue::local();
+ let mut n = 0;
+
+ for _ in 0..NUM_STEAL {
+ if steal.steal_into(&mut local).is_some() {
+ n += 1;
+ }
+
+ while local.pop().is_some() {
+ n += 1;
+ }
+
+ thread::sleep(Duration::from_micros(10));
+ }
+
+ n
+ });
+
+ let mut num_pop = 0;
+
+ for i in 0..NUM_TASKS {
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ local.push_back(task, &inject);
+
+ if i % 128 == 0 && local.pop().is_some() {
+ num_pop += 1;
+ }
+
+ while inject.pop().is_some() {
+ num_pop += 1;
+ }
+ }
+
+ num_pop += th.join().unwrap();
+
+ while local.pop().is_some() {
+ num_pop += 1;
+ }
+
+ while inject.pop().is_some() {
+ num_pop += 1;
+ }
+
+ assert_eq!(num_pop, NUM_TASKS);
+ }
+}
+
+struct Runtime;
+
+impl Schedule for Runtime {
+ fn bind(task: Task<Self>) -> Runtime {
+ std::mem::forget(task);
+ Runtime
+ }
+
+ fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
+ None
+ }
+
+ fn schedule(&self, _task: task::Notified<Self>) {
+ unreachable!();
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/tests/task.rs b/third_party/rust/tokio/src/runtime/tests/task.rs
new file mode 100644
index 0000000000..82315a04ff
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/tests/task.rs
@@ -0,0 +1,159 @@
+use crate::runtime::task::{self, Schedule, Task};
+use crate::util::linked_list::LinkedList;
+use crate::util::TryLock;
+
+use std::collections::VecDeque;
+use std::sync::Arc;
+
+#[test]
+fn create_drop() {
+ let _ = task::joinable::<_, Runtime>(async { unreachable!() });
+}
+
+#[test]
+fn schedule() {
+ with(|rt| {
+ let (task, _) = task::joinable(async {
+ crate::task::yield_now().await;
+ });
+
+ rt.schedule(task);
+
+ assert_eq!(2, rt.tick());
+ })
+}
+
+#[test]
+fn shutdown() {
+ with(|rt| {
+ let (task, _) = task::joinable(async {
+ loop {
+ crate::task::yield_now().await;
+ }
+ });
+
+ rt.schedule(task);
+ rt.tick_max(1);
+
+ rt.shutdown();
+ })
+}
+
+fn with(f: impl FnOnce(Runtime)) {
+ struct Reset;
+
+ impl Drop for Reset {
+ fn drop(&mut self) {
+ let _rt = CURRENT.try_lock().unwrap().take();
+ }
+ }
+
+ let _reset = Reset;
+
+ let rt = Runtime(Arc::new(Inner {
+ released: task::TransferStack::new(),
+ core: TryLock::new(Core {
+ queue: VecDeque::new(),
+ tasks: LinkedList::new(),
+ }),
+ }));
+
+ *CURRENT.try_lock().unwrap() = Some(rt.clone());
+ f(rt)
+}
+
+#[derive(Clone)]
+struct Runtime(Arc<Inner>);
+
+struct Inner {
+ released: task::TransferStack<Runtime>,
+ core: TryLock<Core>,
+}
+
+struct Core {
+ queue: VecDeque<task::Notified<Runtime>>,
+ tasks: LinkedList<Task<Runtime>>,
+}
+
+static CURRENT: TryLock<Option<Runtime>> = TryLock::new(None);
+
+impl Runtime {
+ fn tick(&self) -> usize {
+ self.tick_max(usize::max_value())
+ }
+
+ fn tick_max(&self, max: usize) -> usize {
+ let mut n = 0;
+
+ while !self.is_empty() && n < max {
+ let task = self.next_task();
+ n += 1;
+ task.run();
+ }
+
+ self.0.maintenance();
+
+ n
+ }
+
+ fn is_empty(&self) -> bool {
+ self.0.core.try_lock().unwrap().queue.is_empty()
+ }
+
+ fn next_task(&self) -> task::Notified<Runtime> {
+ self.0.core.try_lock().unwrap().queue.pop_front().unwrap()
+ }
+
+ fn shutdown(&self) {
+ let mut core = self.0.core.try_lock().unwrap();
+
+ for task in core.tasks.iter() {
+ task.shutdown();
+ }
+
+ while let Some(task) = core.queue.pop_back() {
+ task.shutdown();
+ }
+
+ drop(core);
+
+ while !self.0.core.try_lock().unwrap().tasks.is_empty() {
+ self.0.maintenance();
+ }
+ }
+}
+
+impl Inner {
+ fn maintenance(&self) {
+ use std::mem::ManuallyDrop;
+
+ for task in self.released.drain() {
+ let task = ManuallyDrop::new(task);
+
+ // safety: see worker.rs
+ unsafe {
+ let ptr = task.header().into();
+ self.core.try_lock().unwrap().tasks.remove(ptr);
+ }
+ }
+ }
+}
+
+impl Schedule for Runtime {
+ fn bind(task: Task<Self>) -> Runtime {
+ let rt = CURRENT.try_lock().unwrap().as_ref().unwrap().clone();
+ rt.0.core.try_lock().unwrap().tasks.push_front(task);
+ rt
+ }
+
+ fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
+ // safety: copying worker.rs
+ let task = unsafe { Task::from_raw(task.header().into()) };
+ self.0.released.push(task);
+ None
+ }
+
+ fn schedule(&self, task: task::Notified<Self>) {
+ self.0.core.try_lock().unwrap().queue.push_back(task);
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/thread_pool/atomic_cell.rs b/third_party/rust/tokio/src/runtime/thread_pool/atomic_cell.rs
new file mode 100644
index 0000000000..2bda0fc738
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/thread_pool/atomic_cell.rs
@@ -0,0 +1,52 @@
+use crate::loom::sync::atomic::AtomicPtr;
+
+use std::ptr;
+use std::sync::atomic::Ordering::AcqRel;
+
+pub(super) struct AtomicCell<T> {
+ data: AtomicPtr<T>,
+}
+
+unsafe impl<T: Send> Send for AtomicCell<T> {}
+unsafe impl<T: Send> Sync for AtomicCell<T> {}
+
+impl<T> AtomicCell<T> {
+ pub(super) fn new(data: Option<Box<T>>) -> AtomicCell<T> {
+ AtomicCell {
+ data: AtomicPtr::new(to_raw(data)),
+ }
+ }
+
+ pub(super) fn swap(&self, val: Option<Box<T>>) -> Option<Box<T>> {
+ let old = self.data.swap(to_raw(val), AcqRel);
+ from_raw(old)
+ }
+
+ #[cfg(feature = "blocking")]
+ pub(super) fn set(&self, val: Box<T>) {
+ let _ = self.swap(Some(val));
+ }
+
+ pub(super) fn take(&self) -> Option<Box<T>> {
+ self.swap(None)
+ }
+}
+
+fn to_raw<T>(data: Option<Box<T>>) -> *mut T {
+ data.map(Box::into_raw).unwrap_or(ptr::null_mut())
+}
+
+fn from_raw<T>(val: *mut T) -> Option<Box<T>> {
+ if val.is_null() {
+ None
+ } else {
+ Some(unsafe { Box::from_raw(val) })
+ }
+}
+
+impl<T> Drop for AtomicCell<T> {
+ fn drop(&mut self) {
+ // Free any data still held by the cell
+ let _ = self.take();
+ }
+}
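+
+// A small non-loom sketch (illustration only) of the swap semantics: values
+// move in and out as `Option<Box<T>>`, and dropping the cell frees whatever
+// it still holds.
+#[cfg(all(test, not(loom)))]
+#[test]
+fn swap_roundtrip_sketch() {
+ let cell = AtomicCell::new(Some(Box::new(1)));
+ assert_eq!(cell.swap(Some(Box::new(2))).map(|b| *b), Some(1));
+ assert_eq!(cell.take().map(|b| *b), Some(2));
+ assert!(cell.take().is_none());
+}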
diff --git a/third_party/rust/tokio/src/runtime/thread_pool/idle.rs b/third_party/rust/tokio/src/runtime/thread_pool/idle.rs
new file mode 100644
index 0000000000..ae87ca4ba1
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/thread_pool/idle.rs
@@ -0,0 +1,222 @@
+//! Coordinates idling workers
+
+use crate::loom::sync::atomic::AtomicUsize;
+use crate::loom::sync::Mutex;
+
+use std::fmt;
+use std::sync::atomic::Ordering::{self, SeqCst};
+
+pub(super) struct Idle {
+ /// Tracks both the number of searching workers and the number of unparked
+ /// workers.
+ ///
+ /// Used as a fast path to avoid acquiring the lock when it is not needed.
+ state: AtomicUsize,
+
+ /// Sleeping workers
+ sleepers: Mutex<Vec<usize>>,
+
+ /// Total number of workers.
+ num_workers: usize,
+}
+
+const UNPARK_SHIFT: usize = 16;
+const UNPARK_MASK: usize = !SEARCH_MASK;
+const SEARCH_MASK: usize = (1 << UNPARK_SHIFT) - 1;
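+
+// Layout sketch: with `UNPARK_SHIFT = 16`, the low 16 bits count searching
+// workers and the remaining high bits count unparked workers, so both
+// counters can be updated with a single atomic operation.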
+
+#[derive(Copy, Clone)]
+struct State(usize);
+
+impl Idle {
+ pub(super) fn new(num_workers: usize) -> Idle {
+ let init = State::new(num_workers);
+
+ Idle {
+ state: AtomicUsize::new(init.into()),
+ sleepers: Mutex::new(Vec::with_capacity(num_workers)),
+ num_workers,
+ }
+ }
+
+ /// If there are no workers actively searching, returns the index of a
+ /// worker currently sleeping.
+ pub(super) fn worker_to_notify(&self) -> Option<usize> {
+ // If at least one worker is spinning, work being notified will
+ // eventually be found. A searching thread will find **some** work and
+ // notify another worker, eventually leading to our work being found.
+ //
+ // For this to happen, this load must happen before the thread
+ // transitions `num_searching` to zero. Acquire / Release does not
+ // provide sufficient guarantees, so this load is done with `SeqCst` and
+ // will pair with the `fetch_sub(1)` when transitioning out of
+ // searching.
+ if !self.notify_should_wakeup() {
+ return None;
+ }
+
+ // Acquire the lock
+ let mut sleepers = self.sleepers.lock().unwrap();
+
+ // Check again, now that the lock is acquired
+ if !self.notify_should_wakeup() {
+ return None;
+ }
+
+ // A worker should be woken up; atomically increment the number of
+ // searching workers as well as the number of unparked workers.
+ State::unpark_one(&self.state);
+
+ // Get the worker to unpark
+ let ret = sleepers.pop();
+ debug_assert!(ret.is_some());
+
+ ret
+ }
+
+ /// Returns `true` if the worker needs to do a final check for submitted
+ /// work.
+ pub(super) fn transition_worker_to_parked(&self, worker: usize, is_searching: bool) -> bool {
+ // Acquire the lock
+ let mut sleepers = self.sleepers.lock().unwrap();
+
+ // Decrement the number of unparked threads
+ let ret = State::dec_num_unparked(&self.state, is_searching);
+
+ // Track the sleeping worker
+ sleepers.push(worker);
+
+ ret
+ }
+
+ pub(super) fn transition_worker_to_searching(&self) -> bool {
+ let state = State::load(&self.state, SeqCst);
+ if 2 * state.num_searching() >= self.num_workers {
+ return false;
+ }
+
+ // It is possible for this routine to allow more than 50% of the workers
+ // to search. That is OK. Limiting searchers is only an optimization to
+ // prevent too much contention.
+ State::inc_num_searching(&self.state, SeqCst);
+ true
+ }
+
+ /// A lightweight transition from searching -> running.
+ ///
+ /// Returns `true` if this is the final searching worker. The caller
+ /// **must** notify a new worker.
+ pub(super) fn transition_worker_from_searching(&self) -> bool {
+ State::dec_num_searching(&self.state)
+ }
+
+ /// Unpark a specific worker. This happens if tasks are submitted from
+ /// within the worker's park routine.
+ pub(super) fn unpark_worker_by_id(&self, worker_id: usize) {
+ let mut sleepers = self.sleepers.lock().unwrap();
+
+ for index in 0..sleepers.len() {
+ if sleepers[index] == worker_id {
+ sleepers.swap_remove(index);
+
+ // Update the state accordingly while the lock is held.
+ State::unpark_one(&self.state);
+
+ return;
+ }
+ }
+ }
+
+ /// Returns `true` if `worker_id` is contained in the sleep set
+ pub(super) fn is_parked(&self, worker_id: usize) -> bool {
+ let sleepers = self.sleepers.lock().unwrap();
+ sleepers.contains(&worker_id)
+ }
+
+ fn notify_should_wakeup(&self) -> bool {
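+ // `fetch_add(0)` is used instead of a plain `load` so that the read is
+ // a sequentially consistent RMW, giving the pairing with `fetch_sub(1)`
+ // described in `worker_to_notify` above.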
+ let state = State(self.state.fetch_add(0, SeqCst));
+ state.num_searching() == 0 && state.num_unparked() < self.num_workers
+ }
+}
+
+impl State {
+ fn new(num_workers: usize) -> State {
+ // All workers start in the unparked state
+ let ret = State(num_workers << UNPARK_SHIFT);
+ debug_assert_eq!(num_workers, ret.num_unparked());
+ debug_assert_eq!(0, ret.num_searching());
+ ret
+ }
+
+ fn load(cell: &AtomicUsize, ordering: Ordering) -> State {
+ State(cell.load(ordering))
+ }
+
+ fn unpark_one(cell: &AtomicUsize) {
+ cell.fetch_add(1 | (1 << UNPARK_SHIFT), SeqCst);
+ }
+
+ fn inc_num_searching(cell: &AtomicUsize, ordering: Ordering) {
+ cell.fetch_add(1, ordering);
+ }
+
+ /// Returns `true` if this is the final searching worker
+ fn dec_num_searching(cell: &AtomicUsize) -> bool {
+ let state = State(cell.fetch_sub(1, SeqCst));
+ state.num_searching() == 1
+ }
+
+ /// Track a sleeping worker
+ ///
+ /// Returns `true` if this is the final searching worker.
+ fn dec_num_unparked(cell: &AtomicUsize, is_searching: bool) -> bool {
+ let mut dec = 1 << UNPARK_SHIFT;
+
+ if is_searching {
+ dec += 1;
+ }
+
+ let prev = State(cell.fetch_sub(dec, SeqCst));
+ is_searching && prev.num_searching() == 1
+ }
+
+ /// Number of workers currently searching
+ fn num_searching(self) -> usize {
+ self.0 & SEARCH_MASK
+ }
+
+ /// Number of workers currently unparked
+ fn num_unparked(self) -> usize {
+ (self.0 & UNPARK_MASK) >> UNPARK_SHIFT
+ }
+}
+
+impl From<usize> for State {
+ fn from(src: usize) -> State {
+ State(src)
+ }
+}
+
+impl From<State> for usize {
+ fn from(src: State) -> usize {
+ src.0
+ }
+}
+
+impl fmt::Debug for State {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("worker::State")
+ .field("num_unparked", &self.num_unparked())
+ .field("num_searching", &self.num_searching())
+ .finish()
+ }
+}
+
+#[test]
+fn test_state() {
+ assert_eq!(0, UNPARK_MASK & SEARCH_MASK);
+ assert_eq!(0, !(UNPARK_MASK | SEARCH_MASK));
+
+ let state = State::new(10);
+ assert_eq!(10, state.num_unparked());
+ assert_eq!(0, state.num_searching());
+}
diff --git a/third_party/rust/tokio/src/runtime/thread_pool/mod.rs b/third_party/rust/tokio/src/runtime/thread_pool/mod.rs
new file mode 100644
index 0000000000..82e82d5b30
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/thread_pool/mod.rs
@@ -0,0 +1,117 @@
+//! Thread pool
+
+mod atomic_cell;
+use atomic_cell::AtomicCell;
+
+mod idle;
+use self::idle::Idle;
+
+mod worker;
+pub(crate) use worker::Launch;
+
+cfg_blocking! {
+ pub(crate) use worker::block_in_place;
+}
+
+use crate::loom::sync::Arc;
+use crate::runtime::task::{self, JoinHandle};
+use crate::runtime::Parker;
+
+use std::fmt;
+use std::future::Future;
+
+/// Work-stealing based thread pool for executing futures.
+pub(crate) struct ThreadPool {
+ spawner: Spawner,
+}
+
+/// Submit futures to the associated thread pool for execution.
+///
+/// A `Spawner` instance is a handle to a single thread pool that allows the owner
+/// of the handle to spawn futures onto the thread pool.
+///
+/// The `Spawner` handle is *only* used for spawning new futures. It does not
+/// impact the lifecycle of the thread pool in any way. The thread pool may
+/// shut down while there are outstanding `Spawner` instances.
+///
+/// `Spawner` instances are obtained by calling [`ThreadPool::spawner`].
+///
+/// [`ThreadPool::spawner`]: method@ThreadPool::spawner
+#[derive(Clone)]
+pub(crate) struct Spawner {
+ shared: Arc<worker::Shared>,
+}
+
+// ===== impl ThreadPool =====
+
+impl ThreadPool {
+ pub(crate) fn new(size: usize, parker: Parker) -> (ThreadPool, Launch) {
+ let (shared, launch) = worker::create(size, parker);
+ let spawner = Spawner { shared };
+ let thread_pool = ThreadPool { spawner };
+
+ (thread_pool, launch)
+ }
+
+ /// Returns a reference to the `Spawner`.
+ ///
+ /// The `Spawner` handle can be cloned and enables spawning tasks from other
+ /// threads.
+ pub(crate) fn spawner(&self) -> &Spawner {
+ &self.spawner
+ }
+
+ /// Spawns a task
+ pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
+ where
+ F: Future + Send + 'static,
+ F::Output: Send + 'static,
+ {
+ self.spawner.spawn(future)
+ }
+
+ /// Blocks the current thread waiting for the future to complete.
+ ///
+ /// The future will execute on the current thread, but all spawned tasks
+ /// will be executed on the thread pool.
+ pub(crate) fn block_on<F>(&self, future: F) -> F::Output
+ where
+ F: Future,
+ {
+ let mut enter = crate::runtime::enter();
+ enter.block_on(future).expect("failed to park thread")
+ }
+}
+
+impl fmt::Debug for ThreadPool {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("ThreadPool").finish()
+ }
+}
+
+impl Drop for ThreadPool {
+ fn drop(&mut self) {
+ self.spawner.shared.close();
+ }
+}
+
+// ==== impl Spawner =====
+
+impl Spawner {
+ /// Spawns a future onto the thread pool
+ pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
+ where
+ F: Future + Send + 'static,
+ F::Output: Send + 'static,
+ {
+ let (task, handle) = task::joinable(future);
+ self.shared.schedule(task, false);
+ handle
+ }
+}
+
+impl fmt::Debug for Spawner {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ fmt.debug_struct("Spawner").finish()
+ }
+}
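These types are crate-internal; user code reaches them through the runtime builder. A hedged usage sketch against the tokio 0.2-era public API (the builder method names are assumed; the vendored version may differ):

    fn main() {
        let mut rt = tokio::runtime::Builder::new()
            .threaded_scheduler() // backed by the `ThreadPool` above
            .core_threads(4)
            .enable_all()
            .build()
            .unwrap();

        // `block_on` drives the future on the current thread, while `spawn`
        // hands tasks to the pool through a `Spawner` handle.
        rt.block_on(async {
            let handle = tokio::spawn(async { 1 + 1 });
            assert_eq!(2, handle.await.unwrap());
        });
    }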
diff --git a/third_party/rust/tokio/src/runtime/thread_pool/worker.rs b/third_party/rust/tokio/src/runtime/thread_pool/worker.rs
new file mode 100644
index 0000000000..400e2a938c
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/thread_pool/worker.rs
@@ -0,0 +1,761 @@
+//! A scheduler is initialized with a fixed number of workers. Each worker is
+//! driven by a thread. Each worker has a "core" which contains data such as the
+//! run queue and other state. When `block_in_place` is called, the worker's
+//! "core" is handed off to a new thread allowing the scheduler to continue to
+//! make progress while the originating thread blocks.
+
+use crate::loom::rand::seed;
+use crate::loom::sync::{Arc, Mutex};
+use crate::park::{Park, Unpark};
+use crate::runtime;
+use crate::runtime::park::{Parker, Unparker};
+use crate::runtime::thread_pool::{AtomicCell, Idle};
+use crate::runtime::{queue, task};
+use crate::util::linked_list::LinkedList;
+use crate::util::FastRand;
+
+use std::cell::RefCell;
+use std::time::Duration;
+
+/// A scheduler worker
+pub(super) struct Worker {
+ /// Reference to shared state
+ shared: Arc<Shared>,
+
+ /// Index holding this worker's remote state
+ index: usize,
+
+ /// Used to hand-off a worker's core to another thread.
+ core: AtomicCell<Core>,
+}
+
+/// Core data
+struct Core {
+ /// Used to schedule bookkeeping tasks every so often.
+ tick: u8,
+
+ /// When a task is scheduled from a worker, it is stored in this slot. The
+ /// worker will check this slot for a task **before** checking the run
+ /// queue. This effectively results in the **last** scheduled task to be run
+ /// next (LIFO). This is an optimization for message passing patterns and
+ /// helps to reduce latency.
+ lifo_slot: Option<Notified>,
+
+ /// The worker-local run queue.
+ run_queue: queue::Local<Arc<Worker>>,
+
+ /// True if the worker is currently searching for more work. Searching
+ /// involves attempting to steal from other workers.
+ is_searching: bool,
+
+ /// True if the scheduler is being shutdown
+ is_shutdown: bool,
+
+ /// Tasks owned by the core
+ tasks: LinkedList<Task>,
+
+ /// Parker
+ ///
+ /// Stored in an `Option` as the parker is added / removed to make the
+ /// borrow checker happy.
+ park: Option<Parker>,
+
+ /// Fast random number generator.
+ rand: FastRand,
+}
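A standalone model of the LIFO-slot policy described above, with a plain `VecDeque` standing in for tokio's queue types: scheduling displaces the current slot occupant to the back of the run queue, and the slot is always drained first.

    use std::collections::VecDeque;

    struct MiniCore {
        lifo_slot: Option<u32>,
        run_queue: VecDeque<u32>,
    }

    impl MiniCore {
        fn schedule(&mut self, task: u32) {
            // Mirrors `Shared::schedule_local` below.
            if let Some(prev) = self.lifo_slot.take() {
                self.run_queue.push_back(prev);
            }
            self.lifo_slot = Some(task);
        }

        fn next_task(&mut self) -> Option<u32> {
            // Mirrors `Core::next_local_task` below.
            self.lifo_slot.take().or_else(|| self.run_queue.pop_front())
        }
    }

    fn main() {
        let mut core = MiniCore { lifo_slot: None, run_queue: VecDeque::new() };
        core.schedule(1);
        core.schedule(2); // task 1 is displaced to the queue
        assert_eq!(Some(2), core.next_task()); // last scheduled runs first
        assert_eq!(Some(1), core.next_task());
    }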
+
+/// State shared across all workers
+pub(super) struct Shared {
+ /// Per-worker remote state. All other workers have access to this, and it
+ /// is how they communicate with each other.
+ remotes: Box<[Remote]>,
+
+ /// Submit work to the scheduler while **not** currently on a worker thread.
+ inject: queue::Inject<Arc<Worker>>,
+
+ /// Coordinates idle workers
+ idle: Idle,
+
+ /// Workers that have observed the shutdown signal
+ ///
+ /// The core is **not** placed back in the worker, to prevent it from being
+ /// stolen by a thread that was spawned as part of `block_in_place`.
+ shutdown_workers: Mutex<Vec<(Box<Core>, Arc<Worker>)>>,
+}
+
+/// Used to communicate with a worker from other threads.
+struct Remote {
+ /// Steal tasks from this worker.
+ steal: queue::Steal<Arc<Worker>>,
+
+ /// Transfers tasks to be released. Any worker pushes tasks, only the owning
+ /// worker pops.
+ pending_drop: task::TransferStack<Arc<Worker>>,
+
+ /// Unparks the associated worker thread
+ unpark: Unparker,
+}
+
+/// Thread-local context
+struct Context {
+ /// Worker
+ worker: Arc<Worker>,
+
+ /// Core data
+ core: RefCell<Option<Box<Core>>>,
+}
+
+/// Starts the workers
+pub(crate) struct Launch(Vec<Arc<Worker>>);
+
+/// Running a task may consume the core. If the core is still available when
+/// the task completes, it is returned. Otherwise, the worker will need to
+/// stop processing.
+type RunResult = Result<Box<Core>, ()>;
+
+/// A task handle
+type Task = task::Task<Arc<Worker>>;
+
+/// A notified task handle
+type Notified = task::Notified<Arc<Worker>>;
+
+// Tracks thread-local state
+scoped_thread_local!(static CURRENT: Context);
+
+pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {
+ let mut cores = vec![];
+ let mut remotes = vec![];
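+ // The cores below are handed off to the workers; the remotes stay in the
+ // shared state so any worker can steal from or unpark any other.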
+
+ // Create the local queues
+ for _ in 0..size {
+ let (steal, run_queue) = queue::local();
+
+ let park = park.clone();
+ let unpark = park.unpark();
+
+ cores.push(Box::new(Core {
+ tick: 0,
+ lifo_slot: None,
+ run_queue,
+ is_searching: false,
+ is_shutdown: false,
+ tasks: LinkedList::new(),
+ park: Some(park),
+ rand: FastRand::new(seed()),
+ }));
+
+ remotes.push(Remote {
+ steal,
+ pending_drop: task::TransferStack::new(),
+ unpark,
+ });
+ }
+
+ let shared = Arc::new(Shared {
+ remotes: remotes.into_boxed_slice(),
+ inject: queue::Inject::new(),
+ idle: Idle::new(size),
+ shutdown_workers: Mutex::new(vec![]),
+ });
+
+ let mut launch = Launch(vec![]);
+
+ for (index, core) in cores.drain(..).enumerate() {
+ launch.0.push(Arc::new(Worker {
+ shared: shared.clone(),
+ index,
+ core: AtomicCell::new(Some(core)),
+ }));
+ }
+
+ (shared, launch)
+}
+
+cfg_blocking! {
+ pub(crate) fn block_in_place<F, R>(f: F) -> R
+ where
+ F: FnOnce() -> R,
+ {
+ // Try to steal the worker core back
+ struct Reset;
+
+ impl Drop for Reset {
+ fn drop(&mut self) {
+ CURRENT.with(|maybe_cx| {
+ if let Some(cx) = maybe_cx {
+ let core = cx.worker.core.take();
+ *cx.core.borrow_mut() = core;
+ }
+ });
+ }
+ }
+
+ CURRENT.with(|maybe_cx| {
+ let cx = maybe_cx.expect("can call blocking only when running in a spawned task");
+
+ // Get the worker core. If none is set, then blocking is fine!
+ let core = match cx.core.borrow_mut().take() {
+ Some(core) => {
+ // We are effectively leaving the executor, so we need to
+ // forcibly end budgeting.
+ crate::coop::stop();
+ core
+ },
+ None => return,
+ };
+
+ // The parker should be set here
+ assert!(core.park.is_some());
+
+ // In order to block, the core must be sent to another thread for
+ // execution.
+ //
+ // First, move the core back into the worker's shared core slot.
+ cx.worker.core.set(core);
+
+ // Next, clone the worker handle and send it to a new thread for
+ // processing.
+ //
+ // Once the blocking task is done executing, we will attempt to
+ // steal the core back.
+ let worker = cx.worker.clone();
+ runtime::spawn_blocking(move || run(worker));
+ });
+
+ let _reset = Reset;
+
+ f()
+ }
+}
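A hedged usage sketch of the public wrapper over this routine (`tokio::task::block_in_place` in the 0.2-era API; the file name is hypothetical). It must be called from a task running on the threaded scheduler:

    fn read_config_blocking() -> std::io::Result<String> {
        // The closure runs synchronously on the current thread; the worker's
        // core has been handed to another thread, so other tasks keep running.
        tokio::task::block_in_place(|| std::fs::read_to_string("config.toml"))
    }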
+
+/// The number of ticks between polls of the global queue. Polling the global
+/// queue periodically helps to ensure fairness.
+///
+/// The number is fairly arbitrary. I believe this value was copied from golang.
+const GLOBAL_POLL_INTERVAL: u8 = 61;
+
+impl Launch {
+ pub(crate) fn launch(mut self) {
+ for worker in self.0.drain(..) {
+ runtime::spawn_blocking(move || run(worker));
+ }
+ }
+}
+
+fn run(worker: Arc<Worker>) {
+ // Acquire a core. If this fails, then another thread is running this
+ // worker and there is nothing further to do.
+ let core = match worker.core.take() {
+ Some(core) => core,
+ None => return,
+ };
+
+ // Set the worker context.
+ let cx = Context {
+ worker,
+ core: RefCell::new(None),
+ };
+
+ let _enter = crate::runtime::enter();
+
+ CURRENT.set(&cx, || {
+ // This should always be an error. It only returns a `Result` to support
+ // using `?` to short circuit.
+ assert!(cx.run(core).is_err());
+ });
+}
+
+impl Context {
+ fn run(&self, mut core: Box<Core>) -> RunResult {
+ while !core.is_shutdown {
+ // Increment the tick
+ core.tick();
+
+ // Run maintenance, if needed
+ core = self.maintenance(core);
+
+ // First, check work available to the current worker.
+ if let Some(task) = core.next_task(&self.worker) {
+ core = self.run_task(task, core)?;
+ continue;
+ }
+
+ // There is no more **local** work to process, try to steal work
+ // from other workers.
+ if let Some(task) = core.steal_work(&self.worker) {
+ core = self.run_task(task, core)?;
+ } else {
+ // Wait for work
+ core = self.park(core);
+ }
+ }
+
+ // Signal shutdown
+ self.worker.shared.shutdown(core, self.worker.clone());
+ Err(())
+ }
+
+ fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
+ // Make sure the worker is not in the **searching** state. This enables
+ // another idle worker to try to steal work.
+ core.transition_from_searching(&self.worker);
+
+ // Make the core available to the runtime context
+ *self.core.borrow_mut() = Some(core);
+
+ // Run the task
+ crate::coop::budget(|| {
+ task.run();
+
+ // As long as there is budget remaining and a task exists in the
+ // `lifo_slot`, then keep running.
+ loop {
+ // Check if we still have the core. If not, the core was stolen
+ // by another worker.
+ let mut core = match self.core.borrow_mut().take() {
+ Some(core) => core,
+ None => return Err(()),
+ };
+
+ // Check for a task in the LIFO slot
+ let task = match core.lifo_slot.take() {
+ Some(task) => task,
+ None => return Ok(core),
+ };
+
+ if crate::coop::has_budget_remaining() {
+ // Run the LIFO task, then loop
+ *self.core.borrow_mut() = Some(core);
+ task.run();
+ } else {
+ // Not enough budget left to run the LIFO task, push it to
+ // the back of the queue and return.
+ core.run_queue.push_back(task, self.worker.inject());
+ return Ok(core);
+ }
+ }
+ })
+ }
+
+ fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
+ if core.tick % GLOBAL_POLL_INTERVAL == 0 {
+ // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
+ // to run without actually putting the thread to sleep.
+ core = self.park_timeout(core, Some(Duration::from_millis(0)));
+
+ // Run regularly scheduled maintenance
+ core.maintenance(&self.worker);
+ }
+
+ core
+ }
+
+ fn park(&self, mut core: Box<Core>) -> Box<Core> {
+ core.transition_to_parked(&self.worker);
+
+ while !core.is_shutdown {
+ core = self.park_timeout(core, None);
+
+ // Run regularly scheduled maintenance
+ core.maintenance(&self.worker);
+
+ if core.transition_from_parked(&self.worker) {
+ return core;
+ }
+ }
+
+ core
+ }
+
+ fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
+ // Take the parker out of core
+ let mut park = core.park.take().expect("park missing");
+
+ // Store `core` in context
+ *self.core.borrow_mut() = Some(core);
+
+ // Park thread
+ if let Some(timeout) = duration {
+ park.park_timeout(timeout).expect("park failed");
+ } else {
+ park.park().expect("park failed");
+ }
+
+ // Remove `core` from context
+ core = self.core.borrow_mut().take().expect("core missing");
+
+ // Place `park` back in `core`
+ core.park = Some(park);
+
+ // If there are tasks available to steal, notify a worker
+ if core.run_queue.is_stealable() {
+ self.worker.shared.notify_parked();
+ }
+
+ core
+ }
+}
+
+impl Core {
+ /// Increment the tick
+ fn tick(&mut self) {
+ self.tick = self.tick.wrapping_add(1);
+ }
+
+ /// Return the next notified task available to this worker.
+ fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
+ if self.tick % GLOBAL_POLL_INTERVAL == 0 {
+ worker.inject().pop().or_else(|| self.next_local_task())
+ } else {
+ self.next_local_task().or_else(|| worker.inject().pop())
+ }
+ }
+
+ fn next_local_task(&mut self) -> Option<Notified> {
+ self.lifo_slot.take().or_else(|| self.run_queue.pop())
+ }
+
+ fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
+ if !self.transition_to_searching(worker) {
+ return None;
+ }
+
+ let num = worker.shared.remotes.len();
+ // Start from a random worker
+ let start = self.rand.fastrand_n(num as u32) as usize;
+
+ for i in 0..num {
+ let i = (start + i) % num;
+
+ // Don't steal from ourself! We know we don't have work.
+ if i == worker.index {
+ continue;
+ }
+
+ let target = &worker.shared.remotes[i];
+ if let Some(task) = target.steal.steal_into(&mut self.run_queue) {
+ return Some(task);
+ }
+ }
+
+ // Fallback on checking the global queue
+ worker.shared.inject.pop()
+ }
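The victim-selection order above can be modelled in isolation: start at a random index and walk the ring of workers, skipping our own slot. A minimal sketch, where `start` stands in for the `FastRand` draw:

    fn steal_order(num_workers: usize, start: usize, me: usize) -> Vec<usize> {
        (0..num_workers)
            .map(|i| (start + i) % num_workers)
            .filter(|&i| i != me)
            .collect()
    }

    fn main() {
        // 4 workers, random start at index 2, we are worker 0.
        assert_eq!(vec![2, 3, 1], steal_order(4, 2, 0));
    }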
+
+ fn transition_to_searching(&mut self, worker: &Worker) -> bool {
+ if !self.is_searching {
+ self.is_searching = worker.shared.idle.transition_worker_to_searching();
+ }
+
+ self.is_searching
+ }
+
+ fn transition_from_searching(&mut self, worker: &Worker) {
+ if !self.is_searching {
+ return;
+ }
+
+ self.is_searching = false;
+ worker.shared.transition_worker_from_searching();
+ }
+
+ /// Prepare the worker state for parking
+ fn transition_to_parked(&mut self, worker: &Worker) {
+ // When the final worker transitions **out** of searching to parked, it
+ // must check all the queues one last time in case work materialized
+ // between the last work scan and transitioning out of searching.
+ let is_last_searcher = worker
+ .shared
+ .idle
+ .transition_worker_to_parked(worker.index, self.is_searching);
+
+ // The worker is no longer searching. This only updates the local
+ // cache; the shared state was updated above.
+ self.is_searching = false;
+
+ if is_last_searcher {
+ worker.shared.notify_if_work_pending();
+ }
+ }
+
+ /// Returns `true` if the transition happened.
+ fn transition_from_parked(&mut self, worker: &Worker) -> bool {
+ // If a task is in the lifo slot, then we must unpark regardless of
+ // being notified
+ if self.lifo_slot.is_some() {
+ worker.shared.idle.unpark_worker_by_id(worker.index);
+ self.is_searching = true;
+ return true;
+ }
+
+ if worker.shared.idle.is_parked(worker.index) {
+ return false;
+ }
+
+ // When unparked, the worker is in the searching state.
+ self.is_searching = true;
+ true
+ }
+
+ /// Runs maintenance work such as freeing pending tasks and checking the
+ /// pool's state.
+ fn maintenance(&mut self, worker: &Worker) {
+ self.drain_pending_drop(worker);
+
+ if !self.is_shutdown {
+ // Check if the scheduler has been shutdown
+ self.is_shutdown = worker.inject().is_closed();
+ }
+ }
+
+ // Shut down the core
+ fn shutdown(&mut self, worker: &Worker) {
+ // Take the core
+ let mut park = self.park.take().expect("park missing");
+
+ // Signal to all tasks to shut down.
+ for header in self.tasks.iter() {
+ header.shutdown();
+ }
+
+ loop {
+ self.drain_pending_drop(worker);
+
+ if self.tasks.is_empty() {
+ break;
+ }
+
+ // Wait until signalled
+ park.park().expect("park failed");
+ }
+
+ // Drain the queue
+ while let Some(_) = self.next_local_task() {}
+ }
+
+ fn drain_pending_drop(&mut self, worker: &Worker) {
+ use std::mem::ManuallyDrop;
+
+ for task in worker.remote().pending_drop.drain() {
+ let task = ManuallyDrop::new(task);
+
+ // safety: tasks are only pushed into the `pending_drop` stacks that
+ // are associated with the list they are inserted into. When a task
+ // is pushed into `pending_drop`, the ref-inc is skipped, so we must
+ // not ref-dec here.
+ //
+ // See `bind` and `release` implementations.
+ unsafe {
+ self.tasks.remove(task.header().into());
+ }
+ }
+ }
+}
+
+impl Worker {
+ /// Returns a reference to the scheduler's injection queue
+ fn inject(&self) -> &queue::Inject<Arc<Worker>> {
+ &self.shared.inject
+ }
+
+ /// Return a reference to this worker's remote data
+ fn remote(&self) -> &Remote {
+ &self.shared.remotes[self.index]
+ }
+
+ fn eq(&self, other: &Worker) -> bool {
+ self.shared.ptr_eq(&other.shared) && self.index == other.index
+ }
+}
+
+impl task::Schedule for Arc<Worker> {
+ fn bind(task: Task) -> Arc<Worker> {
+ CURRENT.with(|maybe_cx| {
+ let cx = maybe_cx.expect("scheduler context missing");
+
+ // Track the task
+ cx.core
+ .borrow_mut()
+ .as_mut()
+ .expect("scheduler core missing")
+ .tasks
+ .push_front(task);
+
+ // Return a clone of the worker
+ cx.worker.clone()
+ })
+ }
+
+ fn release(&self, task: &Task) -> Option<Task> {
+ use std::ptr::NonNull;
+
+ CURRENT.with(|maybe_cx| {
+ let cx = maybe_cx.expect("scheduler context missing");
+
+ if self.eq(&cx.worker) {
+ let mut maybe_core = cx.core.borrow_mut();
+
+ if let Some(core) = &mut *maybe_core {
+ // Directly remove the task
+ //
+ // safety: the task is inserted in the list in `bind`.
+ unsafe {
+ let ptr = NonNull::from(task.header());
+ return core.tasks.remove(ptr);
+ }
+ }
+ }
+
+ // Track the task to be released by the worker that owns it
+ //
+ // Safety: We get a new handle without incrementing the ref-count.
+ // A ref-count is held by the "owned" linked list and it is only
+ // ever removed from that list as part of the release process: this
+ // method or popping the task from `pending_drop`. Thus, we can rely
+ // on the ref-count held by the linked-list to keep the memory
+ // alive.
+ //
+ // When the task is removed from the stack, it is forgotten instead
+ // of dropped.
+ let task = unsafe { Task::from_raw(task.header().into()) };
+
+ self.remote().pending_drop.push(task);
+
+ if cx.core.borrow().is_some() {
+ return None;
+ }
+
+ // The worker core has been handed off to another thread. In the
+ // event that the scheduler is currently shutting down, the thread
+ // that owns the task may be waiting on the release to complete
+ // shutdown.
+ if self.inject().is_closed() {
+ self.remote().unpark.unpark();
+ }
+
+ None
+ })
+ }
+
+ fn schedule(&self, task: Notified) {
+ self.shared.schedule(task, false);
+ }
+
+ fn yield_now(&self, task: Notified) {
+ self.shared.schedule(task, true);
+ }
+}
+
+impl Shared {
+ pub(super) fn schedule(&self, task: Notified, is_yield: bool) {
+ CURRENT.with(|maybe_cx| {
+ if let Some(cx) = maybe_cx {
+ // Make sure the task is part of the **current** scheduler.
+ if self.ptr_eq(&cx.worker.shared) {
+ // And the current thread still holds a core
+ if let Some(core) = cx.core.borrow_mut().as_mut() {
+ self.schedule_local(core, task, is_yield);
+ return;
+ }
+ }
+ }
+
+ // Otherwise, use the inject queue
+ self.inject.push(task);
+ self.notify_parked();
+ });
+ }
+
+ fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
+ // Spawning from the worker thread. If scheduling a "yield" then the
+ // task must always be pushed to the back of the queue, enabling other
+ // tasks to be executed. If **not** a yield, then there is more
+ // flexibility and the task may go to the front of the queue.
+ let should_notify = if is_yield {
+ core.run_queue.push_back(task, &self.inject);
+ true
+ } else {
+ // Push to the LIFO slot
+ let prev = core.lifo_slot.take();
+ let ret = prev.is_some();
+
+ if let Some(prev) = prev {
+ core.run_queue.push_back(prev, &self.inject);
+ }
+
+ core.lifo_slot = Some(task);
+
+ ret
+ };
+
+ // Only notify if not currently parked. If `park` is `None`, then the
+ // scheduling is from a resource driver. As notifications often come in
+ // batches, the notification is delayed until the park is complete.
+ if should_notify && core.park.is_some() {
+ self.notify_parked();
+ }
+ }
+
+ pub(super) fn close(&self) {
+ if self.inject.close() {
+ self.notify_all();
+ }
+ }
+
+ fn notify_parked(&self) {
+ if let Some(index) = self.idle.worker_to_notify() {
+ self.remotes[index].unpark.unpark();
+ }
+ }
+
+ fn notify_all(&self) {
+ for remote in &self.remotes[..] {
+ remote.unpark.unpark();
+ }
+ }
+
+ fn notify_if_work_pending(&self) {
+ for remote in &self.remotes[..] {
+ if !remote.steal.is_empty() {
+ self.notify_parked();
+ return;
+ }
+ }
+
+ if !self.inject.is_empty() {
+ self.notify_parked();
+ }
+ }
+
+ fn transition_worker_from_searching(&self) {
+ if self.idle.transition_worker_from_searching() {
+ // We are the final searching worker. Because work was found, we
+ // need to notify another worker.
+ self.notify_parked();
+ }
+ }
+
+ /// Signals that a worker has observed the shutdown signal and has handed
+ /// its core over for final cleanup.
+ ///
+ /// If all workers have reached this point, the final cleanup is performed.
+ fn shutdown(&self, core: Box<Core>, worker: Arc<Worker>) {
+ let mut workers = self.shutdown_workers.lock().unwrap();
+ workers.push((core, worker));
+
+ if workers.len() != self.remotes.len() {
+ return;
+ }
+
+ for (mut core, worker) in workers.drain(..) {
+ core.shutdown(&worker);
+ }
+
+ // Drain the injection queue
+ while let Some(_) = self.inject.pop() {}
+ }
+
+ fn ptr_eq(&self, other: &Shared) -> bool {
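+ // Identity comparison: two handles belong to the same pool iff they
+ // point at the same `Shared` allocation.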
+ self as *const _ == other as *const _
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/time.rs b/third_party/rust/tokio/src/runtime/time.rs
new file mode 100644
index 0000000000..c623d9641a
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/time.rs
@@ -0,0 +1,59 @@
+//! Abstracts out the APIs necessary to `Runtime` for integrating the time
+//! driver. When the `time` feature flag is **not** enabled, these APIs are
+//! shells. This isolates the complexity of dealing with conditional
+//! compilation.
+
+pub(crate) use variant::*;
+
+#[cfg(feature = "time")]
+mod variant {
+ use crate::park::Either;
+ use crate::runtime::io;
+ use crate::time::{self, driver};
+
+ pub(crate) type Clock = time::Clock;
+ pub(crate) type Driver = Either<driver::Driver<io::Driver>, io::Driver>;
+ pub(crate) type Handle = Option<driver::Handle>;
+
+ pub(crate) fn create_clock() -> Clock {
+ Clock::new()
+ }
+
+ /// Create a new timer driver / handle pair
+ pub(crate) fn create_driver(
+ enable: bool,
+ io_driver: io::Driver,
+ clock: Clock,
+ ) -> (Driver, Handle) {
+ if enable {
+ let driver = driver::Driver::new(io_driver, clock);
+ let handle = driver.handle();
+
+ (Either::A(driver), Some(handle))
+ } else {
+ (Either::B(io_driver), None)
+ }
+ }
+}
+
+#[cfg(not(feature = "time"))]
+mod variant {
+ use crate::runtime::io;
+
+ pub(crate) type Clock = ();
+ pub(crate) type Driver = io::Driver;
+ pub(crate) type Handle = ();
+
+ pub(crate) fn create_clock() -> Clock {
+ ()
+ }
+
+ /// Create a new timer driver / handle pair
+ pub(crate) fn create_driver(
+ _enable: bool,
+ io_driver: io::Driver,
+ _clock: Clock,
+ ) -> (Driver, Handle) {
+ (io_driver, ())
+ }
+}
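The pattern above generalizes: feature-gate a private module and re-export a uniform surface, so call sites never mention `cfg` themselves. A self-contained sketch with a hypothetical `fancy` feature:

    pub(crate) use variant::*;

    #[cfg(feature = "fancy")]
    mod variant {
        pub(crate) type Handle = Option<String>;

        pub(crate) fn create_handle() -> Handle {
            Some("fancy".to_string())
        }
    }

    #[cfg(not(feature = "fancy"))]
    mod variant {
        pub(crate) type Handle = ();

        pub(crate) fn create_handle() -> Handle {}
    }

    // Callers compile identically either way.
    fn init() -> Handle {
        create_handle()
    }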