diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 14:29:10 +0000 |
commit | 2aa4a82499d4becd2284cdb482213d541b8804dd (patch) | |
tree | b80bf8bf13c3766139fbacc530efd0dd9d54394c /third_party/rust/tokio/src/runtime | |
parent | Initial commit. (diff) | |
download | firefox-upstream.tar.xz firefox-upstream.zip |
Adding upstream version 86.0.1.upstream/86.0.1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio/src/runtime')
37 files changed, 7431 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/runtime/basic_scheduler.rs b/third_party/rust/tokio/src/runtime/basic_scheduler.rs new file mode 100644 index 0000000000..301554280f --- /dev/null +++ b/third_party/rust/tokio/src/runtime/basic_scheduler.rs @@ -0,0 +1,326 @@ +use crate::park::{Park, Unpark}; +use crate::runtime; +use crate::runtime::task::{self, JoinHandle, Schedule, Task}; +use crate::util::linked_list::LinkedList; +use crate::util::{waker_ref, Wake}; + +use std::cell::RefCell; +use std::collections::VecDeque; +use std::fmt; +use std::future::Future; +use std::sync::{Arc, Mutex}; +use std::task::Poll::Ready; +use std::time::Duration; + +/// Executes tasks on the current thread +pub(crate) struct BasicScheduler<P> +where + P: Park, +{ + /// Scheduler run queue + /// + /// When the scheduler is executed, the queue is removed from `self` and + /// moved into `Context`. + /// + /// This indirection is to allow `BasicScheduler` to be `Send`. + tasks: Option<Tasks>, + + /// Sendable task spawner + spawner: Spawner, + + /// Current tick + tick: u8, + + /// Thread park handle + park: P, +} + +#[derive(Clone)] +pub(crate) struct Spawner { + shared: Arc<Shared>, +} + +struct Tasks { + /// Collection of all active tasks spawned onto this executor. + owned: LinkedList<Task<Arc<Shared>>>, + + /// Local run queue. + /// + /// Tasks notified from the current thread are pushed into this queue. + queue: VecDeque<task::Notified<Arc<Shared>>>, +} + +/// Scheduler state shared between threads. +struct Shared { + /// Remote run queue + queue: Mutex<VecDeque<task::Notified<Arc<Shared>>>>, + + /// Unpark the blocked thread + unpark: Box<dyn Unpark>, +} + +/// Thread-local context +struct Context { + /// Shared scheduler state + shared: Arc<Shared>, + + /// Local queue + tasks: RefCell<Tasks>, +} + +/// Initial queue capacity +const INITIAL_CAPACITY: usize = 64; + +/// Max number of tasks to poll per tick. +const MAX_TASKS_PER_TICK: usize = 61; + +/// How often ot check the remote queue first +const REMOTE_FIRST_INTERVAL: u8 = 31; + +// Tracks the current BasicScheduler +scoped_thread_local!(static CURRENT: Context); + +impl<P> BasicScheduler<P> +where + P: Park, +{ + pub(crate) fn new(park: P) -> BasicScheduler<P> { + let unpark = Box::new(park.unpark()); + + BasicScheduler { + tasks: Some(Tasks { + owned: LinkedList::new(), + queue: VecDeque::with_capacity(INITIAL_CAPACITY), + }), + spawner: Spawner { + shared: Arc::new(Shared { + queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)), + unpark: unpark as Box<dyn Unpark>, + }), + }, + tick: 0, + park, + } + } + + pub(crate) fn spawner(&self) -> &Spawner { + &self.spawner + } + + /// Spawns a future onto the thread pool + pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.spawner.spawn(future) + } + + pub(crate) fn block_on<F>(&mut self, future: F) -> F::Output + where + F: Future, + { + enter(self, |scheduler, context| { + let _enter = runtime::enter(); + let waker = waker_ref(&scheduler.spawner.shared); + let mut cx = std::task::Context::from_waker(&waker); + + pin!(future); + + 'outer: loop { + if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) { + return v; + } + + for _ in 0..MAX_TASKS_PER_TICK { + // Get and increment the current tick + let tick = scheduler.tick; + scheduler.tick = scheduler.tick.wrapping_add(1); + + let next = if tick % REMOTE_FIRST_INTERVAL == 0 { + scheduler + .spawner + .pop() + .or_else(|| context.tasks.borrow_mut().queue.pop_front()) + } else { + context + .tasks + .borrow_mut() + .queue + .pop_front() + .or_else(|| scheduler.spawner.pop()) + }; + + match next { + Some(task) => crate::coop::budget(|| task.run()), + None => { + // Park until the thread is signaled + scheduler.park.park().ok().expect("failed to park"); + + // Try polling the `block_on` future next + continue 'outer; + } + } + } + + // Yield to the park, this drives the timer and pulls any pending + // I/O events. + scheduler + .park + .park_timeout(Duration::from_millis(0)) + .ok() + .expect("failed to park"); + } + }) + } +} + +/// Enter the scheduler context. This sets the queue and other necessary +/// scheduler state in the thread-local +fn enter<F, R, P>(scheduler: &mut BasicScheduler<P>, f: F) -> R +where + F: FnOnce(&mut BasicScheduler<P>, &Context) -> R, + P: Park, +{ + // Ensures the run queue is placed back in the `BasicScheduler` instance + // once `block_on` returns.` + struct Guard<'a, P: Park> { + context: Option<Context>, + scheduler: &'a mut BasicScheduler<P>, + } + + impl<P: Park> Drop for Guard<'_, P> { + fn drop(&mut self) { + let Context { tasks, .. } = self.context.take().expect("context missing"); + self.scheduler.tasks = Some(tasks.into_inner()); + } + } + + // Remove `tasks` from `self` and place it in a `Context`. + let tasks = scheduler.tasks.take().expect("invalid state"); + + let guard = Guard { + context: Some(Context { + shared: scheduler.spawner.shared.clone(), + tasks: RefCell::new(tasks), + }), + scheduler, + }; + + let context = guard.context.as_ref().unwrap(); + let scheduler = &mut *guard.scheduler; + + CURRENT.set(context, || f(scheduler, context)) +} + +impl<P> Drop for BasicScheduler<P> +where + P: Park, +{ + fn drop(&mut self) { + enter(self, |scheduler, context| { + // Loop required here to ensure borrow is dropped between iterations + #[allow(clippy::while_let_loop)] + loop { + let task = match context.tasks.borrow_mut().owned.pop_back() { + Some(task) => task, + None => break, + }; + + task.shutdown(); + } + + // Drain local queue + for task in context.tasks.borrow_mut().queue.drain(..) { + task.shutdown(); + } + + // Drain remote queue + for task in scheduler.spawner.shared.queue.lock().unwrap().drain(..) { + task.shutdown(); + } + + assert!(context.tasks.borrow().owned.is_empty()); + }); + } +} + +impl<P: Park> fmt::Debug for BasicScheduler<P> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("BasicScheduler").finish() + } +} + +// ===== impl Spawner ===== + +impl Spawner { + /// Spawns a future onto the thread pool + pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + let (task, handle) = task::joinable(future); + self.shared.schedule(task); + handle + } + + fn pop(&self) -> Option<task::Notified<Arc<Shared>>> { + self.shared.queue.lock().unwrap().pop_front() + } +} + +impl fmt::Debug for Spawner { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Spawner").finish() + } +} + +// ===== impl Shared ===== + +impl Schedule for Arc<Shared> { + fn bind(task: Task<Self>) -> Arc<Shared> { + CURRENT.with(|maybe_cx| { + let cx = maybe_cx.expect("scheduler context missing"); + cx.tasks.borrow_mut().owned.push_front(task); + cx.shared.clone() + }) + } + + fn release(&self, task: &Task<Self>) -> Option<Task<Self>> { + use std::ptr::NonNull; + + CURRENT.with(|maybe_cx| { + let cx = maybe_cx.expect("scheduler context missing"); + + // safety: the task is inserted in the list in `bind`. + unsafe { + let ptr = NonNull::from(task.header()); + cx.tasks.borrow_mut().owned.remove(ptr) + } + }) + } + + fn schedule(&self, task: task::Notified<Self>) { + CURRENT.with(|maybe_cx| match maybe_cx { + Some(cx) if Arc::ptr_eq(self, &cx.shared) => { + cx.tasks.borrow_mut().queue.push_back(task); + } + _ => { + self.queue.lock().unwrap().push_back(task); + self.unpark.unpark(); + } + }); + } +} + +impl Wake for Shared { + fn wake(self: Arc<Self>) { + Wake::wake_by_ref(&self) + } + + /// Wake by reference + fn wake_by_ref(arc_self: &Arc<Self>) { + arc_self.unpark.unpark(); + } +} diff --git a/third_party/rust/tokio/src/runtime/blocking/mod.rs b/third_party/rust/tokio/src/runtime/blocking/mod.rs new file mode 100644 index 0000000000..5c808335cc --- /dev/null +++ b/third_party/rust/tokio/src/runtime/blocking/mod.rs @@ -0,0 +1,43 @@ +//! Abstracts out the APIs necessary to `Runtime` for integrating the blocking +//! pool. When the `blocking` feature flag is **not** enabled, these APIs are +//! shells. This isolates the complexity of dealing with conditional +//! compilation. + +cfg_blocking_impl! { + mod pool; + pub(crate) use pool::{spawn_blocking, try_spawn_blocking, BlockingPool, Spawner}; + + mod schedule; + mod shutdown; + mod task; + + use crate::runtime::Builder; + + pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool { + BlockingPool::new(builder, thread_cap) + + } +} + +cfg_not_blocking_impl! { + use crate::runtime::Builder; + use std::time::Duration; + + #[derive(Debug, Clone)] + pub(crate) struct BlockingPool {} + + pub(crate) use BlockingPool as Spawner; + + pub(crate) fn create_blocking_pool(_builder: &Builder, _thread_cap: usize) -> BlockingPool { + BlockingPool {} + } + + impl BlockingPool { + pub(crate) fn spawner(&self) -> &BlockingPool { + self + } + + pub(crate) fn shutdown(&mut self, _duration: Option<Duration>) { + } + } +} diff --git a/third_party/rust/tokio/src/runtime/blocking/pool.rs b/third_party/rust/tokio/src/runtime/blocking/pool.rs new file mode 100644 index 0000000000..a3b208d171 --- /dev/null +++ b/third_party/rust/tokio/src/runtime/blocking/pool.rs @@ -0,0 +1,307 @@ +//! Thread pool for blocking operations + +use crate::loom::sync::{Arc, Condvar, Mutex}; +use crate::loom::thread; +use crate::runtime::blocking::schedule::NoopSchedule; +use crate::runtime::blocking::shutdown; +use crate::runtime::blocking::task::BlockingTask; +use crate::runtime::task::{self, JoinHandle}; +use crate::runtime::{Builder, Callback, Handle}; + +use std::collections::VecDeque; +use std::fmt; +use std::time::Duration; + +pub(crate) struct BlockingPool { + spawner: Spawner, + shutdown_rx: shutdown::Receiver, +} + +#[derive(Clone)] +pub(crate) struct Spawner { + inner: Arc<Inner>, +} + +struct Inner { + /// State shared between worker threads + shared: Mutex<Shared>, + + /// Pool threads wait on this. + condvar: Condvar, + + /// Spawned threads use this name + thread_name: String, + + /// Spawned thread stack size + stack_size: Option<usize>, + + /// Call after a thread starts + after_start: Option<Callback>, + + /// Call before a thread stops + before_stop: Option<Callback>, + + thread_cap: usize, +} + +struct Shared { + queue: VecDeque<Task>, + num_th: usize, + num_idle: u32, + num_notify: u32, + shutdown: bool, + shutdown_tx: Option<shutdown::Sender>, +} + +type Task = task::Notified<NoopSchedule>; + +const KEEP_ALIVE: Duration = Duration::from_secs(10); + +/// Run the provided function on an executor dedicated to blocking operations. +pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R> +where + F: FnOnce() -> R + Send + 'static, +{ + let rt = Handle::current(); + + let (task, handle) = task::joinable(BlockingTask::new(func)); + let _ = rt.blocking_spawner.spawn(task, &rt); + handle +} + +#[allow(dead_code)] +pub(crate) fn try_spawn_blocking<F, R>(func: F) -> Result<(), ()> +where + F: FnOnce() -> R + Send + 'static, +{ + let rt = Handle::current(); + + let (task, _handle) = task::joinable(BlockingTask::new(func)); + rt.blocking_spawner.spawn(task, &rt) +} + +// ===== impl BlockingPool ===== + +impl BlockingPool { + pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool { + let (shutdown_tx, shutdown_rx) = shutdown::channel(); + + BlockingPool { + spawner: Spawner { + inner: Arc::new(Inner { + shared: Mutex::new(Shared { + queue: VecDeque::new(), + num_th: 0, + num_idle: 0, + num_notify: 0, + shutdown: false, + shutdown_tx: Some(shutdown_tx), + }), + condvar: Condvar::new(), + thread_name: builder.thread_name.clone(), + stack_size: builder.thread_stack_size, + after_start: builder.after_start.clone(), + before_stop: builder.before_stop.clone(), + thread_cap, + }), + }, + shutdown_rx, + } + } + + pub(crate) fn spawner(&self) -> &Spawner { + &self.spawner + } + + pub(crate) fn shutdown(&mut self, timeout: Option<Duration>) { + let mut shared = self.spawner.inner.shared.lock().unwrap(); + + // The function can be called multiple times. First, by explicitly + // calling `shutdown` then by the drop handler calling `shutdown`. This + // prevents shutting down twice. + if shared.shutdown { + return; + } + + shared.shutdown = true; + shared.shutdown_tx = None; + self.spawner.inner.condvar.notify_all(); + + drop(shared); + + self.shutdown_rx.wait(timeout); + } +} + +impl Drop for BlockingPool { + fn drop(&mut self) { + self.shutdown(None); + } +} + +impl fmt::Debug for BlockingPool { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("BlockingPool").finish() + } +} + +// ===== impl Spawner ===== + +impl Spawner { + fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> { + let shutdown_tx = { + let mut shared = self.inner.shared.lock().unwrap(); + + if shared.shutdown { + // Shutdown the task + task.shutdown(); + + // no need to even push this task; it would never get picked up + return Err(()); + } + + shared.queue.push_back(task); + + if shared.num_idle == 0 { + // No threads are able to process the task. + + if shared.num_th == self.inner.thread_cap { + // At max number of threads + None + } else { + shared.num_th += 1; + assert!(shared.shutdown_tx.is_some()); + shared.shutdown_tx.clone() + } + } else { + // Notify an idle worker thread. The notification counter + // is used to count the needed amount of notifications + // exactly. Thread libraries may generate spurious + // wakeups, this counter is used to keep us in a + // consistent state. + shared.num_idle -= 1; + shared.num_notify += 1; + self.inner.condvar.notify_one(); + None + } + }; + + if let Some(shutdown_tx) = shutdown_tx { + self.spawn_thread(shutdown_tx, rt); + } + + Ok(()) + } + + fn spawn_thread(&self, shutdown_tx: shutdown::Sender, rt: &Handle) { + let mut builder = thread::Builder::new().name(self.inner.thread_name.clone()); + + if let Some(stack_size) = self.inner.stack_size { + builder = builder.stack_size(stack_size); + } + + let rt = rt.clone(); + + builder + .spawn(move || { + // Only the reference should be moved into the closure + let rt = &rt; + rt.enter(move || { + rt.blocking_spawner.inner.run(); + drop(shutdown_tx); + }) + }) + .unwrap(); + } +} + +impl Inner { + fn run(&self) { + if let Some(f) = &self.after_start { + f() + } + + let mut shared = self.shared.lock().unwrap(); + + 'main: loop { + // BUSY + while let Some(task) = shared.queue.pop_front() { + drop(shared); + task.run(); + + shared = self.shared.lock().unwrap(); + } + + // IDLE + shared.num_idle += 1; + + while !shared.shutdown { + let lock_result = self.condvar.wait_timeout(shared, KEEP_ALIVE).unwrap(); + + shared = lock_result.0; + let timeout_result = lock_result.1; + + if shared.num_notify != 0 { + // We have received a legitimate wakeup, + // acknowledge it by decrementing the counter + // and transition to the BUSY state. + shared.num_notify -= 1; + break; + } + + // Even if the condvar "timed out", if the pool is entering the + // shutdown phase, we want to perform the cleanup logic. + if !shared.shutdown && timeout_result.timed_out() { + break 'main; + } + + // Spurious wakeup detected, go back to sleep. + } + + if shared.shutdown { + // Drain the queue + while let Some(task) = shared.queue.pop_front() { + drop(shared); + task.shutdown(); + + shared = self.shared.lock().unwrap(); + } + + // Work was produced, and we "took" it (by decrementing num_notify). + // This means that num_idle was decremented once for our wakeup. + // But, since we are exiting, we need to "undo" that, as we'll stay idle. + shared.num_idle += 1; + // NOTE: Technically we should also do num_notify++ and notify again, + // but since we're shutting down anyway, that won't be necessary. + break; + } + } + + // Thread exit + shared.num_th -= 1; + + // num_idle should now be tracked exactly, panic + // with a descriptive message if it is not the + // case. + shared.num_idle = shared + .num_idle + .checked_sub(1) + .expect("num_idle underflowed on thread exit"); + + if shared.shutdown && shared.num_th == 0 { + self.condvar.notify_one(); + } + + drop(shared); + + if let Some(f) = &self.before_stop { + f() + } + } +} + +impl fmt::Debug for Spawner { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("blocking::Spawner").finish() + } +} diff --git a/third_party/rust/tokio/src/runtime/blocking/schedule.rs b/third_party/rust/tokio/src/runtime/blocking/schedule.rs new file mode 100644 index 0000000000..e10778d530 --- /dev/null +++ b/third_party/rust/tokio/src/runtime/blocking/schedule.rs @@ -0,0 +1,24 @@ +use crate::runtime::task::{self, Task}; + +/// `task::Schedule` implementation that does nothing. This is unique to the +/// blocking scheduler as tasks scheduled are not really futures but blocking +/// operations. +/// +/// We avoid storing the task by forgetting it in `bind` and re-materializing it +/// in `release. +pub(super) struct NoopSchedule; + +impl task::Schedule for NoopSchedule { + fn bind(_task: Task<Self>) -> NoopSchedule { + // Do nothing w/ the task + NoopSchedule + } + + fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> { + None + } + + fn schedule(&self, _task: task::Notified<Self>) { + unreachable!(); + } +} diff --git a/third_party/rust/tokio/src/runtime/blocking/shutdown.rs b/third_party/rust/tokio/src/runtime/blocking/shutdown.rs new file mode 100644 index 0000000000..5ee8af0fbc --- /dev/null +++ b/third_party/rust/tokio/src/runtime/blocking/shutdown.rs @@ -0,0 +1,58 @@ +//! A shutdown channel. +//! +//! Each worker holds the `Sender` half. When all the `Sender` halves are +//! dropped, the `Receiver` receives a notification. + +use crate::loom::sync::Arc; +use crate::sync::oneshot; + +use std::time::Duration; + +#[derive(Debug, Clone)] +pub(super) struct Sender { + tx: Arc<oneshot::Sender<()>>, +} + +#[derive(Debug)] +pub(super) struct Receiver { + rx: oneshot::Receiver<()>, +} + +pub(super) fn channel() -> (Sender, Receiver) { + let (tx, rx) = oneshot::channel(); + let tx = Sender { tx: Arc::new(tx) }; + let rx = Receiver { rx }; + + (tx, rx) +} + +impl Receiver { + /// Blocks the current thread until all `Sender` handles drop. + /// + /// If `timeout` is `Some`, the thread is blocked for **at most** `timeout` + /// duration. If `timeout` is `None`, then the thread is blocked until the + /// shutdown signal is received. + pub(crate) fn wait(&mut self, timeout: Option<Duration>) { + use crate::runtime::enter::{enter, try_enter}; + + let mut e = if std::thread::panicking() { + match try_enter() { + Some(enter) => enter, + _ => return, + } + } else { + enter() + }; + + // The oneshot completes with an Err + // + // If blocking fails to wait, this indicates a problem parking the + // current thread (usually, shutting down a runtime stored in a + // thread-local). + if let Some(timeout) = timeout { + let _ = e.block_on_timeout(&mut self.rx, timeout); + } else { + let _ = e.block_on(&mut self.rx); + } + } +} diff --git a/third_party/rust/tokio/src/runtime/blocking/task.rs b/third_party/rust/tokio/src/runtime/blocking/task.rs new file mode 100644 index 0000000000..f98b85494c --- /dev/null +++ b/third_party/rust/tokio/src/runtime/blocking/task.rs @@ -0,0 +1,40 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Converts a function to a future that completes on poll +pub(super) struct BlockingTask<T> { + func: Option<T>, +} + +impl<T> BlockingTask<T> { + /// Initializes a new blocking task from the given function + pub(super) fn new(func: T) -> BlockingTask<T> { + BlockingTask { func: Some(func) } + } +} + +impl<T, R> Future for BlockingTask<T> +where + T: FnOnce() -> R, +{ + type Output = R; + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<R> { + let me = unsafe { self.get_unchecked_mut() }; + let func = me + .func + .take() + .expect("[internal exception] blocking task ran twice."); + + // This is a little subtle: + // For convenience, we'd like _every_ call tokio ever makes to Task::poll() to be budgeted + // using coop. However, the way things are currently modeled, even running a blocking task + // currently goes through Task::poll(), and so is subject to budgeting. That isn't really + // what we want; a blocking task may itself want to run tasks (it might be a Worker!), so + // we want it to start without any budgeting. + crate::coop::stop(); + + Poll::Ready(func()) + } +} diff --git a/third_party/rust/tokio/src/runtime/builder.rs b/third_party/rust/tokio/src/runtime/builder.rs new file mode 100644 index 0000000000..cfde998251 --- /dev/null +++ b/third_party/rust/tokio/src/runtime/builder.rs @@ -0,0 +1,519 @@ +use crate::runtime::handle::Handle; +use crate::runtime::shell::Shell; +use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner}; + +use std::fmt; +#[cfg(not(loom))] +use std::sync::Arc; + +/// Builds Tokio Runtime with custom configuration values. +/// +/// Methods can be chained in order to set the configuration values. The +/// Runtime is constructed by calling [`build`]. +/// +/// New instances of `Builder` are obtained via [`Builder::new`]. +/// +/// See function level documentation for details on the various configuration +/// settings. +/// +/// [`build`]: #method.build +/// [`Builder::new`]: #method.new +/// +/// # Examples +/// +/// ``` +/// use tokio::runtime::Builder; +/// +/// fn main() { +/// // build runtime +/// let runtime = Builder::new() +/// .threaded_scheduler() +/// .core_threads(4) +/// .thread_name("my-custom-name") +/// .thread_stack_size(3 * 1024 * 1024) +/// .build() +/// .unwrap(); +/// +/// // use runtime ... +/// } +/// ``` +pub struct Builder { + /// The task execution model to use. + kind: Kind, + + /// Whether or not to enable the I/O driver + enable_io: bool, + + /// Whether or not to enable the time driver + enable_time: bool, + + /// The number of worker threads, used by Runtime. + /// + /// Only used when not using the current-thread executor. + core_threads: Option<usize>, + + /// Cap on thread usage. + max_threads: usize, + + /// Name used for threads spawned by the runtime. + pub(super) thread_name: String, + + /// Stack size used for threads spawned by the runtime. + pub(super) thread_stack_size: Option<usize>, + + /// Callback to run after each thread starts. + pub(super) after_start: Option<Callback>, + + /// To run before each worker thread stops + pub(super) before_stop: Option<Callback>, +} + +#[derive(Debug, Clone, Copy)] +enum Kind { + Shell, + #[cfg(feature = "rt-core")] + Basic, + #[cfg(feature = "rt-threaded")] + ThreadPool, +} + +impl Builder { + /// Returns a new runtime builder initialized with default configuration + /// values. + /// + /// Configuration methods can be chained on the return value. + pub fn new() -> Builder { + Builder { + // No task execution by default + kind: Kind::Shell, + + // I/O defaults to "off" + enable_io: false, + + // Time defaults to "off" + enable_time: false, + + // Default to lazy auto-detection (one thread per CPU core) + core_threads: None, + + max_threads: 512, + + // Default thread name + thread_name: "tokio-runtime-worker".into(), + + // Do not set a stack size by default + thread_stack_size: None, + + // No worker thread callbacks + after_start: None, + before_stop: None, + } + } + + /// Enables both I/O and time drivers. + /// + /// Doing this is a shorthand for calling `enable_io` and `enable_time` + /// individually. If additional components are added to Tokio in the future, + /// `enable_all` will include these future components. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime; + /// + /// let rt = runtime::Builder::new() + /// .threaded_scheduler() + /// .enable_all() + /// .build() + /// .unwrap(); + /// ``` + pub fn enable_all(&mut self) -> &mut Self { + #[cfg(feature = "io-driver")] + self.enable_io(); + #[cfg(feature = "time")] + self.enable_time(); + + self + } + + #[deprecated(note = "In future will be replaced by core_threads method")] + /// Sets the maximum number of worker threads for the `Runtime`'s thread pool. + /// + /// This must be a number between 1 and 32,768 though it is advised to keep + /// this value on the smaller side. + /// + /// The default value is the number of cores available to the system. + pub fn num_threads(&mut self, val: usize) -> &mut Self { + self.core_threads = Some(val); + self + } + + /// Sets the core number of worker threads for the `Runtime`'s thread pool. + /// + /// This should be a number between 1 and 32,768 though it is advised to keep + /// this value on the smaller side. + /// + /// The default value is the number of cores available to the system. + /// + /// These threads will be always active and running. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime; + /// + /// let rt = runtime::Builder::new() + /// .threaded_scheduler() + /// .core_threads(4) + /// .build() + /// .unwrap(); + /// ``` + pub fn core_threads(&mut self, val: usize) -> &mut Self { + assert_ne!(val, 0, "Core threads cannot be zero"); + self.core_threads = Some(val); + self + } + + /// Specifies limit for threads, spawned by the Runtime. + /// + /// This is number of threads to be used by Runtime, including `core_threads` + /// Having `max_threads` less than `core_threads` results in invalid configuration + /// when building multi-threaded `Runtime`, which would cause a panic. + /// + /// Similarly to the `core_threads`, this number should be between 1 and 32,768. + /// + /// The default value is 512. + /// + /// When multi-threaded runtime is not used, will act as limit on additional threads. + /// + /// Otherwise as `core_threads` are always active, it limits additional threads (e.g. for + /// blocking annotations) as `max_threads - core_threads`. + pub fn max_threads(&mut self, val: usize) -> &mut Self { + assert_ne!(val, 0, "Thread limit cannot be zero"); + self.max_threads = val; + self + } + + /// Sets name of threads spawned by the `Runtime`'s thread pool. + /// + /// The default name is "tokio-runtime-worker". + /// + /// # Examples + /// + /// ``` + /// # use tokio::runtime; + /// + /// # pub fn main() { + /// let rt = runtime::Builder::new() + /// .thread_name("my-pool") + /// .build(); + /// # } + /// ``` + pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self { + self.thread_name = val.into(); + self + } + + /// Sets the stack size (in bytes) for worker threads. + /// + /// The actual stack size may be greater than this value if the platform + /// specifies minimal stack size. + /// + /// The default stack size for spawned threads is 2 MiB, though this + /// particular stack size is subject to change in the future. + /// + /// # Examples + /// + /// ``` + /// # use tokio::runtime; + /// + /// # pub fn main() { + /// let rt = runtime::Builder::new() + /// .threaded_scheduler() + /// .thread_stack_size(32 * 1024) + /// .build(); + /// # } + /// ``` + pub fn thread_stack_size(&mut self, val: usize) -> &mut Self { + self.thread_stack_size = Some(val); + self + } + + /// Executes function `f` after each thread is started but before it starts + /// doing work. + /// + /// This is intended for bookkeeping and monitoring use cases. + /// + /// # Examples + /// + /// ``` + /// # use tokio::runtime; + /// + /// # pub fn main() { + /// let runtime = runtime::Builder::new() + /// .threaded_scheduler() + /// .on_thread_start(|| { + /// println!("thread started"); + /// }) + /// .build(); + /// # } + /// ``` + #[cfg(not(loom))] + pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self + where + F: Fn() + Send + Sync + 'static, + { + self.after_start = Some(Arc::new(f)); + self + } + + /// Executes function `f` before each thread stops. + /// + /// This is intended for bookkeeping and monitoring use cases. + /// + /// # Examples + /// + /// ``` + /// # use tokio::runtime; + /// + /// # pub fn main() { + /// let runtime = runtime::Builder::new() + /// .threaded_scheduler() + /// .on_thread_stop(|| { + /// println!("thread stopping"); + /// }) + /// .build(); + /// # } + /// ``` + #[cfg(not(loom))] + pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self + where + F: Fn() + Send + Sync + 'static, + { + self.before_stop = Some(Arc::new(f)); + self + } + + /// Creates the configured `Runtime`. + /// + /// The returned `ThreadPool` instance is ready to spawn tasks. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Builder; + /// + /// let mut rt = Builder::new().build().unwrap(); + /// + /// rt.block_on(async { + /// println!("Hello from the Tokio runtime"); + /// }); + /// ``` + pub fn build(&mut self) -> io::Result<Runtime> { + match self.kind { + Kind::Shell => self.build_shell_runtime(), + #[cfg(feature = "rt-core")] + Kind::Basic => self.build_basic_runtime(), + #[cfg(feature = "rt-threaded")] + Kind::ThreadPool => self.build_threaded_runtime(), + } + } + + fn build_shell_runtime(&mut self) -> io::Result<Runtime> { + use crate::runtime::Kind; + + let clock = time::create_clock(); + + // Create I/O driver + let (io_driver, io_handle) = io::create_driver(self.enable_io)?; + let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone()); + + let spawner = Spawner::Shell; + + let blocking_pool = blocking::create_blocking_pool(self, self.max_threads); + let blocking_spawner = blocking_pool.spawner().clone(); + + Ok(Runtime { + kind: Kind::Shell(Shell::new(driver)), + handle: Handle { + spawner, + io_handle, + time_handle, + clock, + blocking_spawner, + }, + blocking_pool, + }) + } +} + +cfg_io_driver! { + impl Builder { + /// Enables the I/O driver. + /// + /// Doing this enables using net, process, signal, and some I/O types on + /// the runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime; + /// + /// let rt = runtime::Builder::new() + /// .enable_io() + /// .build() + /// .unwrap(); + /// ``` + pub fn enable_io(&mut self) -> &mut Self { + self.enable_io = true; + self + } + } +} + +cfg_time! { + impl Builder { + /// Enables the time driver. + /// + /// Doing this enables using `tokio::time` on the runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime; + /// + /// let rt = runtime::Builder::new() + /// .enable_time() + /// .build() + /// .unwrap(); + /// ``` + pub fn enable_time(&mut self) -> &mut Self { + self.enable_time = true; + self + } + } +} + +cfg_rt_core! { + impl Builder { + /// Sets runtime to use a simpler scheduler that runs all tasks on the current-thread. + /// + /// The executor and all necessary drivers will all be run on the current + /// thread during `block_on` calls. + /// + /// See also [the module level documentation][1], which has a section on scheduler + /// types. + /// + /// [1]: index.html#runtime-configurations + pub fn basic_scheduler(&mut self) -> &mut Self { + self.kind = Kind::Basic; + self + } + + fn build_basic_runtime(&mut self) -> io::Result<Runtime> { + use crate::runtime::{BasicScheduler, Kind}; + + let clock = time::create_clock(); + + // Create I/O driver + let (io_driver, io_handle) = io::create_driver(self.enable_io)?; + + let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone()); + + // And now put a single-threaded scheduler on top of the timer. When + // there are no futures ready to do something, it'll let the timer or + // the reactor to generate some new stimuli for the futures to continue + // in their life. + let scheduler = BasicScheduler::new(driver); + let spawner = Spawner::Basic(scheduler.spawner().clone()); + + // Blocking pool + let blocking_pool = blocking::create_blocking_pool(self, self.max_threads); + let blocking_spawner = blocking_pool.spawner().clone(); + + Ok(Runtime { + kind: Kind::Basic(scheduler), + handle: Handle { + spawner, + io_handle, + time_handle, + clock, + blocking_spawner, + }, + blocking_pool, + }) + } + } +} + +cfg_rt_threaded! { + impl Builder { + /// Sets runtime to use a multi-threaded scheduler for executing tasks. + /// + /// See also [the module level documentation][1], which has a section on scheduler + /// types. + /// + /// [1]: index.html#runtime-configurations + pub fn threaded_scheduler(&mut self) -> &mut Self { + self.kind = Kind::ThreadPool; + self + } + + fn build_threaded_runtime(&mut self) -> io::Result<Runtime> { + use crate::runtime::{Kind, ThreadPool}; + use crate::runtime::park::Parker; + + let core_threads = self.core_threads.unwrap_or_else(crate::loom::sys::num_cpus); + assert!(core_threads <= self.max_threads, "Core threads number cannot be above max limit"); + + let clock = time::create_clock(); + + let (io_driver, io_handle) = io::create_driver(self.enable_io)?; + let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone()); + let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver)); + let spawner = Spawner::ThreadPool(scheduler.spawner().clone()); + + // Create the blocking pool + let blocking_pool = blocking::create_blocking_pool(self, self.max_threads); + let blocking_spawner = blocking_pool.spawner().clone(); + + // Create the runtime handle + let handle = Handle { + spawner, + io_handle, + time_handle, + clock, + blocking_spawner, + }; + + // Spawn the thread pool workers + handle.enter(|| launch.launch()); + + Ok(Runtime { + kind: Kind::ThreadPool(scheduler), + handle, + blocking_pool, + }) + } + } +} + +impl Default for Builder { + fn default() -> Self { + Self::new() + } +} + +impl fmt::Debug for Builder { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Builder") + .field("kind", &self.kind) + .field("core_threads", &self.core_threads) + .field("max_threads", &self.max_threads) + .field("thread_name", &self.thread_name) + .field("thread_stack_size", &self.thread_stack_size) + .field("after_start", &self.after_start.as_ref().map(|_| "...")) + .field("before_stop", &self.after_start.as_ref().map(|_| "...")) + .finish() + } +} diff --git a/third_party/rust/tokio/src/runtime/context.rs b/third_party/rust/tokio/src/runtime/context.rs new file mode 100644 index 0000000000..4af2df23eb --- /dev/null +++ b/third_party/rust/tokio/src/runtime/context.rs @@ -0,0 +1,73 @@ +//! Thread local runtime context +use crate::runtime::Handle; + +use std::cell::RefCell; + +thread_local! { + static CONTEXT: RefCell<Option<Handle>> = RefCell::new(None) +} + +pub(crate) fn current() -> Option<Handle> { + CONTEXT.with(|ctx| ctx.borrow().clone()) +} + +cfg_io_driver! { + pub(crate) fn io_handle() -> crate::runtime::io::Handle { + CONTEXT.with(|ctx| match *ctx.borrow() { + Some(ref ctx) => ctx.io_handle.clone(), + None => Default::default(), + }) + } +} + +cfg_time! { + pub(crate) fn time_handle() -> crate::runtime::time::Handle { + CONTEXT.with(|ctx| match *ctx.borrow() { + Some(ref ctx) => ctx.time_handle.clone(), + None => Default::default(), + }) + } + + cfg_test_util! { + pub(crate) fn clock() -> Option<crate::runtime::time::Clock> { + CONTEXT.with(|ctx| match *ctx.borrow() { + Some(ref ctx) => Some(ctx.clock.clone()), + None => None, + }) + } + } +} + +cfg_rt_core! { + pub(crate) fn spawn_handle() -> Option<crate::runtime::Spawner> { + CONTEXT.with(|ctx| match *ctx.borrow() { + Some(ref ctx) => Some(ctx.spawner.clone()), + None => None, + }) + } +} + +/// Set this [`ThreadContext`] as the current active [`ThreadContext`]. +/// +/// [`ThreadContext`]: struct@ThreadContext +pub(crate) fn enter<F, R>(new: Handle, f: F) -> R +where + F: FnOnce() -> R, +{ + struct DropGuard(Option<Handle>); + + impl Drop for DropGuard { + fn drop(&mut self) { + CONTEXT.with(|ctx| { + *ctx.borrow_mut() = self.0.take(); + }); + } + } + + let _guard = CONTEXT.with(|ctx| { + let old = ctx.borrow_mut().replace(new); + DropGuard(old) + }); + + f() +} diff --git a/third_party/rust/tokio/src/runtime/enter.rs b/third_party/rust/tokio/src/runtime/enter.rs new file mode 100644 index 0000000000..afdb67a3b7 --- /dev/null +++ b/third_party/rust/tokio/src/runtime/enter.rs @@ -0,0 +1,162 @@ +use std::cell::{Cell, RefCell}; +use std::fmt; +use std::marker::PhantomData; + +thread_local!(static ENTERED: Cell<bool> = Cell::new(false)); + +/// Represents an executor context. +pub(crate) struct Enter { + _p: PhantomData<RefCell<()>>, +} + +/// Marks the current thread as being within the dynamic extent of an +/// executor. +pub(crate) fn enter() -> Enter { + if let Some(enter) = try_enter() { + return enter; + } + + panic!( + "Cannot start a runtime from within a runtime. This happens \ + because a function (like `block_on`) attempted to block the \ + current thread while the thread is being used to drive \ + asynchronous tasks." + ); +} + +/// Tries to enter a runtime context, returns `None` if already in a runtime +/// context. +pub(crate) fn try_enter() -> Option<Enter> { + ENTERED.with(|c| { + if c.get() { + None + } else { + c.set(true); + Some(Enter { _p: PhantomData }) + } + }) +} + +// Forces the current "entered" state to be cleared while the closure +// is executed. +// +// # Warning +// +// This is hidden for a reason. Do not use without fully understanding +// executors. Misuing can easily cause your program to deadlock. +#[cfg(all(feature = "rt-threaded", feature = "blocking"))] +pub(crate) fn exit<F: FnOnce() -> R, R>(f: F) -> R { + // Reset in case the closure panics + struct Reset; + impl Drop for Reset { + fn drop(&mut self) { + ENTERED.with(|c| { + c.set(true); + }); + } + } + + ENTERED.with(|c| { + debug_assert!(c.get()); + c.set(false); + }); + + let reset = Reset; + let ret = f(); + std::mem::forget(reset); + + ENTERED.with(|c| { + assert!(!c.get(), "closure claimed permanent executor"); + c.set(true); + }); + + ret +} + +cfg_blocking_impl! { + use crate::park::ParkError; + use std::time::Duration; + + impl Enter { + /// Blocks the thread on the specified future, returning the value with + /// which that future completes. + pub(crate) fn block_on<F>(&mut self, mut f: F) -> Result<F::Output, ParkError> + where + F: std::future::Future, + { + use crate::park::{CachedParkThread, Park}; + use std::pin::Pin; + use std::task::Context; + use std::task::Poll::Ready; + + let mut park = CachedParkThread::new(); + let waker = park.get_unpark()?.into_waker(); + let mut cx = Context::from_waker(&waker); + + // `block_on` takes ownership of `f`. Once it is pinned here, the original `f` binding can + // no longer be accessed, making the pinning safe. + let mut f = unsafe { Pin::new_unchecked(&mut f) }; + + loop { + if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) { + return Ok(v); + } + + park.park()?; + } + } + + /// Blocks the thread on the specified future for **at most** `timeout` + /// + /// If the future completes before `timeout`, the result is returned. If + /// `timeout` elapses, then `Err` is returned. + pub(crate) fn block_on_timeout<F>(&mut self, mut f: F, timeout: Duration) -> Result<F::Output, ParkError> + where + F: std::future::Future, + { + use crate::park::{CachedParkThread, Park}; + use std::pin::Pin; + use std::task::Context; + use std::task::Poll::Ready; + use std::time::Instant; + + let mut park = CachedParkThread::new(); + let waker = park.get_unpark()?.into_waker(); + let mut cx = Context::from_waker(&waker); + + // `block_on` takes ownership of `f`. Once it is pinned here, the original `f` binding can + // no longer be accessed, making the pinning safe. + let mut f = unsafe { Pin::new_unchecked(&mut f) }; + let when = Instant::now() + timeout; + + loop { + if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) { + return Ok(v); + } + + let now = Instant::now(); + + if now >= when { + return Err(()); + } + + park.park_timeout(when - now)?; + } + } + } +} + +impl fmt::Debug for Enter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Enter").finish() + } +} + +impl Drop for Enter { + fn drop(&mut self) { + ENTERED.with(|c| { + assert!(c.get()); + c.set(false); + }); + } +} diff --git a/third_party/rust/tokio/src/runtime/handle.rs b/third_party/rust/tokio/src/runtime/handle.rs new file mode 100644 index 0000000000..db53543e85 --- /dev/null +++ b/third_party/rust/tokio/src/runtime/handle.rs @@ -0,0 +1,140 @@ +use crate::runtime::{blocking, context, io, time, Spawner}; +use std::{error, fmt}; + +cfg_rt_core! { + use crate::task::JoinHandle; + + use std::future::Future; +} + +/// Handle to the runtime. +/// +/// The handle is internally reference-counted and can be freely cloned. A handle can be +/// obtained using the [`Runtime::handle`] method. +/// +/// [`Runtime::handle`]: crate::runtime::Runtime::handle() +#[derive(Debug, Clone)] +pub struct Handle { + pub(super) spawner: Spawner, + + /// Handles to the I/O drivers + pub(super) io_handle: io::Handle, + + /// Handles to the time drivers + pub(super) time_handle: time::Handle, + + /// Source of `Instant::now()` + pub(super) clock: time::Clock, + + /// Blocking pool spawner + pub(super) blocking_spawner: blocking::Spawner, +} + +impl Handle { + /// Enter the runtime context. + pub fn enter<F, R>(&self, f: F) -> R + where + F: FnOnce() -> R, + { + context::enter(self.clone(), f) + } + + /// Returns a Handle view over the currently running Runtime + /// + /// # Panic + /// + /// This will panic if called outside the context of a Tokio runtime. + /// + /// # Examples + /// + /// This can be used to obtain the handle of the surrounding runtime from an async + /// block or function running on that runtime. + /// + /// ``` + /// # use tokio::runtime::Runtime; + /// # fn dox() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.spawn(async { + /// use tokio::runtime::Handle; + /// + /// // Inside an async block or function. + /// let handle = Handle::current(); + /// handle.spawn(async { + /// println!("now running in the existing Runtime"); + /// }) + /// # }); + /// # } + /// ``` + pub fn current() -> Self { + context::current().expect("not currently running on the Tokio runtime.") + } + + /// Returns a Handle view over the currently running Runtime + /// + /// Returns an error if no Runtime has been started + /// + /// Contrary to `current`, this never panics + pub fn try_current() -> Result<Self, TryCurrentError> { + context::current().ok_or(TryCurrentError(())) + } +} + +cfg_rt_core! { + impl Handle { + /// Spawns a future onto the Tokio runtime. + /// + /// This spawns the given future onto the runtime's executor, usually a + /// thread pool. The thread pool is then responsible for polling the future + /// until it completes. + /// + /// See [module level][mod] documentation for more details. + /// + /// [mod]: index.html + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let rt = Runtime::new().unwrap(); + /// let handle = rt.handle(); + /// + /// // Spawn a future onto the runtime + /// handle.spawn(async { + /// println!("now running on a worker thread"); + /// }); + /// # } + /// ``` + /// + /// # Panics + /// + /// This function panics if the spawn fails. Failure occurs if the executor + /// is currently at capacity and is unable to spawn a new future. + pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.spawner.spawn(future) + } + } +} + +/// Error returned by `try_current` when no Runtime has been started +pub struct TryCurrentError(()); + +impl fmt::Debug for TryCurrentError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("TryCurrentError").finish() + } +} + +impl fmt::Display for TryCurrentError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("no tokio Runtime has been initialized") + } +} + +impl error::Error for TryCurrentError {} diff --git a/third_party/rust/tokio/src/runtime/io.rs b/third_party/rust/tokio/src/runtime/io.rs new file mode 100644 index 0000000000..6a0953af85 --- /dev/null +++ b/third_party/rust/tokio/src/runtime/io.rs @@ -0,0 +1,63 @@ +//! Abstracts out the APIs necessary to `Runtime` for integrating the I/O +//! driver. When the `time` feature flag is **not** enabled. These APIs are +//! shells. This isolates the complexity of dealing with conditional +//! compilation. + +/// Re-exported for convenience. +pub(crate) use std::io::Result; + +pub(crate) use variant::*; + +#[cfg(feature = "io-driver")] +mod variant { + use crate::io::driver; + use crate::park::{Either, ParkThread}; + + use std::io; + + /// The driver value the runtime passes to the `timer` layer. + /// + /// When the `io-driver` feature is enabled, this is the "real" I/O driver + /// backed by Mio. Without the `io-driver` feature, this is a thread parker + /// backed by a condition variable. + pub(crate) type Driver = Either<driver::Driver, ParkThread>; + + /// The handle the runtime stores for future use. + /// + /// When the `io-driver` feature is **not** enabled, this is `()`. + pub(crate) type Handle = Option<driver::Handle>; + + pub(crate) fn create_driver(enable: bool) -> io::Result<(Driver, Handle)> { + #[cfg(loom)] + assert!(!enable); + + if enable { + let driver = driver::Driver::new()?; + let handle = driver.handle(); + + Ok((Either::A(driver), Some(handle))) + } else { + let driver = ParkThread::new(); + Ok((Either::B(driver), None)) + } + } +} + +#[cfg(not(feature = "io-driver"))] +mod variant { + use crate::park::ParkThread; + + use std::io; + + /// I/O is not enabled, use a condition variable based parker + pub(crate) type Driver = ParkThread; + + /// There is no handle + pub(crate) type Handle = (); + + pub(crate) fn create_driver(_enable: bool) -> io::Result<(Driver, Handle)> { + let driver = ParkThread::new(); + + Ok((driver, ())) + } +} diff --git a/third_party/rust/tokio/src/runtime/mod.rs b/third_party/rust/tokio/src/runtime/mod.rs new file mode 100644 index 0000000000..36b2b442ee --- /dev/null +++ b/third_party/rust/tokio/src/runtime/mod.rs @@ -0,0 +1,494 @@ +//! The Tokio runtime. +//! +//! Unlike other Rust programs, asynchronous applications require +//! runtime support. In particular, the following runtime services are +//! necessary: +//! +//! * An **I/O event loop**, called the driver, which drives I/O resources and +//! dispatches I/O events to tasks that depend on them. +//! * A **scheduler** to execute [tasks] that use these I/O resources. +//! * A **timer** for scheduling work to run after a set period of time. +//! +//! Tokio's [`Runtime`] bundles all of these services as a single type, allowing +//! them to be started, shut down, and configured together. However, most +//! applications won't need to use [`Runtime`] directly. Instead, they can +//! use the [`tokio::main`] attribute macro, which creates a [`Runtime`] under +//! the hood. +//! +//! # Usage +//! +//! Most applications will use the [`tokio::main`] attribute macro. +//! +//! ```no_run +//! use tokio::net::TcpListener; +//! use tokio::prelude::*; +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box<dyn std::error::Error>> { +//! let mut listener = TcpListener::bind("127.0.0.1:8080").await?; +//! +//! loop { +//! let (mut socket, _) = listener.accept().await?; +//! +//! tokio::spawn(async move { +//! let mut buf = [0; 1024]; +//! +//! // In a loop, read data from the socket and write the data back. +//! loop { +//! let n = match socket.read(&mut buf).await { +//! // socket closed +//! Ok(n) if n == 0 => return, +//! Ok(n) => n, +//! Err(e) => { +//! println!("failed to read from socket; err = {:?}", e); +//! return; +//! } +//! }; +//! +//! // Write the data back +//! if let Err(e) = socket.write_all(&buf[0..n]).await { +//! println!("failed to write to socket; err = {:?}", e); +//! return; +//! } +//! } +//! }); +//! } +//! } +//! ``` +//! +//! From within the context of the runtime, additional tasks are spawned using +//! the [`tokio::spawn`] function. Futures spawned using this function will be +//! executed on the same thread pool used by the [`Runtime`]. +//! +//! A [`Runtime`] instance can also be used directly. +//! +//! ```no_run +//! use tokio::net::TcpListener; +//! use tokio::prelude::*; +//! use tokio::runtime::Runtime; +//! +//! fn main() -> Result<(), Box<dyn std::error::Error>> { +//! // Create the runtime +//! let mut rt = Runtime::new()?; +//! +//! // Spawn the root task +//! rt.block_on(async { +//! let mut listener = TcpListener::bind("127.0.0.1:8080").await?; +//! +//! loop { +//! let (mut socket, _) = listener.accept().await?; +//! +//! tokio::spawn(async move { +//! let mut buf = [0; 1024]; +//! +//! // In a loop, read data from the socket and write the data back. +//! loop { +//! let n = match socket.read(&mut buf).await { +//! // socket closed +//! Ok(n) if n == 0 => return, +//! Ok(n) => n, +//! Err(e) => { +//! println!("failed to read from socket; err = {:?}", e); +//! return; +//! } +//! }; +//! +//! // Write the data back +//! if let Err(e) = socket.write_all(&buf[0..n]).await { +//! println!("failed to write to socket; err = {:?}", e); +//! return; +//! } +//! } +//! }); +//! } +//! }) +//! } +//! ``` +//! +//! ## Runtime Configurations +//! +//! Tokio provides multiple task scheduling strategies, suitable for different +//! applications. The [runtime builder] or `#[tokio::main]` attribute may be +//! used to select which scheduler to use. +//! +//! #### Basic Scheduler +//! +//! The basic scheduler provides a _single-threaded_ future executor. All tasks +//! will be created and executed on the current thread. The basic scheduler +//! requires the `rt-core` feature flag, and can be selected using the +//! [`Builder::basic_scheduler`] method: +//! ``` +//! use tokio::runtime; +//! +//! # fn main() -> Result<(), Box<dyn std::error::Error>> { +//! let basic_rt = runtime::Builder::new() +//! .basic_scheduler() +//! .build()?; +//! # Ok(()) } +//! ``` +//! +//! If the `rt-core` feature is enabled and `rt-threaded` is not, +//! [`Runtime::new`] will return a basic scheduler runtime by default. +//! +//! #### Threaded Scheduler +//! +//! The threaded scheduler executes futures on a _thread pool_, using a +//! work-stealing strategy. By default, it will start a worker thread for each +//! CPU core available on the system. This tends to be the ideal configurations +//! for most applications. The threaded scheduler requires the `rt-threaded` feature +//! flag, and can be selected using the [`Builder::threaded_scheduler`] method: +//! ``` +//! use tokio::runtime; +//! +//! # fn main() -> Result<(), Box<dyn std::error::Error>> { +//! let threaded_rt = runtime::Builder::new() +//! .threaded_scheduler() +//! .build()?; +//! # Ok(()) } +//! ``` +//! +//! If the `rt-threaded` feature flag is enabled, [`Runtime::new`] will return a +//! threaded scheduler runtime by default. +//! +//! Most applications should use the threaded scheduler, except in some niche +//! use-cases, such as when running only a single thread is required. +//! +//! #### Resource drivers +//! +//! When configuring a runtime by hand, no resource drivers are enabled by +//! default. In this case, attempting to use networking types or time types will +//! fail. In order to enable these types, the resource drivers must be enabled. +//! This is done with [`Builder::enable_io`] and [`Builder::enable_time`]. As a +//! shorthand, [`Builder::enable_all`] enables both resource drivers. +//! +//! ## Lifetime of spawned threads +//! +//! The runtime may spawn threads depending on its configuration and usage. The +//! threaded scheduler spawns threads to schedule tasks and calls to +//! `spawn_blocking` spawn threads to run blocking operations. +//! +//! While the `Runtime` is active, threads may shutdown after periods of being +//! idle. Once `Runtime` is dropped, all runtime threads are forcibly shutdown. +//! Any tasks that have not yet completed will be dropped. +//! +//! [tasks]: crate::task +//! [`Runtime`]: Runtime +//! [`tokio::spawn`]: crate::spawn +//! [`tokio::main`]: ../attr.main.html +//! [runtime builder]: crate::runtime::Builder +//! [`Runtime::new`]: crate::runtime::Runtime::new +//! [`Builder::basic_scheduler`]: crate::runtime::Builder::basic_scheduler +//! [`Builder::threaded_scheduler`]: crate::runtime::Builder::threaded_scheduler +//! [`Builder::enable_io`]: crate::runtime::Builder::enable_io +//! [`Builder::enable_time`]: crate::runtime::Builder::enable_time +//! [`Builder::enable_all`]: crate::runtime::Builder::enable_all + +// At the top due to macros +#[cfg(test)] +#[macro_use] +mod tests; + +pub(crate) mod context; + +cfg_rt_core! { + mod basic_scheduler; + use basic_scheduler::BasicScheduler; + + pub(crate) mod task; +} + +mod blocking; +use blocking::BlockingPool; + +cfg_blocking_impl! { + #[allow(unused_imports)] + pub(crate) use blocking::{spawn_blocking, try_spawn_blocking}; +} + +mod builder; +pub use self::builder::Builder; + +pub(crate) mod enter; +use self::enter::enter; + +mod handle; +pub use self::handle::{Handle, TryCurrentError}; + +mod io; + +cfg_rt_threaded! { + mod park; + use park::Parker; +} + +mod shell; +use self::shell::Shell; + +mod spawner; +use self::spawner::Spawner; + +mod time; + +cfg_rt_threaded! { + mod queue; + + pub(crate) mod thread_pool; + use self::thread_pool::ThreadPool; +} + +cfg_rt_core! { + use crate::task::JoinHandle; +} + +use std::future::Future; +use std::time::Duration; + +/// The Tokio runtime. +/// +/// The runtime provides an I/O [driver], task scheduler, [timer], and blocking +/// pool, necessary for running asynchronous tasks. +/// +/// Instances of `Runtime` can be created using [`new`] or [`Builder`]. However, +/// most users will use the `#[tokio::main]` annotation on their entry point instead. +/// +/// See [module level][mod] documentation for more details. +/// +/// # Shutdown +/// +/// Shutting down the runtime is done by dropping the value. The current thread +/// will block until the shut down operation has completed. +/// +/// * Drain any scheduled work queues. +/// * Drop any futures that have not yet completed. +/// * Drop the reactor. +/// +/// Once the reactor has dropped, any outstanding I/O resources bound to +/// that reactor will no longer function. Calling any method on them will +/// result in an error. +/// +/// [driver]: crate::io::driver +/// [timer]: crate::time +/// [mod]: index.html +/// [`new`]: #method.new +/// [`Builder`]: struct@Builder +/// [`tokio::run`]: fn@run +#[derive(Debug)] +pub struct Runtime { + /// Task executor + kind: Kind, + + /// Handle to runtime, also contains driver handles + handle: Handle, + + /// Blocking pool handle, used to signal shutdown + blocking_pool: BlockingPool, +} + +/// The runtime executor is either a thread-pool or a current-thread executor. +#[derive(Debug)] +enum Kind { + /// Not able to execute concurrent tasks. This variant is mostly used to get + /// access to the driver handles. + Shell(Shell), + + /// Execute all tasks on the current-thread. + #[cfg(feature = "rt-core")] + Basic(BasicScheduler<time::Driver>), + + /// Execute tasks across multiple threads. + #[cfg(feature = "rt-threaded")] + ThreadPool(ThreadPool), +} + +/// After thread starts / before thread stops +type Callback = std::sync::Arc<dyn Fn() + Send + Sync>; + +impl Runtime { + /// Create a new runtime instance with default configuration values. + /// + /// This results in a scheduler, I/O driver, and time driver being + /// initialized. The type of scheduler used depends on what feature flags + /// are enabled: if the `rt-threaded` feature is enabled, the [threaded + /// scheduler] is used, while if only the `rt-core` feature is enabled, the + /// [basic scheduler] is used instead. + /// + /// If the threaded scheduler is selected, it will not spawn + /// any worker threads until it needs to, i.e. tasks are scheduled to run. + /// + /// Most applications will not need to call this function directly. Instead, + /// they will use the [`#[tokio::main]` attribute][main]. When more complex + /// configuration is necessary, the [runtime builder] may be used. + /// + /// See [module level][mod] documentation for more details. + /// + /// # Examples + /// + /// Creating a new `Runtime` with default configuration values. + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// // Use the runtime... + /// ``` + /// + /// [mod]: index.html + /// [main]: ../../tokio_macros/attr.main.html + /// [threaded scheduler]: index.html#threaded-scheduler + /// [basic scheduler]: index.html#basic-scheduler + /// [runtime builder]: crate::runtime::Builder + pub fn new() -> io::Result<Runtime> { + #[cfg(feature = "rt-threaded")] + let ret = Builder::new().threaded_scheduler().enable_all().build(); + + #[cfg(all(not(feature = "rt-threaded"), feature = "rt-core"))] + let ret = Builder::new().basic_scheduler().enable_all().build(); + + #[cfg(not(feature = "rt-core"))] + let ret = Builder::new().enable_all().build(); + + ret + } + + /// Spawn a future onto the Tokio runtime. + /// + /// This spawns the given future onto the runtime's executor, usually a + /// thread pool. The thread pool is then responsible for polling the future + /// until it completes. + /// + /// See [module level][mod] documentation for more details. + /// + /// [mod]: index.html + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let rt = Runtime::new().unwrap(); + /// + /// // Spawn a future onto the runtime + /// rt.spawn(async { + /// println!("now running on a worker thread"); + /// }); + /// # } + /// ``` + /// + /// # Panics + /// + /// This function panics if the spawn fails. Failure occurs if the executor + /// is currently at capacity and is unable to spawn a new future. + #[cfg(feature = "rt-core")] + pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + match &self.kind { + Kind::Shell(_) => panic!("task execution disabled"), + #[cfg(feature = "rt-threaded")] + Kind::ThreadPool(exec) => exec.spawn(future), + Kind::Basic(exec) => exec.spawn(future), + } + } + + /// Run a future to completion on the Tokio runtime. This is the runtime's + /// entry point. + /// + /// This runs the given future on the runtime, blocking until it is + /// complete, and yielding its resolved result. Any tasks or timers which + /// the future spawns internally will be executed on the runtime. + /// + /// This method should not be called from an asynchronous context. + /// + /// # Panics + /// + /// This function panics if the executor is at capacity, if the provided + /// future panics, or if called within an asynchronous execution context. + pub fn block_on<F: Future>(&mut self, future: F) -> F::Output { + let kind = &mut self.kind; + + self.handle.enter(|| match kind { + Kind::Shell(exec) => exec.block_on(future), + #[cfg(feature = "rt-core")] + Kind::Basic(exec) => exec.block_on(future), + #[cfg(feature = "rt-threaded")] + Kind::ThreadPool(exec) => exec.block_on(future), + }) + } + + /// Enter the runtime context. + pub fn enter<F, R>(&self, f: F) -> R + where + F: FnOnce() -> R, + { + self.handle.enter(f) + } + + /// Return a handle to the runtime's spawner. + /// + /// The returned handle can be used to spawn tasks that run on this runtime, and can + /// be cloned to allow moving the `Handle` to other threads. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// let handle = rt.handle(); + /// + /// handle.spawn(async { println!("hello"); }); + /// ``` + pub fn handle(&self) -> &Handle { + &self.handle + } + + /// Shutdown the runtime, waiting for at most `duration` for all spawned + /// task to shutdown. + /// + /// Usually, dropping a `Runtime` handle is sufficient as tasks are able to + /// shutdown in a timely fashion. However, dropping a `Runtime` will wait + /// indefinitely for all tasks to terminate, and there are cases where a long + /// blocking task has been spawned which can block dropping `Runtime`. + /// + /// In this case, calling `shutdown_timeout` with an explicit wait timeout + /// can work. The `shutdown_timeout` will signal all tasks to shutdown and + /// will wait for at most `duration` for all spawned tasks to terminate. If + /// `timeout` elapses before all tasks are dropped, the function returns and + /// outstanding tasks are potentially leaked. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use tokio::task; + /// + /// use std::thread; + /// use std::time::Duration; + /// + /// fn main() { + /// let mut runtime = Runtime::new().unwrap(); + /// + /// runtime.block_on(async move { + /// task::spawn_blocking(move || { + /// thread::sleep(Duration::from_secs(10_000)); + /// }); + /// }); + /// + /// runtime.shutdown_timeout(Duration::from_millis(100)); + /// } + /// ``` + pub fn shutdown_timeout(self, duration: Duration) { + let Runtime { + mut blocking_pool, .. + } = self; + blocking_pool.shutdown(Some(duration)); + } +} diff --git a/third_party/rust/tokio/src/runtime/park.rs b/third_party/rust/tokio/src/runtime/park.rs new file mode 100644 index 0000000000..ee437d1d94 --- /dev/null +++ b/third_party/rust/tokio/src/runtime/park.rs @@ -0,0 +1,245 @@ +//! Parks the runtime. +//! +//! A combination of the various resource driver park handles. + +use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::{Arc, Condvar, Mutex}; +use crate::loom::thread; +use crate::park::{Park, Unpark}; +use crate::runtime::time; +use crate::util::TryLock; + +use std::sync::atomic::Ordering::SeqCst; +use std::time::Duration; + +pub(crate) struct Parker { + inner: Arc<Inner>, +} + +pub(crate) struct Unparker { + inner: Arc<Inner>, +} + +struct Inner { + /// Avoids entering the park if possible + state: AtomicUsize, + + /// Used to coordinate access to the driver / condvar + mutex: Mutex<()>, + + /// Condvar to block on if the driver is unavailable. + condvar: Condvar, + + /// Resource (I/O, time, ...) driver + shared: Arc<Shared>, +} + +const EMPTY: usize = 0; +const PARKED_CONDVAR: usize = 1; +const PARKED_DRIVER: usize = 2; +const NOTIFIED: usize = 3; + +/// Shared across multiple Parker handles +struct Shared { + /// Shared driver. Only one thread at a time can use this + driver: TryLock<time::Driver>, + + /// Unpark handle + handle: <time::Driver as Park>::Unpark, +} + +impl Parker { + pub(crate) fn new(driver: time::Driver) -> Parker { + let handle = driver.unpark(); + + Parker { + inner: Arc::new(Inner { + state: AtomicUsize::new(EMPTY), + mutex: Mutex::new(()), + condvar: Condvar::new(), + shared: Arc::new(Shared { + driver: TryLock::new(driver), + handle, + }), + }), + } + } +} + +impl Clone for Parker { + fn clone(&self) -> Parker { + Parker { + inner: Arc::new(Inner { + state: AtomicUsize::new(EMPTY), + mutex: Mutex::new(()), + condvar: Condvar::new(), + shared: self.inner.shared.clone(), + }), + } + } +} + +impl Park for Parker { + type Unpark = Unparker; + type Error = (); + + fn unpark(&self) -> Unparker { + Unparker { + inner: self.inner.clone(), + } + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.inner.park(); + Ok(()) + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + // Only parking with zero is supported... + assert_eq!(duration, Duration::from_millis(0)); + + if let Some(mut driver) = self.inner.shared.driver.try_lock() { + driver.park_timeout(duration).map_err(|_| ()) + } else { + Ok(()) + } + } +} + +impl Unpark for Unparker { + fn unpark(&self) { + self.inner.unpark(); + } +} + +impl Inner { + /// Parks the current thread for at most `dur`. + fn park(&self) { + for _ in 0..3 { + // If we were previously notified then we consume this notification and + // return quickly. + if self + .state + .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) + .is_ok() + { + return; + } + + thread::yield_now(); + } + + if let Some(mut driver) = self.shared.driver.try_lock() { + self.park_driver(&mut driver); + } else { + self.park_condvar(); + } + } + + fn park_condvar(&self) { + // Otherwise we need to coordinate going to sleep + let mut m = self.mutex.lock().unwrap(); + + match self + .state + .compare_exchange(EMPTY, PARKED_CONDVAR, SeqCst, SeqCst) + { + Ok(_) => {} + Err(NOTIFIED) => { + // We must read here, even though we know it will be `NOTIFIED`. + // This is because `unpark` may have been called again since we read + // `NOTIFIED` in the `compare_exchange` above. We must perform an + // acquire operation that synchronizes with that `unpark` to observe + // any writes it made before the call to unpark. To do that we must + // read from the write it made to `state`. + let old = self.state.swap(EMPTY, SeqCst); + debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); + + return; + } + Err(actual) => panic!("inconsistent park state; actual = {}", actual), + } + + loop { + m = self.condvar.wait(m).unwrap(); + + if self + .state + .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) + .is_ok() + { + // got a notification + return; + } + + // spurious wakeup, go back to sleep + } + } + + fn park_driver(&self, driver: &mut time::Driver) { + match self + .state + .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst) + { + Ok(_) => {} + Err(NOTIFIED) => { + // We must read here, even though we know it will be `NOTIFIED`. + // This is because `unpark` may have been called again since we read + // `NOTIFIED` in the `compare_exchange` above. We must perform an + // acquire operation that synchronizes with that `unpark` to observe + // any writes it made before the call to unpark. To do that we must + // read from the write it made to `state`. + let old = self.state.swap(EMPTY, SeqCst); + debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); + + return; + } + Err(actual) => panic!("inconsistent park state; actual = {}", actual), + } + + // TODO: don't unwrap + driver.park().unwrap(); + + match self.state.swap(EMPTY, SeqCst) { + NOTIFIED => {} // got a notification, hurray! + PARKED_DRIVER => {} // no notification, alas + n => panic!("inconsistent park_timeout state: {}", n), + } + } + + fn unpark(&self) { + // To ensure the unparked thread will observe any writes we made before + // this call, we must perform a release operation that `park` can + // synchronize with. To do that we must write `NOTIFIED` even if `state` + // is already `NOTIFIED`. That is why this must be a swap rather than a + // compare-and-swap that returns if it reads `NOTIFIED` on failure. + match self.state.swap(NOTIFIED, SeqCst) { + EMPTY => {} // no one was waiting + NOTIFIED => {} // already unparked + PARKED_CONDVAR => self.unpark_condvar(), + PARKED_DRIVER => self.unpark_driver(), + actual => panic!("inconsistent state in unpark; actual = {}", actual), + } + } + + fn unpark_condvar(&self) { + // There is a period between when the parked thread sets `state` to + // `PARKED` (or last checked `state` in the case of a spurious wake + // up) and when it actually waits on `cvar`. If we were to notify + // during this period it would be ignored and then when the parked + // thread went to sleep it would never wake up. Fortunately, it has + // `lock` locked at this stage so we can acquire `lock` to wait until + // it is ready to receive the notification. + // + // Releasing `lock` before the call to `notify_one` means that when the + // parked thread wakes it doesn't get woken only to have to wait for us + // to release `lock`. + drop(self.mutex.lock().unwrap()); + + self.condvar.notify_one() + } + + fn unpark_driver(&self) { + self.shared.handle.unpark(); + } +} diff --git a/third_party/rust/tokio/src/runtime/queue.rs b/third_party/rust/tokio/src/runtime/queue.rs new file mode 100644 index 0000000000..c654514bbc --- /dev/null +++ b/third_party/rust/tokio/src/runtime/queue.rs @@ -0,0 +1,630 @@ +//! Run-queue structures to support a work-stealing scheduler + +use crate::loom::cell::UnsafeCell; +use crate::loom::sync::atomic::{AtomicU16, AtomicU32, AtomicUsize}; +use crate::loom::sync::{Arc, Mutex}; +use crate::runtime::task; + +use std::marker::PhantomData; +use std::mem::MaybeUninit; +use std::ptr::{self, NonNull}; +use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; + +/// Producer handle. May only be used from a single thread. +pub(super) struct Local<T: 'static> { + inner: Arc<Inner<T>>, +} + +/// Consumer handle. May be used from many threads. +pub(super) struct Steal<T: 'static>(Arc<Inner<T>>); + +/// Growable, MPMC queue used to inject new tasks into the scheduler and as an +/// overflow queue when the local, fixed-size, array queue overflows. +pub(super) struct Inject<T: 'static> { + /// Pointers to the head and tail of the queue + pointers: Mutex<Pointers>, + + /// Number of pending tasks in the queue. This helps prevent unnecessary + /// locking in the hot path. + len: AtomicUsize, + + _p: PhantomData<T>, +} + +pub(super) struct Inner<T: 'static> { + /// Concurrently updated by many threads. + /// + /// Contains two `u16` values. The LSB byte is the "real" head of the queue. + /// The `u16` in the MSB is set by a stealer in process of stealing values. + /// It represents the first value being stolen in the batch. `u16` is used + /// in order to distinguish between `head == tail` and `head == tail - + /// capacity`. + /// + /// When both `u16` values are the same, there is no active stealer. + /// + /// Tracking an in-progress stealer prevents a wrapping scenario. + head: AtomicU32, + + /// Only updated by producer thread but read by many threads. + tail: AtomicU16, + + /// Elements + buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>]>, +} + +struct Pointers { + /// True if the queue is closed + is_closed: bool, + + /// Linked-list head + head: Option<NonNull<task::Header>>, + + /// Linked-list tail + tail: Option<NonNull<task::Header>>, +} + +unsafe impl<T> Send for Inner<T> {} +unsafe impl<T> Sync for Inner<T> {} +unsafe impl<T> Send for Inject<T> {} +unsafe impl<T> Sync for Inject<T> {} + +#[cfg(not(loom))] +const LOCAL_QUEUE_CAPACITY: usize = 256; + +// Shrink the size of the local queue when using loom. This shouldn't impact +// logic, but allows loom to test more edge cases in a reasonable a mount of +// time. +#[cfg(loom)] +const LOCAL_QUEUE_CAPACITY: usize = 4; + +const MASK: usize = LOCAL_QUEUE_CAPACITY - 1; + +/// Create a new local run-queue +pub(super) fn local<T: 'static>() -> (Steal<T>, Local<T>) { + let mut buffer = Vec::with_capacity(LOCAL_QUEUE_CAPACITY); + + for _ in 0..LOCAL_QUEUE_CAPACITY { + buffer.push(UnsafeCell::new(MaybeUninit::uninit())); + } + + let inner = Arc::new(Inner { + head: AtomicU32::new(0), + tail: AtomicU16::new(0), + buffer: buffer.into(), + }); + + let local = Local { + inner: inner.clone(), + }; + + let remote = Steal(inner); + + (remote, local) +} + +impl<T> Local<T> { + /// Returns true if the queue has entries that can be stealed. + pub(super) fn is_stealable(&self) -> bool { + !self.inner.is_empty() + } + + /// Pushes a task to the back of the local queue, skipping the LIFO slot. + pub(super) fn push_back(&mut self, mut task: task::Notified<T>, inject: &Inject<T>) { + let tail = loop { + let head = self.inner.head.load(Acquire); + let (steal, real) = unpack(head); + + // safety: this is the **only** thread that updates this cell. + let tail = unsafe { self.inner.tail.unsync_load() }; + + if tail.wrapping_sub(steal) < LOCAL_QUEUE_CAPACITY as u16 { + // There is capacity for the task + break tail; + } else if steal != real { + // Concurrently stealing, this will free up capacity, so + // only push the new task onto the inject queue + inject.push(task); + return; + } else { + // Push the current task and half of the queue into the + // inject queue. + match self.push_overflow(task, real, tail, inject) { + Ok(_) => return, + // Lost the race, try again + Err(v) => { + task = v; + } + } + } + }; + + // Map the position to a slot index. + let idx = tail as usize & MASK; + + self.inner.buffer[idx].with_mut(|ptr| { + // Write the task to the slot + // + // Safety: There is only one producer and the above `if` + // condition ensures we don't touch a cell if there is a + // value, thus no consumer. + unsafe { + ptr::write((*ptr).as_mut_ptr(), task); + } + }); + + // Make the task available. Synchronizes with a load in + // `steal_into2`. + self.inner.tail.store(tail.wrapping_add(1), Release); + } + + /// Moves a batch of tasks into the inject queue. + /// + /// This will temporarily make some of the tasks unavailable to stealers. + /// Once `push_overflow` is done, a notification is sent out, so if other + /// workers "missed" some of the tasks during a steal, they will get + /// another opportunity. + #[inline(never)] + fn push_overflow( + &mut self, + task: task::Notified<T>, + head: u16, + tail: u16, + inject: &Inject<T>, + ) -> Result<(), task::Notified<T>> { + const BATCH_LEN: usize = LOCAL_QUEUE_CAPACITY / 2 + 1; + + let n = (LOCAL_QUEUE_CAPACITY / 2) as u16; + assert_eq!( + tail.wrapping_sub(head) as usize, + LOCAL_QUEUE_CAPACITY, + "queue is not full; tail = {}; head = {}", + tail, + head + ); + + let prev = pack(head, head); + + // Claim a bunch of tasks + // + // We are claiming the tasks **before** reading them out of the buffer. + // This is safe because only the **current** thread is able to push new + // tasks. + // + // There isn't really any need for memory ordering... Relaxed would + // work. This is because all tasks are pushed into the queue from the + // current thread (or memory has been acquired if the local queue handle + // moved). + let actual = self.inner.head.compare_and_swap( + prev, + pack(head.wrapping_add(n), head.wrapping_add(n)), + Release, + ); + + if actual != prev { + // We failed to claim the tasks, losing the race. Return out of + // this function and try the full `push` routine again. The queue + // may not be full anymore. + return Err(task); + } + + // link the tasks + for i in 0..n { + let j = i + 1; + + let i_idx = i.wrapping_add(head) as usize & MASK; + let j_idx = j.wrapping_add(head) as usize & MASK; + + // Get the next pointer + let next = if j == n { + // The last task in the local queue being moved + task.header().into() + } else { + // safety: The above CAS prevents a stealer from accessing these + // tasks and we are the only producer. + self.inner.buffer[j_idx].with(|ptr| unsafe { + let value = (*ptr).as_ptr(); + (*value).header().into() + }) + }; + + // safety: the above CAS prevents a stealer from accessing these + // tasks and we are the only producer. + self.inner.buffer[i_idx].with_mut(|ptr| unsafe { + let ptr = (*ptr).as_ptr(); + (*ptr).header().queue_next.with_mut(|ptr| *ptr = Some(next)); + }); + } + + // safety: the above CAS prevents a stealer from accessing these tasks + // and we are the only producer. + let head = self.inner.buffer[head as usize & MASK] + .with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) }); + + // Push the tasks onto the inject queue + inject.push_batch(head, task, BATCH_LEN); + + Ok(()) + } + + /// Pops a task from the local queue. + pub(super) fn pop(&mut self) -> Option<task::Notified<T>> { + let mut head = self.inner.head.load(Acquire); + + let idx = loop { + let (steal, real) = unpack(head); + + // safety: this is the **only** thread that updates this cell. + let tail = unsafe { self.inner.tail.unsync_load() }; + + if real == tail { + // queue is empty + return None; + } + + let next_real = real.wrapping_add(1); + + // If `steal == real` there are no concurrent stealers. Both `steal` + // and `real` are updated. + let next = if steal == real { + pack(next_real, next_real) + } else { + assert_ne!(steal, next_real); + pack(steal, next_real) + }; + + // Attempt to claim a task. + let res = self + .inner + .head + .compare_exchange(head, next, AcqRel, Acquire); + + match res { + Ok(_) => break real as usize & MASK, + Err(actual) => head = actual, + } + }; + + Some(self.inner.buffer[idx].with(|ptr| unsafe { ptr::read(ptr).assume_init() })) + } +} + +impl<T> Steal<T> { + pub(super) fn is_empty(&self) -> bool { + self.0.is_empty() + } + + /// Steals half the tasks from self and place them into `dst`. + pub(super) fn steal_into(&self, dst: &mut Local<T>) -> Option<task::Notified<T>> { + // Safety: the caller is the only thread that mutates `dst.tail` and + // holds a mutable reference. + let dst_tail = unsafe { dst.inner.tail.unsync_load() }; + + // To the caller, `dst` may **look** empty but still have values + // contained in the buffer. If another thread is concurrently stealing + // from `dst` there may not be enough capacity to steal. + let (steal, _) = unpack(dst.inner.head.load(Acquire)); + + if dst_tail.wrapping_sub(steal) > LOCAL_QUEUE_CAPACITY as u16 / 2 { + // we *could* try to steal less here, but for simplicity, we're just + // going to abort. + return None; + } + + // Steal the tasks into `dst`'s buffer. This does not yet expose the + // tasks in `dst`. + let mut n = self.steal_into2(dst, dst_tail); + + if n == 0 { + // No tasks were stolen + return None; + } + + // We are returning a task here + n -= 1; + + let ret_pos = dst_tail.wrapping_add(n); + let ret_idx = ret_pos as usize & MASK; + + // safety: the value was written as part of `steal_into2` and not + // exposed to stealers, so no other thread can access it. + let ret = dst.inner.buffer[ret_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) }); + + if n == 0 { + // The `dst` queue is empty, but a single task was stolen + return Some(ret); + } + + // Make the stolen items available to consumers + dst.inner.tail.store(dst_tail.wrapping_add(n), Release); + + Some(ret) + } + + // Steal tasks from `self`, placing them into `dst`. Returns the number of + // tasks that were stolen. + fn steal_into2(&self, dst: &mut Local<T>, dst_tail: u16) -> u16 { + let mut prev_packed = self.0.head.load(Acquire); + let mut next_packed; + + let n = loop { + let (src_head_steal, src_head_real) = unpack(prev_packed); + let src_tail = self.0.tail.load(Acquire); + + // If these two do not match, another thread is concurrently + // stealing from the queue. + if src_head_steal != src_head_real { + return 0; + } + + // Number of available tasks to steal + let n = src_tail.wrapping_sub(src_head_real); + let n = n - n / 2; + + if n == 0 { + // No tasks available to steal + return 0; + } + + // Update the real head index to acquire the tasks. + let steal_to = src_head_real.wrapping_add(n); + assert_ne!(src_head_steal, steal_to); + next_packed = pack(src_head_steal, steal_to); + + // Claim all those tasks. This is done by incrementing the "real" + // head but not the steal. By doing this, no other thread is able to + // steal from this queue until the current thread completes. + let res = self + .0 + .head + .compare_exchange(prev_packed, next_packed, AcqRel, Acquire); + + match res { + Ok(_) => break n, + Err(actual) => prev_packed = actual, + } + }; + + assert!(n <= LOCAL_QUEUE_CAPACITY as u16 / 2, "actual = {}", n); + + let (first, _) = unpack(next_packed); + + // Take all the tasks + for i in 0..n { + // Compute the positions + let src_pos = first.wrapping_add(i); + let dst_pos = dst_tail.wrapping_add(i); + + // Map to slots + let src_idx = src_pos as usize & MASK; + let dst_idx = dst_pos as usize & MASK; + + // Read the task + // + // safety: We acquired the task with the atomic exchange above. + let task = self.0.buffer[src_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) }); + + // Write the task to the new slot + // + // safety: `dst` queue is empty and we are the only producer to + // this queue. + dst.inner.buffer[dst_idx] + .with_mut(|ptr| unsafe { ptr::write((*ptr).as_mut_ptr(), task) }); + } + + let mut prev_packed = next_packed; + + // Update `src_head_steal` to match `src_head_real` signalling that the + // stealing routine is complete. + loop { + let head = unpack(prev_packed).1; + next_packed = pack(head, head); + + let res = self + .0 + .head + .compare_exchange(prev_packed, next_packed, AcqRel, Acquire); + + match res { + Ok(_) => return n, + Err(actual) => { + let (actual_steal, actual_real) = unpack(actual); + + assert_ne!(actual_steal, actual_real); + + prev_packed = actual; + } + } + } + } +} + +impl<T> Clone for Steal<T> { + fn clone(&self) -> Steal<T> { + Steal(self.0.clone()) + } +} + +impl<T> Drop for Local<T> { + fn drop(&mut self) { + if !std::thread::panicking() { + assert!(self.pop().is_none(), "queue not empty"); + } + } +} + +impl<T> Inner<T> { + fn is_empty(&self) -> bool { + let (_, head) = unpack(self.head.load(Acquire)); + let tail = self.tail.load(Acquire); + + head == tail + } +} + +impl<T: 'static> Inject<T> { + pub(super) fn new() -> Inject<T> { + Inject { + pointers: Mutex::new(Pointers { + is_closed: false, + head: None, + tail: None, + }), + len: AtomicUsize::new(0), + _p: PhantomData, + } + } + + pub(super) fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Close the injection queue, returns `true` if the queue is open when the + /// transition is made. + pub(super) fn close(&self) -> bool { + let mut p = self.pointers.lock().unwrap(); + + if p.is_closed { + return false; + } + + p.is_closed = true; + true + } + + pub(super) fn is_closed(&self) -> bool { + self.pointers.lock().unwrap().is_closed + } + + pub(super) fn len(&self) -> usize { + self.len.load(Acquire) + } + + /// Pushes a value into the queue. + pub(super) fn push(&self, task: task::Notified<T>) { + // Acquire queue lock + let mut p = self.pointers.lock().unwrap(); + + if p.is_closed { + // Drop the mutex to avoid a potential deadlock when + // re-entering. + drop(p); + drop(task); + return; + } + + // safety: only mutated with the lock held + let len = unsafe { self.len.unsync_load() }; + let task = task.into_raw(); + + // The next pointer should already be null + debug_assert!(get_next(task).is_none()); + + if let Some(tail) = p.tail { + set_next(tail, Some(task)); + } else { + p.head = Some(task); + } + + p.tail = Some(task); + + self.len.store(len + 1, Release); + } + + pub(super) fn push_batch( + &self, + batch_head: task::Notified<T>, + batch_tail: task::Notified<T>, + num: usize, + ) { + let batch_head = batch_head.into_raw(); + let batch_tail = batch_tail.into_raw(); + + debug_assert!(get_next(batch_tail).is_none()); + + let mut p = self.pointers.lock().unwrap(); + + if let Some(tail) = p.tail { + set_next(tail, Some(batch_head)); + } else { + p.head = Some(batch_head); + } + + p.tail = Some(batch_tail); + + // Increment the count. + // + // safety: All updates to the len atomic are guarded by the mutex. As + // such, a non-atomic load followed by a store is safe. + let len = unsafe { self.len.unsync_load() }; + + self.len.store(len + num, Release); + } + + pub(super) fn pop(&self) -> Option<task::Notified<T>> { + // Fast path, if len == 0, then there are no values + if self.is_empty() { + return None; + } + + let mut p = self.pointers.lock().unwrap(); + + // It is possible to hit null here if another thread poped the last + // task between us checking `len` and acquiring the lock. + let task = p.head?; + + p.head = get_next(task); + + if p.head.is_none() { + p.tail = None; + } + + set_next(task, None); + + // Decrement the count. + // + // safety: All updates to the len atomic are guarded by the mutex. As + // such, a non-atomic load followed by a store is safe. + self.len + .store(unsafe { self.len.unsync_load() } - 1, Release); + + // safety: a `Notified` is pushed into the queue and now it is popped! + Some(unsafe { task::Notified::from_raw(task) }) + } +} + +impl<T: 'static> Drop for Inject<T> { + fn drop(&mut self) { + if !std::thread::panicking() { + assert!(self.pop().is_none(), "queue not empty"); + } + } +} + +fn get_next(header: NonNull<task::Header>) -> Option<NonNull<task::Header>> { + unsafe { header.as_ref().queue_next.with(|ptr| *ptr) } +} + +fn set_next(header: NonNull<task::Header>, val: Option<NonNull<task::Header>>) { + unsafe { + header.as_ref().queue_next.with_mut(|ptr| *ptr = val); + } +} + +/// Split the head value into the real head and the index a stealer is working +/// on. +fn unpack(n: u32) -> (u16, u16) { + let real = n & u16::max_value() as u32; + let steal = n >> 16; + + (steal as u16, real as u16) +} + +/// Join the two head values +fn pack(steal: u16, real: u16) -> u32 { + (real as u32) | ((steal as u32) << 16) +} + +#[test] +fn test_local_queue_capacity() { + assert!(LOCAL_QUEUE_CAPACITY - 1 <= u8::max_value() as usize); +} diff --git a/third_party/rust/tokio/src/runtime/shell.rs b/third_party/rust/tokio/src/runtime/shell.rs new file mode 100644 index 0000000000..294f2a16d8 --- /dev/null +++ b/third_party/rust/tokio/src/runtime/shell.rs @@ -0,0 +1,62 @@ +#![allow(clippy::redundant_clone)] + +use crate::park::{Park, Unpark}; +use crate::runtime::enter; +use crate::runtime::time; +use crate::util::{waker_ref, Wake}; + +use std::future::Future; +use std::sync::Arc; +use std::task::Context; +use std::task::Poll::Ready; + +#[derive(Debug)] +pub(super) struct Shell { + driver: time::Driver, + + /// TODO: don't store this + unpark: Arc<Handle>, +} + +#[derive(Debug)] +struct Handle(<time::Driver as Park>::Unpark); + +impl Shell { + pub(super) fn new(driver: time::Driver) -> Shell { + let unpark = Arc::new(Handle(driver.unpark())); + + Shell { driver, unpark } + } + + pub(super) fn block_on<F>(&mut self, f: F) -> F::Output + where + F: Future, + { + let _e = enter(); + + pin!(f); + + let waker = waker_ref(&self.unpark); + let mut cx = Context::from_waker(&waker); + + loop { + if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) { + return v; + } + + self.driver.park().unwrap(); + } + } +} + +impl Wake for Handle { + /// Wake by value + fn wake(self: Arc<Self>) { + Wake::wake_by_ref(&self); + } + + /// Wake by reference + fn wake_by_ref(arc_self: &Arc<Self>) { + arc_self.0.unpark(); + } +} diff --git a/third_party/rust/tokio/src/runtime/spawner.rs b/third_party/rust/tokio/src/runtime/spawner.rs new file mode 100644 index 0000000000..d136945cdc --- /dev/null +++ b/third_party/rust/tokio/src/runtime/spawner.rs @@ -0,0 +1,37 @@ +cfg_rt_core! { + use crate::runtime::basic_scheduler; + use crate::task::JoinHandle; + + use std::future::Future; +} + +cfg_rt_threaded! { + use crate::runtime::thread_pool; +} + +#[derive(Debug, Clone)] +pub(crate) enum Spawner { + Shell, + #[cfg(feature = "rt-core")] + Basic(basic_scheduler::Spawner), + #[cfg(feature = "rt-threaded")] + ThreadPool(thread_pool::Spawner), +} + +cfg_rt_core! { + impl Spawner { + pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + match self { + Spawner::Shell => panic!("spawning not enabled for runtime"), + #[cfg(feature = "rt-core")] + Spawner::Basic(spawner) => spawner.spawn(future), + #[cfg(feature = "rt-threaded")] + Spawner::ThreadPool(spawner) => spawner.spawn(future), + } + } + } +} diff --git a/third_party/rust/tokio/src/runtime/task/core.rs b/third_party/rust/tokio/src/runtime/task/core.rs new file mode 100644 index 0000000000..573b9f3c9c --- /dev/null +++ b/third_party/rust/tokio/src/runtime/task/core.rs @@ -0,0 +1,279 @@ +use crate::loom::cell::UnsafeCell; +use crate::runtime::task::raw::{self, Vtable}; +use crate::runtime::task::state::State; +use crate::runtime::task::waker::waker_ref; +use crate::runtime::task::{Notified, Schedule, Task}; +use crate::util::linked_list; + +use std::future::Future; +use std::pin::Pin; +use std::ptr::NonNull; +use std::task::{Context, Poll, Waker}; + +/// The task cell. Contains the components of the task. +/// +/// It is critical for `Header` to be the first field as the task structure will +/// be referenced by both *mut Cell and *mut Header. +#[repr(C)] +pub(super) struct Cell<T: Future, S> { + /// Hot task state data + pub(super) header: Header, + + /// Either the future or output, depending on the execution stage. + pub(super) core: Core<T, S>, + + /// Cold data + pub(super) trailer: Trailer, +} + +/// The core of the task. +/// +/// Holds the future or output, depending on the stage of execution. +pub(super) struct Core<T: Future, S> { + /// Scheduler used to drive this future + pub(super) scheduler: UnsafeCell<Option<S>>, + + /// Either the future or the output + pub(super) stage: UnsafeCell<Stage<T>>, +} + +/// Crate public as this is also needed by the pool. +#[repr(C)] +pub(crate) struct Header { + /// Task state + pub(super) state: State, + + pub(crate) owned: UnsafeCell<linked_list::Pointers<Header>>, + + /// Pointer to next task, used with the injection queue + pub(crate) queue_next: UnsafeCell<Option<NonNull<Header>>>, + + /// Pointer to the next task in the transfer stack + pub(super) stack_next: UnsafeCell<Option<NonNull<Header>>>, + + /// Table of function pointers for executing actions on the task. + pub(super) vtable: &'static Vtable, +} + +unsafe impl Send for Header {} +unsafe impl Sync for Header {} + +/// Cold data is stored after the future. +pub(super) struct Trailer { + /// Consumer task waiting on completion of this task. + pub(super) waker: UnsafeCell<Option<Waker>>, +} + +/// Either the future or the output. +pub(super) enum Stage<T: Future> { + Running(T), + Finished(super::Result<T::Output>), + Consumed, +} + +impl<T: Future, S: Schedule> Cell<T, S> { + /// Allocates a new task cell, containing the header, trailer, and core + /// structures. + pub(super) fn new(future: T, state: State) -> Box<Cell<T, S>> { + Box::new(Cell { + header: Header { + state, + owned: UnsafeCell::new(linked_list::Pointers::new()), + queue_next: UnsafeCell::new(None), + stack_next: UnsafeCell::new(None), + vtable: raw::vtable::<T, S>(), + }, + core: Core { + scheduler: UnsafeCell::new(None), + stage: UnsafeCell::new(Stage::Running(future)), + }, + trailer: Trailer { + waker: UnsafeCell::new(None), + }, + }) + } +} + +impl<T: Future, S: Schedule> Core<T, S> { + /// If needed, bind a scheduler to the task. + /// + /// This only happens on the first poll. + pub(super) fn bind_scheduler(&self, task: Task<S>) { + use std::mem::ManuallyDrop; + + // TODO: it would be nice to not have to wrap with a ManuallyDrop + let task = ManuallyDrop::new(task); + + // This function may be called concurrently, but the __first__ time it + // is called, the caller has unique access to this field. All subsequent + // concurrent calls will be via the `Waker`, which will "happens after" + // the first poll. + // + // In other words, it is always safe to read the field and it is safe to + // write to the field when it is `None`. + if self.is_bound() { + return; + } + + // Bind the task to the scheduler + let scheduler = S::bind(ManuallyDrop::into_inner(task)); + + // Safety: As `scheduler` is not set, this is the first poll + self.scheduler.with_mut(|ptr| unsafe { + *ptr = Some(scheduler); + }); + } + + /// Returns true if the task is bound to a scheduler. + pub(super) fn is_bound(&self) -> bool { + // Safety: never called concurrently w/ a mutation. + self.scheduler.with(|ptr| unsafe { (*ptr).is_some() }) + } + + /// Poll the future + /// + /// # Safety + /// + /// The caller must ensure it is safe to mutate the `state` field. This + /// requires ensuring mutal exclusion between any concurrent thread that + /// might modify the future or output field. + /// + /// The mutual exclusion is implemented by `Harness` and the `Lifecycle` + /// component of the task state. + /// + /// `self` must also be pinned. This is handled by storing the task on the + /// heap. + pub(super) fn poll(&self, header: &Header) -> Poll<T::Output> { + let res = { + self.stage.with_mut(|ptr| { + // Safety: The caller ensures mutual exclusion to the field. + let future = match unsafe { &mut *ptr } { + Stage::Running(future) => future, + _ => unreachable!("unexpected stage"), + }; + + // Safety: The caller ensures the future is pinned. + let future = unsafe { Pin::new_unchecked(future) }; + + // The waker passed into the `poll` function does not require a ref + // count increment. + let waker_ref = waker_ref::<T, S>(header); + let mut cx = Context::from_waker(&*waker_ref); + + future.poll(&mut cx) + }) + }; + + if res.is_ready() { + self.drop_future_or_output(); + } + + res + } + + /// Drop the future + /// + /// # Safety + /// + /// The caller must ensure it is safe to mutate the `stage` field. + pub(super) fn drop_future_or_output(&self) { + self.stage.with_mut(|ptr| { + // Safety: The caller ensures mutal exclusion to the field. + unsafe { *ptr = Stage::Consumed }; + }); + } + + /// Store the task output + /// + /// # Safety + /// + /// The caller must ensure it is safe to mutate the `stage` field. + pub(super) fn store_output(&self, output: super::Result<T::Output>) { + self.stage.with_mut(|ptr| { + // Safety: the caller ensures mutual exclusion to the field. + unsafe { *ptr = Stage::Finished(output) }; + }); + } + + /// Take the task output + /// + /// # Safety + /// + /// The caller must ensure it is safe to mutate the `stage` field. + pub(super) fn take_output(&self) -> super::Result<T::Output> { + use std::mem; + + self.stage.with_mut(|ptr| { + // Safety:: the caller ensures mutal exclusion to the field. + match mem::replace(unsafe { &mut *ptr }, Stage::Consumed) { + Stage::Finished(output) => output, + _ => panic!("unexpected task state"), + } + }) + } + + /// Schedule the future for execution + pub(super) fn schedule(&self, task: Notified<S>) { + self.scheduler.with(|ptr| { + // Safety: Can only be called after initial `poll`, which is the + // only time the field is mutated. + match unsafe { &*ptr } { + Some(scheduler) => scheduler.schedule(task), + None => panic!("no scheduler set"), + } + }); + } + + /// Schedule the future for execution in the near future, yielding the + /// thread to other tasks. + pub(super) fn yield_now(&self, task: Notified<S>) { + self.scheduler.with(|ptr| { + // Safety: Can only be called after initial `poll`, which is the + // only time the field is mutated. + match unsafe { &*ptr } { + Some(scheduler) => scheduler.yield_now(task), + None => panic!("no scheduler set"), + } + }); + } + + /// Release the task + /// + /// If the `Scheduler` implementation is able to, it returns the `Task` + /// handle immediately. The caller of this function will batch a ref-dec + /// with a state change. + pub(super) fn release(&self, task: Task<S>) -> Option<Task<S>> { + use std::mem::ManuallyDrop; + + let task = ManuallyDrop::new(task); + + self.scheduler.with(|ptr| { + // Safety: Can only be called after initial `poll`, which is the + // only time the field is mutated. + match unsafe { &*ptr } { + Some(scheduler) => scheduler.release(&*task), + // Task was never polled + None => None, + } + }) + } +} + +cfg_rt_threaded! { + impl Header { + pub(crate) fn shutdown(&self) { + use crate::runtime::task::RawTask; + + let task = unsafe { RawTask::from_raw(self.into()) }; + task.shutdown(); + } + } +} + +#[test] +#[cfg(not(loom))] +fn header_lte_cache_line() { + use std::mem::size_of; + + assert!(size_of::<Header>() <= 8 * size_of::<*const ()>()); +} diff --git a/third_party/rust/tokio/src/runtime/task/error.rs b/third_party/rust/tokio/src/runtime/task/error.rs new file mode 100644 index 0000000000..d5f65a4981 --- /dev/null +++ b/third_party/rust/tokio/src/runtime/task/error.rs @@ -0,0 +1,163 @@ +use std::any::Any; +use std::fmt; +use std::io; +use std::sync::Mutex; + +doc_rt_core! { + /// Task failed to execute to completion. + pub struct JoinError { + repr: Repr, + } +} + +enum Repr { + Cancelled, + Panic(Mutex<Box<dyn Any + Send + 'static>>), +} + +impl JoinError { + #[doc(hidden)] + #[deprecated] + pub fn cancelled() -> JoinError { + Self::cancelled2() + } + + pub(crate) fn cancelled2() -> JoinError { + JoinError { + repr: Repr::Cancelled, + } + } + + #[doc(hidden)] + #[deprecated] + pub fn panic(err: Box<dyn Any + Send + 'static>) -> JoinError { + Self::panic2(err) + } + + pub(crate) fn panic2(err: Box<dyn Any + Send + 'static>) -> JoinError { + JoinError { + repr: Repr::Panic(Mutex::new(err)), + } + } + + /// Returns true if the error was caused by the task being cancelled + pub fn is_cancelled(&self) -> bool { + match &self.repr { + Repr::Cancelled => true, + _ => false, + } + } + + /// Returns true if the error was caused by the task panicking + /// + /// # Examples + /// + /// ``` + /// use std::panic; + /// + /// #[tokio::main] + /// async fn main() { + /// let err = tokio::spawn(async { + /// panic!("boom"); + /// }).await.unwrap_err(); + /// + /// assert!(err.is_panic()); + /// } + /// ``` + pub fn is_panic(&self) -> bool { + match &self.repr { + Repr::Panic(_) => true, + _ => false, + } + } + + /// Consumes the join error, returning the object with which the task panicked. + /// + /// # Panics + /// + /// `into_panic()` panics if the `Error` does not represent the underlying + /// task terminating with a panic. Use `is_panic` to check the error reason + /// or `try_into_panic` for a variant that does not panic. + /// + /// # Examples + /// + /// ```should_panic + /// use std::panic; + /// + /// #[tokio::main] + /// async fn main() { + /// let err = tokio::spawn(async { + /// panic!("boom"); + /// }).await.unwrap_err(); + /// + /// if err.is_panic() { + /// // Resume the panic on the main task + /// panic::resume_unwind(err.into_panic()); + /// } + /// } + /// ``` + pub fn into_panic(self) -> Box<dyn Any + Send + 'static> { + self.try_into_panic() + .expect("`JoinError` reason is not a panic.") + } + + /// Consumes the join error, returning the object with which the task + /// panicked if the task terminated due to a panic. Otherwise, `self` is + /// returned. + /// + /// # Examples + /// + /// ```should_panic + /// use std::panic; + /// + /// #[tokio::main] + /// async fn main() { + /// let err = tokio::spawn(async { + /// panic!("boom"); + /// }).await.unwrap_err(); + /// + /// if let Ok(reason) = err.try_into_panic() { + /// // Resume the panic on the main task + /// panic::resume_unwind(reason); + /// } + /// } + /// ``` + pub fn try_into_panic(self) -> Result<Box<dyn Any + Send + 'static>, JoinError> { + match self.repr { + Repr::Panic(p) => Ok(p.into_inner().expect("Extracting panic from mutex")), + _ => Err(self), + } + } +} + +impl fmt::Display for JoinError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.repr { + Repr::Cancelled => write!(fmt, "cancelled"), + Repr::Panic(_) => write!(fmt, "panic"), + } + } +} + +impl fmt::Debug for JoinError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.repr { + Repr::Cancelled => write!(fmt, "JoinError::Cancelled"), + Repr::Panic(_) => write!(fmt, "JoinError::Panic(...)"), + } + } +} + +impl std::error::Error for JoinError {} + +impl From<JoinError> for io::Error { + fn from(src: JoinError) -> io::Error { + io::Error::new( + io::ErrorKind::Other, + match src.repr { + Repr::Cancelled => "task was cancelled", + Repr::Panic(_) => "task panicked", + }, + ) + } +} diff --git a/third_party/rust/tokio/src/runtime/task/harness.rs b/third_party/rust/tokio/src/runtime/task/harness.rs new file mode 100644 index 0000000000..29b231ea88 --- /dev/null +++ b/third_party/rust/tokio/src/runtime/task/harness.rs @@ -0,0 +1,372 @@ +use crate::runtime::task::core::{Cell, Core, Header, Trailer}; +use crate::runtime::task::state::Snapshot; +use crate::runtime::task::{JoinError, Notified, Schedule, Task}; + +use std::future::Future; +use std::mem; +use std::panic; +use std::ptr::NonNull; +use std::task::{Poll, Waker}; + +/// Typed raw task handle +pub(super) struct Harness<T: Future, S: 'static> { + cell: NonNull<Cell<T, S>>, +} + +impl<T, S> Harness<T, S> +where + T: Future, + S: 'static, +{ + pub(super) unsafe fn from_raw(ptr: NonNull<Header>) -> Harness<T, S> { + Harness { + cell: ptr.cast::<Cell<T, S>>(), + } + } + + fn header(&self) -> &Header { + unsafe { &self.cell.as_ref().header } + } + + fn trailer(&self) -> &Trailer { + unsafe { &self.cell.as_ref().trailer } + } + + fn core(&self) -> &Core<T, S> { + unsafe { &self.cell.as_ref().core } + } +} + +impl<T, S> Harness<T, S> +where + T: Future, + S: Schedule, +{ + /// Polls the inner future. + /// + /// All necessary state checks and transitions are performed. + /// + /// Panics raised while polling the future are handled. + pub(super) fn poll(self) { + // If this is the first time the task is polled, the task will be bound + // to the scheduler, in which case the task ref count must be + // incremented. + let ref_inc = !self.core().is_bound(); + + // Transition the task to the running state. + // + // A failure to transition here indicates the task has been cancelled + // while in the run queue pending execution. + let snapshot = match self.header().state.transition_to_running(ref_inc) { + Ok(snapshot) => snapshot, + Err(_) => { + // The task was shutdown while in the run queue. At this point, + // we just hold a ref counted reference. Drop it here. + self.drop_reference(); + return; + } + }; + + // Ensure the task is bound to a scheduler instance. If this is the + // first time polling the task, a scheduler instance is pulled from the + // local context and assigned to the task. + // + // The scheduler maintains ownership of the task and responds to `wake` + // calls. + // + // The task reference count has been incremented. + self.core().bind_scheduler(self.to_task()); + + // The transition to `Running` done above ensures that a lock on the + // future has been obtained. This also ensures the `*mut T` pointer + // contains the future (as opposed to the output) and is initialized. + + let res = panic::catch_unwind(panic::AssertUnwindSafe(|| { + struct Guard<'a, T: Future, S: Schedule> { + core: &'a Core<T, S>, + polled: bool, + } + + impl<T: Future, S: Schedule> Drop for Guard<'_, T, S> { + fn drop(&mut self) { + if !self.polled { + self.core.drop_future_or_output(); + } + } + } + + let mut guard = Guard { + core: self.core(), + polled: false, + }; + + // If the task is cancelled, avoid polling it, instead signalling it + // is complete. + if snapshot.is_cancelled() { + Poll::Ready(Err(JoinError::cancelled2())) + } else { + let res = guard.core.poll(self.header()); + + // prevent the guard from dropping the future + guard.polled = true; + + res.map(Ok) + } + })); + + match res { + Ok(Poll::Ready(out)) => { + self.complete(out, snapshot.is_join_interested()); + } + Ok(Poll::Pending) => { + match self.header().state.transition_to_idle() { + Ok(snapshot) => { + if snapshot.is_notified() { + // Signal yield + self.core().yield_now(Notified(self.to_task())); + // The ref-count was incremented as part of + // `transition_to_idle`. + self.drop_reference(); + } + } + Err(_) => self.cancel_task(), + } + } + Err(err) => { + self.complete(Err(JoinError::panic2(err)), snapshot.is_join_interested()); + } + } + } + + pub(super) fn dealloc(self) { + // Release the join waker, if there is one. + self.trailer().waker.with_mut(|_| ()); + + // Check causality + self.core().stage.with_mut(|_| {}); + self.core().scheduler.with_mut(|_| {}); + + unsafe { + drop(Box::from_raw(self.cell.as_ptr())); + } + } + + // ===== join handle ===== + + /// Read the task output into `dst`. + pub(super) fn try_read_output(self, dst: &mut Poll<super::Result<T::Output>>, waker: &Waker) { + // Load a snapshot of the current task state + let snapshot = self.header().state.load(); + + debug_assert!(snapshot.is_join_interested()); + + if !snapshot.is_complete() { + // The waker must be stored in the task struct. + let res = if snapshot.has_join_waker() { + // There already is a waker stored in the struct. If it matches + // the provided waker, then there is no further work to do. + // Otherwise, the waker must be swapped. + let will_wake = unsafe { + // Safety: when `JOIN_INTEREST` is set, only `JOIN_HANDLE` + // may mutate the `waker` field. + self.trailer() + .waker + .with(|ptr| (*ptr).as_ref().unwrap().will_wake(waker)) + }; + + if will_wake { + // The task is not complete **and** the waker is up to date, + // there is nothing further that needs to be done. + return; + } + + // Unset the `JOIN_WAKER` to gain mutable access to the `waker` + // field then update the field with the new join worker. + // + // This requires two atomic operations, unsetting the bit and + // then resetting it. If the task transitions to complete + // concurrently to either one of those operations, then setting + // the join waker fails and we proceed to reading the task + // output. + self.header() + .state + .unset_waker() + .and_then(|snapshot| self.set_join_waker(waker.clone(), snapshot)) + } else { + self.set_join_waker(waker.clone(), snapshot) + }; + + match res { + Ok(_) => return, + Err(snapshot) => { + assert!(snapshot.is_complete()); + } + } + } + + *dst = Poll::Ready(self.core().take_output()); + } + + fn set_join_waker(&self, waker: Waker, snapshot: Snapshot) -> Result<Snapshot, Snapshot> { + assert!(snapshot.is_join_interested()); + assert!(!snapshot.has_join_waker()); + + // Safety: Only the `JoinHandle` may set the `waker` field. When + // `JOIN_INTEREST` is **not** set, nothing else will touch the field. + unsafe { + self.trailer().waker.with_mut(|ptr| { + *ptr = Some(waker); + }); + } + + // Update the `JoinWaker` state accordingly + let res = self.header().state.set_join_waker(); + + // If the state could not be updated, then clear the join waker + if res.is_err() { + unsafe { + self.trailer().waker.with_mut(|ptr| { + *ptr = None; + }); + } + } + + res + } + + pub(super) fn drop_join_handle_slow(self) { + // Try to unset `JOIN_INTEREST`. This must be done as a first step in + // case the task concurrently completed. + if self.header().state.unset_join_interested().is_err() { + // It is our responsibility to drop the output. This is critical as + // the task output may not be `Send` and as such must remain with + // the scheduler or `JoinHandle`. i.e. if the output remains in the + // task structure until the task is deallocated, it may be dropped + // by a Waker on any arbitrary thread. + self.core().drop_future_or_output(); + } + + // Drop the `JoinHandle` reference, possibly deallocating the task + self.drop_reference(); + } + + // ===== waker behavior ===== + + pub(super) fn wake_by_val(self) { + self.wake_by_ref(); + self.drop_reference(); + } + + pub(super) fn wake_by_ref(&self) { + if self.header().state.transition_to_notified() { + self.core().schedule(Notified(self.to_task())); + } + } + + pub(super) fn drop_reference(self) { + if self.header().state.ref_dec() { + self.dealloc(); + } + } + + /// Forcibly shutdown the task + /// + /// Attempt to transition to `Running` in order to forcibly shutdown the + /// task. If the task is currently running or in a state of completion, then + /// there is nothing further to do. When the task completes running, it will + /// notice the `CANCELLED` bit and finalize the task. + pub(super) fn shutdown(self) { + if !self.header().state.transition_to_shutdown() { + // The task is concurrently running. No further work needed. + return; + } + + // By transitioning the lifcycle to `Running`, we have permission to + // drop the future. + self.cancel_task(); + } + + // ====== internal ====== + + fn cancel_task(self) { + // Drop the future from a panic guard. + let res = panic::catch_unwind(panic::AssertUnwindSafe(|| { + self.core().drop_future_or_output(); + })); + + if let Err(err) = res { + // Dropping the future panicked, complete the join + // handle with the panic to avoid dropping the panic + // on the ground. + self.complete(Err(JoinError::panic2(err)), true); + } else { + self.complete(Err(JoinError::cancelled2()), true); + } + } + + fn complete(mut self, output: super::Result<T::Output>, is_join_interested: bool) { + if is_join_interested { + // Store the output. The future has already been dropped + // + // Safety: Mutual exclusion is obtained by having transitioned the task + // state -> Running + self.core().store_output(output); + + // Transition to `Complete`, notifying the `JoinHandle` if necessary. + self.transition_to_complete(); + } + + // The task has completed execution and will no longer be scheduled. + // + // Attempts to batch a ref-dec with the state transition below. + let ref_dec = if self.core().is_bound() { + if let Some(task) = self.core().release(self.to_task()) { + mem::forget(task); + true + } else { + false + } + } else { + false + }; + + // This might deallocate + let snapshot = self + .header() + .state + .transition_to_terminal(!is_join_interested, ref_dec); + + if snapshot.ref_count() == 0 { + self.dealloc() + } + } + + /// Transitions the task's lifecycle to `Complete`. Notifies the + /// `JoinHandle` if it still has interest in the completion. + fn transition_to_complete(&mut self) { + // Transition the task's lifecycle to `Complete` and get a snapshot of + // the task's sate. + let snapshot = self.header().state.transition_to_complete(); + + if !snapshot.is_join_interested() { + // The `JoinHandle` is not interested in the output of this task. It + // is our responsibility to drop the output. + self.core().drop_future_or_output(); + } else if snapshot.has_join_waker() { + // Notify the join handle. The previous transition obtains the + // lock on the waker cell. + self.wake_join(); + } + } + + fn wake_join(&self) { + self.trailer().waker.with(|ptr| match unsafe { &*ptr } { + Some(waker) => waker.wake_by_ref(), + None => panic!("waker missing"), + }); + } + + fn to_task(&self) -> Task<S> { + unsafe { Task::from_raw(self.header().into()) } + } +} diff --git a/third_party/rust/tokio/src/runtime/task/join.rs b/third_party/rust/tokio/src/runtime/task/join.rs new file mode 100644 index 0000000000..fdcc346e5c --- /dev/null +++ b/third_party/rust/tokio/src/runtime/task/join.rs @@ -0,0 +1,152 @@ +use crate::runtime::task::RawTask; + +use std::fmt; +use std::future::Future; +use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; + +doc_rt_core! { + /// An owned permission to join on a task (await its termination). + /// + /// This can be thought of as the equivalent of [`std::thread::JoinHandle`] for + /// a task rather than a thread. + /// + /// A `JoinHandle` *detaches* the associated task when it is dropped, which + /// means that there is no longer any handle to the task, and no way to `join` + /// on it. + /// + /// This `struct` is created by the [`task::spawn`] and [`task::spawn_blocking`] + /// functions. + /// + /// # Examples + /// + /// Creation from [`task::spawn`]: + /// + /// ``` + /// use tokio::task; + /// + /// # async fn doc() { + /// let join_handle: task::JoinHandle<_> = task::spawn(async { + /// // some work here + /// }); + /// # } + /// ``` + /// + /// Creation from [`task::spawn_blocking`]: + /// + /// ``` + /// use tokio::task; + /// + /// # async fn doc() { + /// let join_handle: task::JoinHandle<_> = task::spawn_blocking(|| { + /// // some blocking work here + /// }); + /// # } + /// ``` + /// + /// Child being detached and outliving its parent: + /// + /// ```no_run + /// use tokio::task; + /// use tokio::time; + /// use std::time::Duration; + /// + /// # #[tokio::main] async fn main() { + /// let original_task = task::spawn(async { + /// let _detached_task = task::spawn(async { + /// // Here we sleep to make sure that the first task returns before. + /// time::delay_for(Duration::from_millis(10)).await; + /// // This will be called, even though the JoinHandle is dropped. + /// println!("♫ Still alive ♫"); + /// }); + /// }); + /// + /// original_task.await.expect("The task being joined has panicked"); + /// println!("Original task is joined."); + /// + /// // We make sure that the new task has time to run, before the main + /// // task returns. + /// + /// time::delay_for(Duration::from_millis(1000)).await; + /// # } + /// ``` + /// + /// [`task::spawn`]: crate::task::spawn() + /// [`task::spawn_blocking`]: crate::task::spawn_blocking + /// [`std::thread::JoinHandle`]: std::thread::JoinHandle + pub struct JoinHandle<T> { + raw: Option<RawTask>, + _p: PhantomData<T>, + } +} + +unsafe impl<T: Send> Send for JoinHandle<T> {} +unsafe impl<T: Send> Sync for JoinHandle<T> {} + +impl<T> JoinHandle<T> { + pub(super) fn new(raw: RawTask) -> JoinHandle<T> { + JoinHandle { + raw: Some(raw), + _p: PhantomData, + } + } +} + +impl<T> Unpin for JoinHandle<T> {} + +impl<T> Future for JoinHandle<T> { + type Output = super::Result<T>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut ret = Poll::Pending; + + // Keep track of task budget + ready!(crate::coop::poll_proceed(cx)); + + // Raw should always be set. If it is not, this is due to polling after + // completion + let raw = self + .raw + .as_ref() + .expect("polling after `JoinHandle` already completed"); + + // Try to read the task output. If the task is not yet complete, the + // waker is stored and is notified once the task does complete. + // + // The function must go via the vtable, which requires erasing generic + // types. To do this, the function "return" is placed on the stack + // **before** calling the function and is passed into the function using + // `*mut ()`. + // + // Safety: + // + // The type of `T` must match the task's output type. + unsafe { + raw.try_read_output(&mut ret as *mut _ as *mut (), cx.waker()); + } + + ret + } +} + +impl<T> Drop for JoinHandle<T> { + fn drop(&mut self) { + if let Some(raw) = self.raw.take() { + if raw.header().state.drop_join_handle_fast().is_ok() { + return; + } + + raw.drop_join_handle_slow(); + } + } +} + +impl<T> fmt::Debug for JoinHandle<T> +where + T: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("JoinHandle").finish() + } +} diff --git a/third_party/rust/tokio/src/runtime/task/mod.rs b/third_party/rust/tokio/src/runtime/task/mod.rs new file mode 100644 index 0000000000..17b5157e84 --- /dev/null +++ b/third_party/rust/tokio/src/runtime/task/mod.rs @@ -0,0 +1,220 @@ +mod core; +use self::core::Cell; +pub(crate) use self::core::Header; + +mod error; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::error::JoinError; + +mod harness; +use self::harness::Harness; + +mod join; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::join::JoinHandle; + +mod raw; +use self::raw::RawTask; + +mod state; +use self::state::State; + +mod waker; + +cfg_rt_threaded! { + mod stack; + pub(crate) use self::stack::TransferStack; +} + +use crate::util::linked_list; + +use std::future::Future; +use std::marker::PhantomData; +use std::ptr::NonNull; +use std::{fmt, mem}; + +/// An owned handle to the task, tracked by ref count +#[repr(transparent)] +pub(crate) struct Task<S: 'static> { + raw: RawTask, + _p: PhantomData<S>, +} + +unsafe impl<S> Send for Task<S> {} +unsafe impl<S> Sync for Task<S> {} + +/// A task was notified +#[repr(transparent)] +pub(crate) struct Notified<S: 'static>(Task<S>); + +unsafe impl<S: Schedule> Send for Notified<S> {} +unsafe impl<S: Schedule> Sync for Notified<S> {} + +/// Task result sent back +pub(crate) type Result<T> = std::result::Result<T, JoinError>; + +pub(crate) trait Schedule: Sync + Sized + 'static { + /// Bind a task to the executor. + /// + /// Guaranteed to be called from the thread that called `poll` on the task. + /// The returned `Schedule` instance is associated with the task and is used + /// as `&self` in the other methods on this trait. + fn bind(task: Task<Self>) -> Self; + + /// The task has completed work and is ready to be released. The scheduler + /// is free to drop it whenever. + /// + /// If the scheduler can immediately release the task, it should return + /// it as part of the function. This enables the task module to batch + /// the ref-dec with other options. + fn release(&self, task: &Task<Self>) -> Option<Task<Self>>; + + /// Schedule the task + fn schedule(&self, task: Notified<Self>); + + /// Schedule the task to run in the near future, yielding the thread to + /// other tasks. + fn yield_now(&self, task: Notified<Self>) { + self.schedule(task); + } +} + +/// Create a new task with an associated join handle +pub(crate) fn joinable<T, S>(task: T) -> (Notified<S>, JoinHandle<T::Output>) +where + T: Future + Send + 'static, + S: Schedule, +{ + let raw = RawTask::new::<_, S>(task); + + let task = Task { + raw, + _p: PhantomData, + }; + + let join = JoinHandle::new(raw); + + (Notified(task), join) +} + +cfg_rt_util! { + /// Create a new `!Send` task with an associated join handle + pub(crate) unsafe fn joinable_local<T, S>(task: T) -> (Notified<S>, JoinHandle<T::Output>) + where + T: Future + 'static, + S: Schedule, + { + let raw = RawTask::new::<_, S>(task); + + let task = Task { + raw, + _p: PhantomData, + }; + + let join = JoinHandle::new(raw); + + (Notified(task), join) + } +} + +impl<S: 'static> Task<S> { + pub(crate) unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> { + Task { + raw: RawTask::from_raw(ptr), + _p: PhantomData, + } + } + + pub(crate) fn header(&self) -> &Header { + self.raw.header() + } +} + +cfg_rt_threaded! { + impl<S: 'static> Notified<S> { + pub(crate) unsafe fn from_raw(ptr: NonNull<Header>) -> Notified<S> { + Notified(Task::from_raw(ptr)) + } + + pub(crate) fn header(&self) -> &Header { + self.0.header() + } + } + + impl<S: 'static> Task<S> { + pub(crate) fn into_raw(self) -> NonNull<Header> { + let ret = self.header().into(); + mem::forget(self); + ret + } + } + + impl<S: 'static> Notified<S> { + pub(crate) fn into_raw(self) -> NonNull<Header> { + self.0.into_raw() + } + } +} + +impl<S: Schedule> Task<S> { + /// Pre-emptively cancel the task as part of the shutdown process. + pub(crate) fn shutdown(&self) { + self.raw.shutdown(); + } +} + +impl<S: Schedule> Notified<S> { + /// Run the task + pub(crate) fn run(self) { + self.0.raw.poll(); + mem::forget(self); + } + + /// Pre-emptively cancel the task as part of the shutdown process. + pub(crate) fn shutdown(self) { + self.0.shutdown(); + } +} + +impl<S: 'static> Drop for Task<S> { + fn drop(&mut self) { + // Decrement the ref count + if self.header().state.ref_dec() { + // Deallocate if this is the final ref count + self.raw.dealloc(); + } + } +} + +impl<S> fmt::Debug for Task<S> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "Task({:p})", self.header()) + } +} + +impl<S> fmt::Debug for Notified<S> { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "task::Notified({:p})", self.0.header()) + } +} + +/// # Safety +/// +/// Tasks are pinned +unsafe impl<S> linked_list::Link for Task<S> { + type Handle = Task<S>; + type Target = Header; + + fn as_raw(handle: &Task<S>) -> NonNull<Header> { + handle.header().into() + } + + unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> { + Task::from_raw(ptr) + } + + unsafe fn pointers(target: NonNull<Header>) -> NonNull<linked_list::Pointers<Header>> { + // Not super great as it avoids some of looms checking... + NonNull::from(target.as_ref().owned.with_mut(|ptr| &mut *ptr)) + } +} diff --git a/third_party/rust/tokio/src/runtime/task/raw.rs b/third_party/rust/tokio/src/runtime/task/raw.rs new file mode 100644 index 0000000000..cae56d037d --- /dev/null +++ b/third_party/rust/tokio/src/runtime/task/raw.rs @@ -0,0 +1,131 @@ +use crate::runtime::task::{Cell, Harness, Header, Schedule, State}; + +use std::future::Future; +use std::ptr::NonNull; +use std::task::{Poll, Waker}; + +/// Raw task handle +pub(super) struct RawTask { + ptr: NonNull<Header>, +} + +pub(super) struct Vtable { + /// Poll the future + pub(super) poll: unsafe fn(NonNull<Header>), + + /// Deallocate the memory + pub(super) dealloc: unsafe fn(NonNull<Header>), + + /// Read the task output, if complete + pub(super) try_read_output: unsafe fn(NonNull<Header>, *mut (), &Waker), + + /// The join handle has been dropped + pub(super) drop_join_handle_slow: unsafe fn(NonNull<Header>), + + /// Scheduler is being shutdown + pub(super) shutdown: unsafe fn(NonNull<Header>), +} + +/// Get the vtable for the requested `T` and `S` generics. +pub(super) fn vtable<T: Future, S: Schedule>() -> &'static Vtable { + &Vtable { + poll: poll::<T, S>, + dealloc: dealloc::<T, S>, + try_read_output: try_read_output::<T, S>, + drop_join_handle_slow: drop_join_handle_slow::<T, S>, + shutdown: shutdown::<T, S>, + } +} + +impl RawTask { + pub(super) fn new<T, S>(task: T) -> RawTask + where + T: Future, + S: Schedule, + { + let ptr = Box::into_raw(Cell::<_, S>::new(task, State::new())); + let ptr = unsafe { NonNull::new_unchecked(ptr as *mut Header) }; + + RawTask { ptr } + } + + pub(super) unsafe fn from_raw(ptr: NonNull<Header>) -> RawTask { + RawTask { ptr } + } + + /// Returns a reference to the task's meta structure. + /// + /// Safe as `Header` is `Sync`. + pub(super) fn header(&self) -> &Header { + unsafe { self.ptr.as_ref() } + } + + /// Safety: mutual exclusion is required to call this function. + pub(super) fn poll(self) { + let vtable = self.header().vtable; + unsafe { (vtable.poll)(self.ptr) } + } + + pub(super) fn dealloc(self) { + let vtable = self.header().vtable; + unsafe { + (vtable.dealloc)(self.ptr); + } + } + + /// Safety: `dst` must be a `*mut Poll<super::Result<T::Output>>` where `T` + /// is the future stored by the task. + pub(super) unsafe fn try_read_output(self, dst: *mut (), waker: &Waker) { + let vtable = self.header().vtable; + (vtable.try_read_output)(self.ptr, dst, waker); + } + + pub(super) fn drop_join_handle_slow(self) { + let vtable = self.header().vtable; + unsafe { (vtable.drop_join_handle_slow)(self.ptr) } + } + + pub(super) fn shutdown(self) { + let vtable = self.header().vtable; + unsafe { (vtable.shutdown)(self.ptr) } + } +} + +impl Clone for RawTask { + fn clone(&self) -> Self { + RawTask { ptr: self.ptr } + } +} + +impl Copy for RawTask {} + +unsafe fn poll<T: Future, S: Schedule>(ptr: NonNull<Header>) { + let harness = Harness::<T, S>::from_raw(ptr); + harness.poll(); +} + +unsafe fn dealloc<T: Future, S: Schedule>(ptr: NonNull<Header>) { + let harness = Harness::<T, S>::from_raw(ptr); + harness.dealloc(); +} + +unsafe fn try_read_output<T: Future, S: Schedule>( + ptr: NonNull<Header>, + dst: *mut (), + waker: &Waker, +) { + let out = &mut *(dst as *mut Poll<super::Result<T::Output>>); + + let harness = Harness::<T, S>::from_raw(ptr); + harness.try_read_output(out, waker); +} + +unsafe fn drop_join_handle_slow<T: Future, S: Schedule>(ptr: NonNull<Header>) { + let harness = Harness::<T, S>::from_raw(ptr); + harness.drop_join_handle_slow() +} + +unsafe fn shutdown<T: Future, S: Schedule>(ptr: NonNull<Header>) { + let harness = Harness::<T, S>::from_raw(ptr); + harness.shutdown() +} diff --git a/third_party/rust/tokio/src/runtime/task/stack.rs b/third_party/rust/tokio/src/runtime/task/stack.rs new file mode 100644 index 0000000000..9dd8d3f43f --- /dev/null +++ b/third_party/rust/tokio/src/runtime/task/stack.rs @@ -0,0 +1,83 @@ +use crate::loom::sync::atomic::AtomicPtr; +use crate::runtime::task::{Header, Task}; + +use std::marker::PhantomData; +use std::ptr::{self, NonNull}; +use std::sync::atomic::Ordering::{Acquire, Relaxed, Release}; + +/// Concurrent stack of tasks, used to pass ownership of a task from one worker +/// to another. +pub(crate) struct TransferStack<T: 'static> { + head: AtomicPtr<Header>, + _p: PhantomData<T>, +} + +impl<T: 'static> TransferStack<T> { + pub(crate) fn new() -> TransferStack<T> { + TransferStack { + head: AtomicPtr::new(ptr::null_mut()), + _p: PhantomData, + } + } + + pub(crate) fn push(&self, task: Task<T>) { + let task = task.into_raw(); + + // We don't care about any memory associated w/ setting the `head` + // field, just the current value. + // + // The compare-exchange creates a release sequence. + let mut curr = self.head.load(Relaxed); + + loop { + unsafe { + task.as_ref() + .stack_next + .with_mut(|ptr| *ptr = NonNull::new(curr)) + }; + + let res = self + .head + .compare_exchange(curr, task.as_ptr() as *mut _, Release, Relaxed); + + match res { + Ok(_) => return, + Err(actual) => { + curr = actual; + } + } + } + } + + pub(crate) fn drain(&self) -> impl Iterator<Item = Task<T>> { + struct Iter<T: 'static>(Option<NonNull<Header>>, PhantomData<T>); + + impl<T: 'static> Iterator for Iter<T> { + type Item = Task<T>; + + fn next(&mut self) -> Option<Task<T>> { + let task = self.0?; + + // Move the cursor forward + self.0 = unsafe { task.as_ref().stack_next.with(|ptr| *ptr) }; + + // Return the task + unsafe { Some(Task::from_raw(task)) } + } + } + + impl<T: 'static> Drop for Iter<T> { + fn drop(&mut self) { + use std::process; + + if self.0.is_some() { + // we have bugs + process::abort(); + } + } + } + + let ptr = self.head.swap(ptr::null_mut(), Acquire); + Iter(NonNull::new(ptr), PhantomData) + } +} diff --git a/third_party/rust/tokio/src/runtime/task/state.rs b/third_party/rust/tokio/src/runtime/task/state.rs new file mode 100644 index 0000000000..21e90430db --- /dev/null +++ b/third_party/rust/tokio/src/runtime/task/state.rs @@ -0,0 +1,446 @@ +use crate::loom::sync::atomic::AtomicUsize; + +use std::fmt; +use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; +use std::usize; + +pub(super) struct State { + val: AtomicUsize, +} + +/// Current state value +#[derive(Copy, Clone)] +pub(super) struct Snapshot(usize); + +type UpdateResult = Result<Snapshot, Snapshot>; + +/// The task is currently being run. +const RUNNING: usize = 0b0001; + +/// The task is complete. +/// +/// Once this bit is set, it is never unset +const COMPLETE: usize = 0b0010; + +/// Extracts the task's lifecycle value from the state +const LIFECYCLE_MASK: usize = 0b11; + +/// Flag tracking if the task has been pushed into a run queue. +const NOTIFIED: usize = 0b100; + +/// The join handle is still around +const JOIN_INTEREST: usize = 0b1_000; + +/// A join handle waker has been set +const JOIN_WAKER: usize = 0b10_000; + +/// The task has been forcibly cancelled. +const CANCELLED: usize = 0b100_000; + +/// All bits +const STATE_MASK: usize = LIFECYCLE_MASK | NOTIFIED | JOIN_INTEREST | JOIN_WAKER | CANCELLED; + +/// Bits used by the ref count portion of the state. +const REF_COUNT_MASK: usize = !STATE_MASK; + +/// Number of positions to shift the ref count +const REF_COUNT_SHIFT: usize = REF_COUNT_MASK.count_zeros() as usize; + +/// One ref count +const REF_ONE: usize = 1 << REF_COUNT_SHIFT; + +/// State a task is initialized with +/// +/// A task is initialized with two references: one for the scheduler and one for +/// the `JoinHandle`. As the task starts with a `JoinHandle`, `JOIN_INTERST` is +/// set. A new task is immediately pushed into the run queue for execution and +/// starts with the `NOTIFIED` flag set. +const INITIAL_STATE: usize = (REF_ONE * 2) | JOIN_INTEREST | NOTIFIED; + +/// All transitions are performed via RMW operations. This establishes an +/// unambiguous modification order. +impl State { + /// Return a task's initial state + pub(super) fn new() -> State { + // A task is initialized with three references: one for the scheduler, + // one for the `JoinHandle`, one for the task handle made available in + // release. As the task starts with a `JoinHandle`, `JOIN_INTERST` is + // set. A new task is immediately pushed into the run queue for + // execution and starts with the `NOTIFIED` flag set. + State { + val: AtomicUsize::new(INITIAL_STATE), + } + } + + /// Loads the current state, establishes `Acquire` ordering. + pub(super) fn load(&self) -> Snapshot { + Snapshot(self.val.load(Acquire)) + } + + /// Attempt to transition the lifecycle to `Running`. + /// + /// If `ref_inc` is set, the reference count is also incremented. + /// + /// The `NOTIFIED` bit is always unset. + pub(super) fn transition_to_running(&self, ref_inc: bool) -> UpdateResult { + self.fetch_update(|curr| { + assert!(curr.is_notified()); + + let mut next = curr; + + if !next.is_idle() { + return None; + } + + if ref_inc { + next.ref_inc(); + } + + next.set_running(); + next.unset_notified(); + Some(next) + }) + } + + /// Transitions the task from `Running` -> `Idle`. + /// + /// Returns `Ok` if the transition to `Idle` is successful, `Err` otherwise. + /// In both cases, a snapshot of the state from **after** the transition is + /// returned. + /// + /// The transition to `Idle` fails if the task has been flagged to be + /// cancelled. + pub(super) fn transition_to_idle(&self) -> UpdateResult { + self.fetch_update(|curr| { + assert!(curr.is_running()); + + if curr.is_cancelled() { + return None; + } + + let mut next = curr; + next.unset_running(); + + if next.is_notified() { + // The caller needs to schedule the task. To do this, it needs a + // waker. The waker requires a ref count. + next.ref_inc(); + } + + Some(next) + }) + } + + /// Transitions the task from `Running` -> `Complete`. + pub(super) fn transition_to_complete(&self) -> Snapshot { + const DELTA: usize = RUNNING | COMPLETE; + + let prev = Snapshot(self.val.fetch_xor(DELTA, AcqRel)); + assert!(prev.is_running()); + assert!(!prev.is_complete()); + + Snapshot(prev.0 ^ DELTA) + } + + /// Transition from `Complete` -> `Terminal`, decrementing the reference + /// count by 1. + /// + /// When `ref_dec` is set, an additional ref count decrement is performed. + /// This is used to batch atomic ops when possible. + pub(super) fn transition_to_terminal(&self, complete: bool, ref_dec: bool) -> Snapshot { + self.fetch_update(|mut snapshot| { + if complete { + snapshot.set_complete(); + } else { + assert!(snapshot.is_complete()); + } + + // Decrement the primary handle + snapshot.ref_dec(); + + if ref_dec { + // Decrement a second time + snapshot.ref_dec(); + } + + Some(snapshot) + }) + .unwrap() + } + + /// Transitions the state to `NOTIFIED`. + /// + /// Returns `true` if the task needs to be submitted to the pool for + /// execution + pub(super) fn transition_to_notified(&self) -> bool { + let prev = Snapshot(self.val.fetch_or(NOTIFIED, AcqRel)); + prev.will_need_queueing() + } + + /// Set the `CANCELLED` bit and attempt to transition to `Running`. + /// + /// Returns `true` if the transition to `Running` succeeded. + pub(super) fn transition_to_shutdown(&self) -> bool { + let mut prev = Snapshot(0); + + let _ = self.fetch_update(|mut snapshot| { + prev = snapshot; + + if snapshot.is_idle() { + snapshot.set_running(); + + if snapshot.is_notified() { + // If the task is idle and notified, this indicates the task is + // in the run queue and is considered owned by the scheduler. + // The shutdown operation claims ownership of the task, which + // means we need to assign an additional ref-count to the task + // in the queue. + snapshot.ref_inc(); + } + } + + snapshot.set_cancelled(); + Some(snapshot) + }); + + prev.is_idle() + } + + /// Optimistically tries to swap the state assuming the join handle is + /// __immediately__ dropped on spawn + pub(super) fn drop_join_handle_fast(&self) -> Result<(), ()> { + use std::sync::atomic::Ordering::Relaxed; + + // Relaxed is acceptable as if this function is called and succeeds, + // then nothing has been done w/ the join handle. + // + // The moment the join handle is used (polled), the `JOIN_WAKER` flag is + // set, at which point the CAS will fail. + // + // Given this, there is no risk if this operation is reordered. + self.val + .compare_exchange_weak( + INITIAL_STATE, + (INITIAL_STATE - REF_ONE) & !JOIN_INTEREST, + Release, + Relaxed, + ) + .map(|_| ()) + .map_err(|_| ()) + } + + /// Try to unset the JOIN_INTEREST flag. + /// + /// Returns `Ok` if the operation happens before the task transitions to a + /// completed state, `Err` otherwise. + pub(super) fn unset_join_interested(&self) -> UpdateResult { + self.fetch_update(|curr| { + assert!(curr.is_join_interested()); + + if curr.is_complete() { + return None; + } + + let mut next = curr; + next.unset_join_interested(); + + Some(next) + }) + } + + /// Set the `JOIN_WAKER` bit. + /// + /// Returns `Ok` if the bit is set, `Err` otherwise. This operation fails if + /// the task has completed. + pub(super) fn set_join_waker(&self) -> UpdateResult { + self.fetch_update(|curr| { + assert!(curr.is_join_interested()); + assert!(!curr.has_join_waker()); + + if curr.is_complete() { + return None; + } + + let mut next = curr; + next.set_join_waker(); + + Some(next) + }) + } + + /// Unsets the `JOIN_WAKER` bit. + /// + /// Returns `Ok` has been unset, `Err` otherwise. This operation fails if + /// the task has completed. + pub(super) fn unset_waker(&self) -> UpdateResult { + self.fetch_update(|curr| { + assert!(curr.is_join_interested()); + assert!(curr.has_join_waker()); + + if curr.is_complete() { + return None; + } + + let mut next = curr; + next.unset_join_waker(); + + Some(next) + }) + } + + pub(super) fn ref_inc(&self) { + use std::process; + use std::sync::atomic::Ordering::Relaxed; + + // Using a relaxed ordering is alright here, as knowledge of the + // original reference prevents other threads from erroneously deleting + // the object. + // + // As explained in the [Boost documentation][1], Increasing the + // reference counter can always be done with memory_order_relaxed: New + // references to an object can only be formed from an existing + // reference, and passing an existing reference from one thread to + // another must already provide any required synchronization. + // + // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) + let prev = self.val.fetch_add(REF_ONE, Relaxed); + + // If the reference count overflowed, abort. + if prev > isize::max_value() as usize { + process::abort(); + } + } + + /// Returns `true` if the task should be released. + pub(super) fn ref_dec(&self) -> bool { + let prev = Snapshot(self.val.fetch_sub(REF_ONE, AcqRel)); + prev.ref_count() == 1 + } + + fn fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot> + where + F: FnMut(Snapshot) -> Option<Snapshot>, + { + let mut curr = self.load(); + + loop { + let next = match f(curr) { + Some(next) => next, + None => return Err(curr), + }; + + let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire); + + match res { + Ok(_) => return Ok(next), + Err(actual) => curr = Snapshot(actual), + } + } + } +} + +// ===== impl Snapshot ===== + +impl Snapshot { + /// Returns `true` if the task is in an idle state. + pub(super) fn is_idle(self) -> bool { + self.0 & (RUNNING | COMPLETE) == 0 + } + + /// Returns `true` if the task has been flagged as notified. + pub(super) fn is_notified(self) -> bool { + self.0 & NOTIFIED == NOTIFIED + } + + fn unset_notified(&mut self) { + self.0 &= !NOTIFIED + } + + pub(super) fn is_running(self) -> bool { + self.0 & RUNNING == RUNNING + } + + fn set_running(&mut self) { + self.0 |= RUNNING; + } + + fn unset_running(&mut self) { + self.0 &= !RUNNING; + } + + pub(super) fn is_cancelled(self) -> bool { + self.0 & CANCELLED == CANCELLED + } + + fn set_cancelled(&mut self) { + self.0 |= CANCELLED; + } + + fn set_complete(&mut self) { + self.0 |= COMPLETE; + } + + /// Returns `true` if the task's future has completed execution. + pub(super) fn is_complete(self) -> bool { + self.0 & COMPLETE == COMPLETE + } + + pub(super) fn is_join_interested(self) -> bool { + self.0 & JOIN_INTEREST == JOIN_INTEREST + } + + fn unset_join_interested(&mut self) { + self.0 &= !JOIN_INTEREST + } + + pub(super) fn has_join_waker(self) -> bool { + self.0 & JOIN_WAKER == JOIN_WAKER + } + + fn set_join_waker(&mut self) { + self.0 |= JOIN_WAKER; + } + + fn unset_join_waker(&mut self) { + self.0 &= !JOIN_WAKER + } + + pub(super) fn ref_count(self) -> usize { + (self.0 & REF_COUNT_MASK) >> REF_COUNT_SHIFT + } + + fn ref_inc(&mut self) { + assert!(self.0 <= isize::max_value() as usize); + self.0 += REF_ONE; + } + + pub(super) fn ref_dec(&mut self) { + assert!(self.ref_count() > 0); + self.0 -= REF_ONE + } + + fn will_need_queueing(self) -> bool { + !self.is_notified() && self.is_idle() + } +} + +impl fmt::Debug for State { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + let snapshot = self.load(); + snapshot.fmt(fmt) + } +} + +impl fmt::Debug for Snapshot { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Snapshot") + .field("is_running", &self.is_running()) + .field("is_complete", &self.is_complete()) + .field("is_notified", &self.is_notified()) + .field("is_cancelled", &self.is_cancelled()) + .field("is_join_interested", &self.is_join_interested()) + .field("has_join_waker", &self.has_join_waker()) + .field("ref_count", &self.ref_count()) + .finish() + } +} diff --git a/third_party/rust/tokio/src/runtime/task/waker.rs b/third_party/rust/tokio/src/runtime/task/waker.rs new file mode 100644 index 0000000000..5c2d478fbb --- /dev/null +++ b/third_party/rust/tokio/src/runtime/task/waker.rs @@ -0,0 +1,101 @@ +use crate::runtime::task::harness::Harness; +use crate::runtime::task::{Header, Schedule}; + +use std::future::Future; +use std::marker::PhantomData; +use std::mem::ManuallyDrop; +use std::ops; +use std::ptr::NonNull; +use std::task::{RawWaker, RawWakerVTable, Waker}; + +pub(super) struct WakerRef<'a, S: 'static> { + waker: ManuallyDrop<Waker>, + _p: PhantomData<(&'a Header, S)>, +} + +/// Returns a `WakerRef` which avoids having to pre-emptively increase the +/// refcount if there is no need to do so. +pub(super) fn waker_ref<T, S>(header: &Header) -> WakerRef<'_, S> +where + T: Future, + S: Schedule, +{ + // `Waker::will_wake` uses the VTABLE pointer as part of the check. This + // means that `will_wake` will always return false when using the current + // task's waker. (discussion at rust-lang/rust#66281). + // + // To fix this, we use a single vtable. Since we pass in a reference at this + // point and not an *owned* waker, we must ensure that `drop` is never + // called on this waker instance. This is done by wrapping it with + // `ManuallyDrop` and then never calling drop. + let waker = unsafe { ManuallyDrop::new(Waker::from_raw(raw_waker::<T, S>(header))) }; + + WakerRef { + waker, + _p: PhantomData, + } +} + +impl<S> ops::Deref for WakerRef<'_, S> { + type Target = Waker; + + fn deref(&self) -> &Waker { + &self.waker + } +} + +unsafe fn clone_waker<T, S>(ptr: *const ()) -> RawWaker +where + T: Future, + S: Schedule, +{ + let header = ptr as *const Header; + (*header).state.ref_inc(); + raw_waker::<T, S>(header) +} + +unsafe fn drop_waker<T, S>(ptr: *const ()) +where + T: Future, + S: Schedule, +{ + let ptr = NonNull::new_unchecked(ptr as *mut Header); + let harness = Harness::<T, S>::from_raw(ptr); + harness.drop_reference(); +} + +unsafe fn wake_by_val<T, S>(ptr: *const ()) +where + T: Future, + S: Schedule, +{ + let ptr = NonNull::new_unchecked(ptr as *mut Header); + let harness = Harness::<T, S>::from_raw(ptr); + harness.wake_by_val(); +} + +// Wake without consuming the waker +unsafe fn wake_by_ref<T, S>(ptr: *const ()) +where + T: Future, + S: Schedule, +{ + let ptr = NonNull::new_unchecked(ptr as *mut Header); + let harness = Harness::<T, S>::from_raw(ptr); + harness.wake_by_ref(); +} + +fn raw_waker<T, S>(header: *const Header) -> RawWaker +where + T: Future, + S: Schedule, +{ + let ptr = header as *const (); + let vtable = &RawWakerVTable::new( + clone_waker::<T, S>, + wake_by_val::<T, S>, + wake_by_ref::<T, S>, + drop_waker::<T, S>, + ); + RawWaker::new(ptr, vtable) +} diff --git a/third_party/rust/tokio/src/runtime/tests/loom_blocking.rs b/third_party/rust/tokio/src/runtime/tests/loom_blocking.rs new file mode 100644 index 0000000000..db7048e3f9 --- /dev/null +++ b/third_party/rust/tokio/src/runtime/tests/loom_blocking.rs @@ -0,0 +1,31 @@ +use crate::runtime::{self, Runtime}; + +use std::sync::Arc; + +#[test] +fn blocking_shutdown() { + loom::model(|| { + let v = Arc::new(()); + + let rt = mk_runtime(1); + rt.enter(|| { + for _ in 0..2 { + let v = v.clone(); + crate::task::spawn_blocking(move || { + assert!(1 < Arc::strong_count(&v)); + }); + } + }); + + drop(rt); + assert_eq!(1, Arc::strong_count(&v)); + }); +} + +fn mk_runtime(num_threads: usize) -> Runtime { + runtime::Builder::new() + .threaded_scheduler() + .core_threads(num_threads) + .build() + .unwrap() +} diff --git a/third_party/rust/tokio/src/runtime/tests/loom_oneshot.rs b/third_party/rust/tokio/src/runtime/tests/loom_oneshot.rs new file mode 100644 index 0000000000..c126fe479a --- /dev/null +++ b/third_party/rust/tokio/src/runtime/tests/loom_oneshot.rs @@ -0,0 +1,49 @@ +use loom::sync::Notify; + +use std::sync::{Arc, Mutex}; + +pub(crate) fn channel<T>() -> (Sender<T>, Receiver<T>) { + let inner = Arc::new(Inner { + notify: Notify::new(), + value: Mutex::new(None), + }); + + let tx = Sender { + inner: inner.clone(), + }; + let rx = Receiver { inner }; + + (tx, rx) +} + +pub(crate) struct Sender<T> { + inner: Arc<Inner<T>>, +} + +pub(crate) struct Receiver<T> { + inner: Arc<Inner<T>>, +} + +struct Inner<T> { + notify: Notify, + value: Mutex<Option<T>>, +} + +impl<T> Sender<T> { + pub(crate) fn send(self, value: T) { + *self.inner.value.lock().unwrap() = Some(value); + self.inner.notify.notify(); + } +} + +impl<T> Receiver<T> { + pub(crate) fn recv(self) -> T { + loop { + if let Some(v) = self.inner.value.lock().unwrap().take() { + return v; + } + + self.inner.notify.wait(); + } + } +} diff --git a/third_party/rust/tokio/src/runtime/tests/loom_pool.rs b/third_party/rust/tokio/src/runtime/tests/loom_pool.rs new file mode 100644 index 0000000000..c08658cde8 --- /dev/null +++ b/third_party/rust/tokio/src/runtime/tests/loom_pool.rs @@ -0,0 +1,380 @@ +/// Full runtime loom tests. These are heavy tests and take significant time to +/// run on CI. +/// +/// Use `LOOM_MAX_PREEMPTIONS=1` to do a "quick" run as a smoke test. +/// +/// In order to speed up the C +use crate::future::poll_fn; +use crate::runtime::tests::loom_oneshot as oneshot; +use crate::runtime::{self, Runtime}; +use crate::{spawn, task}; +use tokio_test::assert_ok; + +use loom::sync::atomic::{AtomicBool, AtomicUsize}; +use loom::sync::{Arc, Mutex}; + +use pin_project_lite::pin_project; +use std::future::Future; +use std::pin::Pin; +use std::sync::atomic::Ordering::{Relaxed, SeqCst}; +use std::task::{Context, Poll}; + +/// Tests are divided into groups to make the runs faster on CI. +mod group_a { + use super::*; + + #[test] + fn racy_shutdown() { + loom::model(|| { + let pool = mk_pool(1); + + // here's the case we want to exercise: + // + // a worker that still has tasks in its local queue gets sent to the blocking pool (due to + // block_in_place). the blocking pool is shut down, so drops the worker. the worker's + // shutdown method never gets run. + // + // we do this by spawning two tasks on one worker, the first of which does block_in_place, + // and then immediately drop the pool. + + pool.spawn(track(async { + crate::task::block_in_place(|| {}); + })); + pool.spawn(track(async {})); + drop(pool); + }); + } + + #[test] + fn pool_multi_spawn() { + loom::model(|| { + let pool = mk_pool(2); + let c1 = Arc::new(AtomicUsize::new(0)); + + let (tx, rx) = oneshot::channel(); + let tx1 = Arc::new(Mutex::new(Some(tx))); + + // Spawn a task + let c2 = c1.clone(); + let tx2 = tx1.clone(); + pool.spawn(track(async move { + spawn(track(async move { + if 1 == c1.fetch_add(1, Relaxed) { + tx1.lock().unwrap().take().unwrap().send(()); + } + })); + })); + + // Spawn a second task + pool.spawn(track(async move { + spawn(track(async move { + if 1 == c2.fetch_add(1, Relaxed) { + tx2.lock().unwrap().take().unwrap().send(()); + } + })); + })); + + rx.recv(); + }); + } + + fn only_blocking_inner(first_pending: bool) { + loom::model(move || { + let pool = mk_pool(1); + let (block_tx, block_rx) = oneshot::channel(); + + pool.spawn(track(async move { + crate::task::block_in_place(move || { + block_tx.send(()); + }); + if first_pending { + task::yield_now().await + } + })); + + block_rx.recv(); + drop(pool); + }); + } + + #[test] + fn only_blocking_without_pending() { + only_blocking_inner(false) + } + + #[test] + fn only_blocking_with_pending() { + only_blocking_inner(true) + } +} + +mod group_b { + use super::*; + + fn blocking_and_regular_inner(first_pending: bool) { + const NUM: usize = 3; + loom::model(move || { + let pool = mk_pool(1); + let cnt = Arc::new(AtomicUsize::new(0)); + + let (block_tx, block_rx) = oneshot::channel(); + let (done_tx, done_rx) = oneshot::channel(); + let done_tx = Arc::new(Mutex::new(Some(done_tx))); + + pool.spawn(track(async move { + crate::task::block_in_place(move || { + block_tx.send(()); + }); + if first_pending { + task::yield_now().await + } + })); + + for _ in 0..NUM { + let cnt = cnt.clone(); + let done_tx = done_tx.clone(); + + pool.spawn(track(async move { + if NUM == cnt.fetch_add(1, Relaxed) + 1 { + done_tx.lock().unwrap().take().unwrap().send(()); + } + })); + } + + done_rx.recv(); + block_rx.recv(); + + drop(pool); + }); + } + + #[test] + fn blocking_and_regular() { + blocking_and_regular_inner(false); + } + + #[test] + fn blocking_and_regular_with_pending() { + blocking_and_regular_inner(true); + } + + #[test] + fn pool_shutdown() { + loom::model(|| { + let pool = mk_pool(2); + + pool.spawn(track(async move { + gated2(true).await; + })); + + pool.spawn(track(async move { + gated2(false).await; + })); + + drop(pool); + }); + } + + #[test] + fn join_output() { + loom::model(|| { + let mut rt = mk_pool(1); + + rt.block_on(async { + let t = crate::spawn(track(async { "hello" })); + + let out = assert_ok!(t.await); + assert_eq!("hello", out.into_inner()); + }); + }); + } + + #[test] + fn poll_drop_handle_then_drop() { + loom::model(|| { + let mut rt = mk_pool(1); + + rt.block_on(async move { + let mut t = crate::spawn(track(async { "hello" })); + + poll_fn(|cx| { + let _ = Pin::new(&mut t).poll(cx); + Poll::Ready(()) + }) + .await; + }); + }) + } + + #[test] + fn complete_block_on_under_load() { + loom::model(|| { + let mut pool = mk_pool(1); + + pool.block_on(async { + // Trigger a re-schedule + crate::spawn(track(async { + for _ in 0..2 { + task::yield_now().await; + } + })); + + gated2(true).await + }); + }); + } +} + +mod group_c { + use super::*; + + #[test] + fn shutdown_with_notification() { + use crate::sync::oneshot; + + loom::model(|| { + let rt = mk_pool(2); + let (done_tx, done_rx) = oneshot::channel::<()>(); + + rt.spawn(track(async move { + let (tx, rx) = oneshot::channel::<()>(); + + crate::spawn(async move { + crate::task::spawn_blocking(move || { + let _ = tx.send(()); + }); + + let _ = done_rx.await; + }); + + let _ = rx.await; + + let _ = done_tx.send(()); + })); + }); + } +} + +mod group_d { + use super::*; + + #[test] + fn pool_multi_notify() { + loom::model(|| { + let pool = mk_pool(2); + + let c1 = Arc::new(AtomicUsize::new(0)); + + let (done_tx, done_rx) = oneshot::channel(); + let done_tx1 = Arc::new(Mutex::new(Some(done_tx))); + + // Spawn a task + let c2 = c1.clone(); + let done_tx2 = done_tx1.clone(); + pool.spawn(track(async move { + gated().await; + gated().await; + + if 1 == c1.fetch_add(1, Relaxed) { + done_tx1.lock().unwrap().take().unwrap().send(()); + } + })); + + // Spawn a second task + pool.spawn(track(async move { + gated().await; + gated().await; + + if 1 == c2.fetch_add(1, Relaxed) { + done_tx2.lock().unwrap().take().unwrap().send(()); + } + })); + + done_rx.recv(); + }); + } +} + +fn mk_pool(num_threads: usize) -> Runtime { + runtime::Builder::new() + .threaded_scheduler() + .core_threads(num_threads) + .build() + .unwrap() +} + +fn gated() -> impl Future<Output = &'static str> { + gated2(false) +} + +fn gated2(thread: bool) -> impl Future<Output = &'static str> { + use loom::thread; + use std::sync::Arc; + + let gate = Arc::new(AtomicBool::new(false)); + let mut fired = false; + + poll_fn(move |cx| { + if !fired { + let gate = gate.clone(); + let waker = cx.waker().clone(); + + if thread { + thread::spawn(move || { + gate.store(true, SeqCst); + waker.wake_by_ref(); + }); + } else { + spawn(track(async move { + gate.store(true, SeqCst); + waker.wake_by_ref(); + })); + } + + fired = true; + + return Poll::Pending; + } + + if gate.load(SeqCst) { + Poll::Ready("hello world") + } else { + Poll::Pending + } + }) +} + +fn track<T: Future>(f: T) -> Track<T> { + Track { + inner: f, + arc: Arc::new(()), + } +} + +pin_project! { + struct Track<T> { + #[pin] + inner: T, + // Arc is used to hook into loom's leak tracking. + arc: Arc<()>, + } +} + +impl<T> Track<T> { + fn into_inner(self) -> T { + self.inner + } +} + +impl<T: Future> Future for Track<T> { + type Output = Track<T::Output>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let me = self.project(); + + Poll::Ready(Track { + inner: ready!(me.inner.poll(cx)), + arc: me.arc.clone(), + }) + } +} diff --git a/third_party/rust/tokio/src/runtime/tests/loom_queue.rs b/third_party/rust/tokio/src/runtime/tests/loom_queue.rs new file mode 100644 index 0000000000..de02610db0 --- /dev/null +++ b/third_party/rust/tokio/src/runtime/tests/loom_queue.rs @@ -0,0 +1,216 @@ +use crate::runtime::queue; +use crate::runtime::task::{self, Schedule, Task}; + +use loom::thread; + +#[test] +fn basic() { + loom::model(|| { + let (steal, mut local) = queue::local(); + let inject = queue::Inject::new(); + + let th = thread::spawn(move || { + let (_, mut local) = queue::local(); + let mut n = 0; + + for _ in 0..3 { + if steal.steal_into(&mut local).is_some() { + n += 1; + } + + while local.pop().is_some() { + n += 1; + } + } + + n + }); + + let mut n = 0; + + for _ in 0..2 { + for _ in 0..2 { + let (task, _) = task::joinable::<_, Runtime>(async {}); + local.push_back(task, &inject); + } + + if local.pop().is_some() { + n += 1; + } + + // Push another task + let (task, _) = task::joinable::<_, Runtime>(async {}); + local.push_back(task, &inject); + + while local.pop().is_some() { + n += 1; + } + } + + while inject.pop().is_some() { + n += 1; + } + + n += th.join().unwrap(); + + assert_eq!(6, n); + }); +} + +#[test] +fn steal_overflow() { + loom::model(|| { + let (steal, mut local) = queue::local(); + let inject = queue::Inject::new(); + + let th = thread::spawn(move || { + let (_, mut local) = queue::local(); + let mut n = 0; + + if steal.steal_into(&mut local).is_some() { + n += 1; + } + + while local.pop().is_some() { + n += 1; + } + + n + }); + + let mut n = 0; + + // push a task, pop a task + let (task, _) = task::joinable::<_, Runtime>(async {}); + local.push_back(task, &inject); + + if local.pop().is_some() { + n += 1; + } + + for _ in 0..6 { + let (task, _) = task::joinable::<_, Runtime>(async {}); + local.push_back(task, &inject); + } + + n += th.join().unwrap(); + + while local.pop().is_some() { + n += 1; + } + + while inject.pop().is_some() { + n += 1; + } + + assert_eq!(7, n); + }); +} + +#[test] +fn multi_stealer() { + const NUM_TASKS: usize = 5; + + fn steal_tasks(steal: queue::Steal<Runtime>) -> usize { + let (_, mut local) = queue::local(); + + if steal.steal_into(&mut local).is_none() { + return 0; + } + + let mut n = 1; + + while local.pop().is_some() { + n += 1; + } + + n + } + + loom::model(|| { + let (steal, mut local) = queue::local(); + let inject = queue::Inject::new(); + + // Push work + for _ in 0..NUM_TASKS { + let (task, _) = task::joinable::<_, Runtime>(async {}); + local.push_back(task, &inject); + } + + let th1 = { + let steal = steal.clone(); + thread::spawn(move || steal_tasks(steal)) + }; + + let th2 = thread::spawn(move || steal_tasks(steal)); + + let mut n = 0; + + while local.pop().is_some() { + n += 1; + } + + while inject.pop().is_some() { + n += 1; + } + + n += th1.join().unwrap(); + n += th2.join().unwrap(); + + assert_eq!(n, NUM_TASKS); + }); +} + +#[test] +fn chained_steal() { + loom::model(|| { + let (s1, mut l1) = queue::local(); + let (s2, mut l2) = queue::local(); + let inject = queue::Inject::new(); + + // Load up some tasks + for _ in 0..4 { + let (task, _) = task::joinable::<_, Runtime>(async {}); + l1.push_back(task, &inject); + + let (task, _) = task::joinable::<_, Runtime>(async {}); + l2.push_back(task, &inject); + } + + // Spawn a task to steal from **our** queue + let th = thread::spawn(move || { + let (_, mut local) = queue::local(); + s1.steal_into(&mut local); + + while local.pop().is_some() {} + }); + + // Drain our tasks, then attempt to steal + while l1.pop().is_some() {} + + s2.steal_into(&mut l1); + + th.join().unwrap(); + + while l1.pop().is_some() {} + while l2.pop().is_some() {} + while inject.pop().is_some() {} + }); +} + +struct Runtime; + +impl Schedule for Runtime { + fn bind(task: Task<Self>) -> Runtime { + std::mem::forget(task); + Runtime + } + + fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> { + None + } + + fn schedule(&self, _task: task::Notified<Self>) { + unreachable!(); + } +} diff --git a/third_party/rust/tokio/src/runtime/tests/mod.rs b/third_party/rust/tokio/src/runtime/tests/mod.rs new file mode 100644 index 0000000000..123a7e35a3 --- /dev/null +++ b/third_party/rust/tokio/src/runtime/tests/mod.rs @@ -0,0 +1,13 @@ +cfg_loom! { + mod loom_blocking; + mod loom_oneshot; + mod loom_pool; + mod loom_queue; +} + +cfg_not_loom! { + mod queue; + + #[cfg(miri)] + mod task; +} diff --git a/third_party/rust/tokio/src/runtime/tests/queue.rs b/third_party/rust/tokio/src/runtime/tests/queue.rs new file mode 100644 index 0000000000..d228d5dcc7 --- /dev/null +++ b/third_party/rust/tokio/src/runtime/tests/queue.rs @@ -0,0 +1,202 @@ +use crate::runtime::queue; +use crate::runtime::task::{self, Schedule, Task}; + +use std::thread; +use std::time::Duration; + +#[test] +fn fits_256() { + let (_, mut local) = queue::local(); + let inject = queue::Inject::new(); + + for _ in 0..256 { + let (task, _) = task::joinable::<_, Runtime>(async {}); + local.push_back(task, &inject); + } + + assert!(inject.pop().is_none()); + + while local.pop().is_some() {} +} + +#[test] +fn overflow() { + let (_, mut local) = queue::local(); + let inject = queue::Inject::new(); + + for _ in 0..257 { + let (task, _) = task::joinable::<_, Runtime>(async {}); + local.push_back(task, &inject); + } + + let mut n = 0; + + while inject.pop().is_some() { + n += 1; + } + + while local.pop().is_some() { + n += 1; + } + + assert_eq!(n, 257); +} + +#[test] +fn steal_batch() { + let (steal1, mut local1) = queue::local(); + let (_, mut local2) = queue::local(); + let inject = queue::Inject::new(); + + for _ in 0..4 { + let (task, _) = task::joinable::<_, Runtime>(async {}); + local1.push_back(task, &inject); + } + + assert!(steal1.steal_into(&mut local2).is_some()); + + for _ in 0..1 { + assert!(local2.pop().is_some()); + } + + assert!(local2.pop().is_none()); + + for _ in 0..2 { + assert!(local1.pop().is_some()); + } + + assert!(local1.pop().is_none()); +} + +#[test] +fn stress1() { + const NUM_ITER: usize = 1; + const NUM_STEAL: usize = 1_000; + const NUM_LOCAL: usize = 1_000; + const NUM_PUSH: usize = 500; + const NUM_POP: usize = 250; + + for _ in 0..NUM_ITER { + let (steal, mut local) = queue::local(); + let inject = queue::Inject::new(); + + let th = thread::spawn(move || { + let (_, mut local) = queue::local(); + let mut n = 0; + + for _ in 0..NUM_STEAL { + if steal.steal_into(&mut local).is_some() { + n += 1; + } + + while local.pop().is_some() { + n += 1; + } + + thread::yield_now(); + } + + n + }); + + let mut n = 0; + + for _ in 0..NUM_LOCAL { + for _ in 0..NUM_PUSH { + let (task, _) = task::joinable::<_, Runtime>(async {}); + local.push_back(task, &inject); + } + + for _ in 0..NUM_POP { + if local.pop().is_some() { + n += 1; + } else { + break; + } + } + } + + while inject.pop().is_some() { + n += 1; + } + + n += th.join().unwrap(); + + assert_eq!(n, NUM_LOCAL * NUM_PUSH); + } +} + +#[test] +fn stress2() { + const NUM_ITER: usize = 1; + const NUM_TASKS: usize = 1_000_000; + const NUM_STEAL: usize = 1_000; + + for _ in 0..NUM_ITER { + let (steal, mut local) = queue::local(); + let inject = queue::Inject::new(); + + let th = thread::spawn(move || { + let (_, mut local) = queue::local(); + let mut n = 0; + + for _ in 0..NUM_STEAL { + if steal.steal_into(&mut local).is_some() { + n += 1; + } + + while local.pop().is_some() { + n += 1; + } + + thread::sleep(Duration::from_micros(10)); + } + + n + }); + + let mut num_pop = 0; + + for i in 0..NUM_TASKS { + let (task, _) = task::joinable::<_, Runtime>(async {}); + local.push_back(task, &inject); + + if i % 128 == 0 && local.pop().is_some() { + num_pop += 1; + } + + while inject.pop().is_some() { + num_pop += 1; + } + } + + num_pop += th.join().unwrap(); + + while local.pop().is_some() { + num_pop += 1; + } + + while inject.pop().is_some() { + num_pop += 1; + } + + assert_eq!(num_pop, NUM_TASKS); + } +} + +struct Runtime; + +impl Schedule for Runtime { + fn bind(task: Task<Self>) -> Runtime { + std::mem::forget(task); + Runtime + } + + fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> { + None + } + + fn schedule(&self, _task: task::Notified<Self>) { + unreachable!(); + } +} diff --git a/third_party/rust/tokio/src/runtime/tests/task.rs b/third_party/rust/tokio/src/runtime/tests/task.rs new file mode 100644 index 0000000000..82315a04ff --- /dev/null +++ b/third_party/rust/tokio/src/runtime/tests/task.rs @@ -0,0 +1,159 @@ +use crate::runtime::task::{self, Schedule, Task}; +use crate::util::linked_list::LinkedList; +use crate::util::TryLock; + +use std::collections::VecDeque; +use std::sync::Arc; + +#[test] +fn create_drop() { + let _ = task::joinable::<_, Runtime>(async { unreachable!() }); +} + +#[test] +fn schedule() { + with(|rt| { + let (task, _) = task::joinable(async { + crate::task::yield_now().await; + }); + + rt.schedule(task); + + assert_eq!(2, rt.tick()); + }) +} + +#[test] +fn shutdown() { + with(|rt| { + let (task, _) = task::joinable(async { + loop { + crate::task::yield_now().await; + } + }); + + rt.schedule(task); + rt.tick_max(1); + + rt.shutdown(); + }) +} + +fn with(f: impl FnOnce(Runtime)) { + struct Reset; + + impl Drop for Reset { + fn drop(&mut self) { + let _rt = CURRENT.try_lock().unwrap().take(); + } + } + + let _reset = Reset; + + let rt = Runtime(Arc::new(Inner { + released: task::TransferStack::new(), + core: TryLock::new(Core { + queue: VecDeque::new(), + tasks: LinkedList::new(), + }), + })); + + *CURRENT.try_lock().unwrap() = Some(rt.clone()); + f(rt) +} + +#[derive(Clone)] +struct Runtime(Arc<Inner>); + +struct Inner { + released: task::TransferStack<Runtime>, + core: TryLock<Core>, +} + +struct Core { + queue: VecDeque<task::Notified<Runtime>>, + tasks: LinkedList<Task<Runtime>>, +} + +static CURRENT: TryLock<Option<Runtime>> = TryLock::new(None); + +impl Runtime { + fn tick(&self) -> usize { + self.tick_max(usize::max_value()) + } + + fn tick_max(&self, max: usize) -> usize { + let mut n = 0; + + while !self.is_empty() && n < max { + let task = self.next_task(); + n += 1; + task.run(); + } + + self.0.maintenance(); + + n + } + + fn is_empty(&self) -> bool { + self.0.core.try_lock().unwrap().queue.is_empty() + } + + fn next_task(&self) -> task::Notified<Runtime> { + self.0.core.try_lock().unwrap().queue.pop_front().unwrap() + } + + fn shutdown(&self) { + let mut core = self.0.core.try_lock().unwrap(); + + for task in core.tasks.iter() { + task.shutdown(); + } + + while let Some(task) = core.queue.pop_back() { + task.shutdown(); + } + + drop(core); + + while !self.0.core.try_lock().unwrap().tasks.is_empty() { + self.0.maintenance(); + } + } +} + +impl Inner { + fn maintenance(&self) { + use std::mem::ManuallyDrop; + + for task in self.released.drain() { + let task = ManuallyDrop::new(task); + + // safety: see worker.rs + unsafe { + let ptr = task.header().into(); + self.core.try_lock().unwrap().tasks.remove(ptr); + } + } + } +} + +impl Schedule for Runtime { + fn bind(task: Task<Self>) -> Runtime { + let rt = CURRENT.try_lock().unwrap().as_ref().unwrap().clone(); + rt.0.core.try_lock().unwrap().tasks.push_front(task); + rt + } + + fn release(&self, task: &Task<Self>) -> Option<Task<Self>> { + // safety: copying worker.rs + let task = unsafe { Task::from_raw(task.header().into()) }; + self.0.released.push(task); + None + } + + fn schedule(&self, task: task::Notified<Self>) { + self.0.core.try_lock().unwrap().queue.push_back(task); + } +} diff --git a/third_party/rust/tokio/src/runtime/thread_pool/atomic_cell.rs b/third_party/rust/tokio/src/runtime/thread_pool/atomic_cell.rs new file mode 100644 index 0000000000..2bda0fc738 --- /dev/null +++ b/third_party/rust/tokio/src/runtime/thread_pool/atomic_cell.rs @@ -0,0 +1,52 @@ +use crate::loom::sync::atomic::AtomicPtr; + +use std::ptr; +use std::sync::atomic::Ordering::AcqRel; + +pub(super) struct AtomicCell<T> { + data: AtomicPtr<T>, +} + +unsafe impl<T: Send> Send for AtomicCell<T> {} +unsafe impl<T: Send> Sync for AtomicCell<T> {} + +impl<T> AtomicCell<T> { + pub(super) fn new(data: Option<Box<T>>) -> AtomicCell<T> { + AtomicCell { + data: AtomicPtr::new(to_raw(data)), + } + } + + pub(super) fn swap(&self, val: Option<Box<T>>) -> Option<Box<T>> { + let old = self.data.swap(to_raw(val), AcqRel); + from_raw(old) + } + + #[cfg(feature = "blocking")] + pub(super) fn set(&self, val: Box<T>) { + let _ = self.swap(Some(val)); + } + + pub(super) fn take(&self) -> Option<Box<T>> { + self.swap(None) + } +} + +fn to_raw<T>(data: Option<Box<T>>) -> *mut T { + data.map(Box::into_raw).unwrap_or(ptr::null_mut()) +} + +fn from_raw<T>(val: *mut T) -> Option<Box<T>> { + if val.is_null() { + None + } else { + Some(unsafe { Box::from_raw(val) }) + } +} + +impl<T> Drop for AtomicCell<T> { + fn drop(&mut self) { + // Free any data still held by the cell + let _ = self.take(); + } +} diff --git a/third_party/rust/tokio/src/runtime/thread_pool/idle.rs b/third_party/rust/tokio/src/runtime/thread_pool/idle.rs new file mode 100644 index 0000000000..ae87ca4ba1 --- /dev/null +++ b/third_party/rust/tokio/src/runtime/thread_pool/idle.rs @@ -0,0 +1,222 @@ +//! Coordinates idling workers + +use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::Mutex; + +use std::fmt; +use std::sync::atomic::Ordering::{self, SeqCst}; + +pub(super) struct Idle { + /// Tracks both the number of searching workers and the number of unparked + /// workers. + /// + /// Used as a fast-path to avoid acquiring the lock when needed. + state: AtomicUsize, + + /// Sleeping workers + sleepers: Mutex<Vec<usize>>, + + /// Total number of workers. + num_workers: usize, +} + +const UNPARK_SHIFT: usize = 16; +const UNPARK_MASK: usize = !SEARCH_MASK; +const SEARCH_MASK: usize = (1 << UNPARK_SHIFT) - 1; + +#[derive(Copy, Clone)] +struct State(usize); + +impl Idle { + pub(super) fn new(num_workers: usize) -> Idle { + let init = State::new(num_workers); + + Idle { + state: AtomicUsize::new(init.into()), + sleepers: Mutex::new(Vec::with_capacity(num_workers)), + num_workers, + } + } + + /// If there are no workers actively searching, returns the index of a + /// worker currently sleeping. + pub(super) fn worker_to_notify(&self) -> Option<usize> { + // If at least one worker is spinning, work being notified will + // eventully be found. A searching thread will find **some** work and + // notify another worker, eventually leading to our work being found. + // + // For this to happen, this load must happen before the thread + // transitioning `num_searching` to zero. Acquire / Relese does not + // provide sufficient guarantees, so this load is done with `SeqCst` and + // will pair with the `fetch_sub(1)` when transitioning out of + // searching. + if !self.notify_should_wakeup() { + return None; + } + + // Acquire the lock + let mut sleepers = self.sleepers.lock().unwrap(); + + // Check again, now that the lock is acquired + if !self.notify_should_wakeup() { + return None; + } + + // A worker should be woken up, atomically increment the number of + // searching workers as well as the number of unparked workers. + State::unpark_one(&self.state); + + // Get the worker to unpark + let ret = sleepers.pop(); + debug_assert!(ret.is_some()); + + ret + } + + /// Returns `true` if the worker needs to do a final check for submitted + /// work. + pub(super) fn transition_worker_to_parked(&self, worker: usize, is_searching: bool) -> bool { + // Acquire the lock + let mut sleepers = self.sleepers.lock().unwrap(); + + // Decrement the number of unparked threads + let ret = State::dec_num_unparked(&self.state, is_searching); + + // Track the sleeping worker + sleepers.push(worker); + + ret + } + + pub(super) fn transition_worker_to_searching(&self) -> bool { + let state = State::load(&self.state, SeqCst); + if 2 * state.num_searching() >= self.num_workers { + return false; + } + + // It is possible for this routine to allow more than 50% of the workers + // to search. That is OK. Limiting searchers is only an optimization to + // prevent too much contention. + State::inc_num_searching(&self.state, SeqCst); + true + } + + /// A lightweight transition from searching -> running. + /// + /// Returns `true` if this is the final searching worker. The caller + /// **must** notify a new worker. + pub(super) fn transition_worker_from_searching(&self) -> bool { + State::dec_num_searching(&self.state) + } + + /// Unpark a specific worker. This happens if tasks are submitted from + /// within the worker's park routine. + pub(super) fn unpark_worker_by_id(&self, worker_id: usize) { + let mut sleepers = self.sleepers.lock().unwrap(); + + for index in 0..sleepers.len() { + if sleepers[index] == worker_id { + sleepers.swap_remove(index); + + // Update the state accordingly whle the lock is held. + State::unpark_one(&self.state); + + return; + } + } + } + + /// Returns `true` if `worker_id` is contained in the sleep set + pub(super) fn is_parked(&self, worker_id: usize) -> bool { + let sleepers = self.sleepers.lock().unwrap(); + sleepers.contains(&worker_id) + } + + fn notify_should_wakeup(&self) -> bool { + let state = State(self.state.fetch_add(0, SeqCst)); + state.num_searching() == 0 && state.num_unparked() < self.num_workers + } +} + +impl State { + fn new(num_workers: usize) -> State { + // All workers start in the unparked state + let ret = State(num_workers << UNPARK_SHIFT); + debug_assert_eq!(num_workers, ret.num_unparked()); + debug_assert_eq!(0, ret.num_searching()); + ret + } + + fn load(cell: &AtomicUsize, ordering: Ordering) -> State { + State(cell.load(ordering)) + } + + fn unpark_one(cell: &AtomicUsize) { + cell.fetch_add(1 | (1 << UNPARK_SHIFT), SeqCst); + } + + fn inc_num_searching(cell: &AtomicUsize, ordering: Ordering) { + cell.fetch_add(1, ordering); + } + + /// Returns `true` if this is the final searching worker + fn dec_num_searching(cell: &AtomicUsize) -> bool { + let state = State(cell.fetch_sub(1, SeqCst)); + state.num_searching() == 1 + } + + /// Track a sleeping worker + /// + /// Returns `true` if this is the final searching worker. + fn dec_num_unparked(cell: &AtomicUsize, is_searching: bool) -> bool { + let mut dec = 1 << UNPARK_SHIFT; + + if is_searching { + dec += 1; + } + + let prev = State(cell.fetch_sub(dec, SeqCst)); + is_searching && prev.num_searching() == 1 + } + + /// Number of workers currently searching + fn num_searching(self) -> usize { + self.0 & SEARCH_MASK + } + + /// Number of workers currently unparked + fn num_unparked(self) -> usize { + (self.0 & UNPARK_MASK) >> UNPARK_SHIFT + } +} + +impl From<usize> for State { + fn from(src: usize) -> State { + State(src) + } +} + +impl From<State> for usize { + fn from(src: State) -> usize { + src.0 + } +} + +impl fmt::Debug for State { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("worker::State") + .field("num_unparked", &self.num_unparked()) + .field("num_searching", &self.num_searching()) + .finish() + } +} + +#[test] +fn test_state() { + assert_eq!(0, UNPARK_MASK & SEARCH_MASK); + assert_eq!(0, !(UNPARK_MASK | SEARCH_MASK)); + + let state = State::new(10); + assert_eq!(10, state.num_unparked()); + assert_eq!(0, state.num_searching()); +} diff --git a/third_party/rust/tokio/src/runtime/thread_pool/mod.rs b/third_party/rust/tokio/src/runtime/thread_pool/mod.rs new file mode 100644 index 0000000000..82e82d5b30 --- /dev/null +++ b/third_party/rust/tokio/src/runtime/thread_pool/mod.rs @@ -0,0 +1,117 @@ +//! Threadpool + +mod atomic_cell; +use atomic_cell::AtomicCell; + +mod idle; +use self::idle::Idle; + +mod worker; +pub(crate) use worker::Launch; + +cfg_blocking! { + pub(crate) use worker::block_in_place; +} + +use crate::loom::sync::Arc; +use crate::runtime::task::{self, JoinHandle}; +use crate::runtime::Parker; + +use std::fmt; +use std::future::Future; + +/// Work-stealing based thread pool for executing futures. +pub(crate) struct ThreadPool { + spawner: Spawner, +} + +/// Submit futures to the associated thread pool for execution. +/// +/// A `Spawner` instance is a handle to a single thread pool that allows the owner +/// of the handle to spawn futures onto the thread pool. +/// +/// The `Spawner` handle is *only* used for spawning new futures. It does not +/// impact the lifecycle of the thread pool in any way. The thread pool may +/// shutdown while there are outstanding `Spawner` instances. +/// +/// `Spawner` instances are obtained by calling [`ThreadPool::spawner`]. +/// +/// [`ThreadPool::spawner`]: method@ThreadPool::spawner +#[derive(Clone)] +pub(crate) struct Spawner { + shared: Arc<worker::Shared>, +} + +// ===== impl ThreadPool ===== + +impl ThreadPool { + pub(crate) fn new(size: usize, parker: Parker) -> (ThreadPool, Launch) { + let (shared, launch) = worker::create(size, parker); + let spawner = Spawner { shared }; + let thread_pool = ThreadPool { spawner }; + + (thread_pool, launch) + } + + /// Returns reference to `Spawner`. + /// + /// The `Spawner` handle can be cloned and enables spawning tasks from other + /// threads. + pub(crate) fn spawner(&self) -> &Spawner { + &self.spawner + } + + /// Spawns a task + pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.spawner.spawn(future) + } + + /// Blocks the current thread waiting for the future to complete. + /// + /// The future will execute on the current thread, but all spawned tasks + /// will be executed on the thread pool. + pub(crate) fn block_on<F>(&self, future: F) -> F::Output + where + F: Future, + { + let mut enter = crate::runtime::enter(); + enter.block_on(future).expect("failed to park thread") + } +} + +impl fmt::Debug for ThreadPool { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("ThreadPool").finish() + } +} + +impl Drop for ThreadPool { + fn drop(&mut self) { + self.spawner.shared.close(); + } +} + +// ==== impl Spawner ===== + +impl Spawner { + /// Spawns a future onto the thread pool + pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + let (task, handle) = task::joinable(future); + self.shared.schedule(task, false); + handle + } +} + +impl fmt::Debug for Spawner { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Spawner").finish() + } +} diff --git a/third_party/rust/tokio/src/runtime/thread_pool/worker.rs b/third_party/rust/tokio/src/runtime/thread_pool/worker.rs new file mode 100644 index 0000000000..400e2a938c --- /dev/null +++ b/third_party/rust/tokio/src/runtime/thread_pool/worker.rs @@ -0,0 +1,761 @@ +//! A scheduler is initialized with a fixed number of workers. Each worker is +//! driven by a thread. Each worker has a "core" which contains data such as the +//! run queue and other state. When `block_in_place` is called, the worker's +//! "core" is handed off to a new thread allowing the scheduler to continue to +//! make progress while the originating thread blocks. + +use crate::loom::rand::seed; +use crate::loom::sync::{Arc, Mutex}; +use crate::park::{Park, Unpark}; +use crate::runtime; +use crate::runtime::park::{Parker, Unparker}; +use crate::runtime::thread_pool::{AtomicCell, Idle}; +use crate::runtime::{queue, task}; +use crate::util::linked_list::LinkedList; +use crate::util::FastRand; + +use std::cell::RefCell; +use std::time::Duration; + +/// A scheduler worker +pub(super) struct Worker { + /// Reference to shared state + shared: Arc<Shared>, + + /// Index holding this worker's remote state + index: usize, + + /// Used to hand-off a worker's core to another thread. + core: AtomicCell<Core>, +} + +/// Core data +struct Core { + /// Used to schedule bookkeeping tasks every so often. + tick: u8, + + /// When a task is scheduled from a worker, it is stored in this slot. The + /// worker will check this slot for a task **before** checking the run + /// queue. This effectively results in the **last** scheduled task to be run + /// next (LIFO). This is an optimization for message passing patterns and + /// helps to reduce latency. + lifo_slot: Option<Notified>, + + /// The worker-local run queue. + run_queue: queue::Local<Arc<Worker>>, + + /// True if the worker is currently searching for more work. Searching + /// involves attempting to steal from other workers. + is_searching: bool, + + /// True if the scheduler is being shutdown + is_shutdown: bool, + + /// Tasks owned by the core + tasks: LinkedList<Task>, + + /// Parker + /// + /// Stored in an `Option` as the parker is added / removed to make the + /// borrow checker happy. + park: Option<Parker>, + + /// Fast random number generator. + rand: FastRand, +} + +/// State shared across all workers +pub(super) struct Shared { + /// Per-worker remote state. All other workers have access to this and is + /// how they communicate between each other. + remotes: Box<[Remote]>, + + /// Submit work to the scheduler while **not** currently on a worker thread. + inject: queue::Inject<Arc<Worker>>, + + /// Coordinates idle workers + idle: Idle, + + /// Workers have have observed the shutdown signal + /// + /// The core is **not** placed back in the worker to avoid it from being + /// stolen by a thread that was spawned as part of `block_in_place`. + shutdown_workers: Mutex<Vec<(Box<Core>, Arc<Worker>)>>, +} + +/// Used to communicate with a worker from other threads. +struct Remote { + /// Steal tasks from this worker. + steal: queue::Steal<Arc<Worker>>, + + /// Transfers tasks to be released. Any worker pushes tasks, only the owning + /// worker pops. + pending_drop: task::TransferStack<Arc<Worker>>, + + /// Unparks the associated worker thread + unpark: Unparker, +} + +/// Thread-local context +struct Context { + /// Worker + worker: Arc<Worker>, + + /// Core data + core: RefCell<Option<Box<Core>>>, +} + +/// Starts the workers +pub(crate) struct Launch(Vec<Arc<Worker>>); + +/// Running a task may consume the core. If the core is still available when +/// running the task completes, it is returned. Otherwise, the worker will need +/// to stop processing. +type RunResult = Result<Box<Core>, ()>; + +/// A task handle +type Task = task::Task<Arc<Worker>>; + +/// A notified task handle +type Notified = task::Notified<Arc<Worker>>; + +// Tracks thread-local state +scoped_thread_local!(static CURRENT: Context); + +pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) { + let mut cores = vec![]; + let mut remotes = vec![]; + + // Create the local queues + for _ in 0..size { + let (steal, run_queue) = queue::local(); + + let park = park.clone(); + let unpark = park.unpark(); + + cores.push(Box::new(Core { + tick: 0, + lifo_slot: None, + run_queue, + is_searching: false, + is_shutdown: false, + tasks: LinkedList::new(), + park: Some(park), + rand: FastRand::new(seed()), + })); + + remotes.push(Remote { + steal, + pending_drop: task::TransferStack::new(), + unpark, + }); + } + + let shared = Arc::new(Shared { + remotes: remotes.into_boxed_slice(), + inject: queue::Inject::new(), + idle: Idle::new(size), + shutdown_workers: Mutex::new(vec![]), + }); + + let mut launch = Launch(vec![]); + + for (index, core) in cores.drain(..).enumerate() { + launch.0.push(Arc::new(Worker { + shared: shared.clone(), + index, + core: AtomicCell::new(Some(core)), + })); + } + + (shared, launch) +} + +cfg_blocking! { + pub(crate) fn block_in_place<F, R>(f: F) -> R + where + F: FnOnce() -> R, + { + // Try to steal the worker core back + struct Reset; + + impl Drop for Reset { + fn drop(&mut self) { + CURRENT.with(|maybe_cx| { + if let Some(cx) = maybe_cx { + let core = cx.worker.core.take(); + *cx.core.borrow_mut() = core; + } + }); + } + } + + CURRENT.with(|maybe_cx| { + let cx = maybe_cx.expect("can call blocking only when running in a spawned task"); + + // Get the worker core. If none is set, then blocking is fine! + let core = match cx.core.borrow_mut().take() { + Some(core) => { + // We are effectively leaving the executor, so we need to + // forcibly end budgeting. + crate::coop::stop(); + core + }, + None => return, + }; + + // The parker should be set here + assert!(core.park.is_some()); + + // In order to block, the core must be sent to another thread for + // execution. + // + // First, move the core back into the worker's shared core slot. + cx.worker.core.set(core); + + // Next, clone the worker handle and send it to a new thread for + // processing. + // + // Once the blocking task is done executing, we will attempt to + // steal the core back. + let worker = cx.worker.clone(); + runtime::spawn_blocking(move || run(worker)); + }); + + let _reset = Reset; + + f() + } +} + +/// After how many ticks is the global queue polled. This helps to ensure +/// fairness. +/// +/// The number is fairly arbitrary. I believe this value was copied from golang. +const GLOBAL_POLL_INTERVAL: u8 = 61; + +impl Launch { + pub(crate) fn launch(mut self) { + for worker in self.0.drain(..) { + runtime::spawn_blocking(move || run(worker)); + } + } +} + +fn run(worker: Arc<Worker>) { + // Acquire a core. If this fails, then another thread is running this + // worker and there is nothing further to do. + let core = match worker.core.take() { + Some(core) => core, + None => return, + }; + + // Set the worker context. + let cx = Context { + worker, + core: RefCell::new(None), + }; + + let _enter = crate::runtime::enter(); + + CURRENT.set(&cx, || { + // This should always be an error. It only returns a `Result` to support + // using `?` to short circuit. + assert!(cx.run(core).is_err()); + }); +} + +impl Context { + fn run(&self, mut core: Box<Core>) -> RunResult { + while !core.is_shutdown { + // Increment the tick + core.tick(); + + // Run maintenance, if needed + core = self.maintenance(core); + + // First, check work available to the current worker. + if let Some(task) = core.next_task(&self.worker) { + core = self.run_task(task, core)?; + continue; + } + + // There is no more **local** work to process, try to steal work + // from other workers. + if let Some(task) = core.steal_work(&self.worker) { + core = self.run_task(task, core)?; + } else { + // Wait for work + core = self.park(core); + } + } + + // Signal shutdown + self.worker.shared.shutdown(core, self.worker.clone()); + Err(()) + } + + fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult { + // Make sure thew orker is not in the **searching** state. This enables + // another idle worker to try to steal work. + core.transition_from_searching(&self.worker); + + // Make the core available to the runtime context + *self.core.borrow_mut() = Some(core); + + // Run the task + crate::coop::budget(|| { + task.run(); + + // As long as there is budget remaining and a task exists in the + // `lifo_slot`, then keep running. + loop { + // Check if we still have the core. If not, the core was stolen + // by another worker. + let mut core = match self.core.borrow_mut().take() { + Some(core) => core, + None => return Err(()), + }; + + // Check for a task in the LIFO slot + let task = match core.lifo_slot.take() { + Some(task) => task, + None => return Ok(core), + }; + + if crate::coop::has_budget_remaining() { + // Run the LIFO task, then loop + *self.core.borrow_mut() = Some(core); + task.run(); + } else { + // Not enough budget left to run the LIFO task, push it to + // the back of the queue and return. + core.run_queue.push_back(task, self.worker.inject()); + return Ok(core); + } + } + }) + } + + fn maintenance(&self, mut core: Box<Core>) -> Box<Core> { + if core.tick % GLOBAL_POLL_INTERVAL == 0 { + // Call `park` with a 0 timeout. This enables the I/O driver, timer, ... + // to run without actually putting the thread to sleep. + core = self.park_timeout(core, Some(Duration::from_millis(0))); + + // Run regularly scheduled maintenance + core.maintenance(&self.worker); + } + + core + } + + fn park(&self, mut core: Box<Core>) -> Box<Core> { + core.transition_to_parked(&self.worker); + + while !core.is_shutdown { + core = self.park_timeout(core, None); + + // Run regularly scheduled maintenance + core.maintenance(&self.worker); + + if core.transition_from_parked(&self.worker) { + return core; + } + } + + core + } + + fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> { + // Take the parker out of core + let mut park = core.park.take().expect("park missing"); + + // Store `core` in context + *self.core.borrow_mut() = Some(core); + + // Park thread + if let Some(timeout) = duration { + park.park_timeout(timeout).expect("park failed"); + } else { + park.park().expect("park failed"); + } + + // Remove `core` from context + core = self.core.borrow_mut().take().expect("core missing"); + + // Place `park` back in `core` + core.park = Some(park); + + // If there are tasks available to steal, notify a worker + if core.run_queue.is_stealable() { + self.worker.shared.notify_parked(); + } + + core + } +} + +impl Core { + /// Increment the tick + fn tick(&mut self) { + self.tick = self.tick.wrapping_add(1); + } + + /// Return the next notified task available to this worker. + fn next_task(&mut self, worker: &Worker) -> Option<Notified> { + if self.tick % GLOBAL_POLL_INTERVAL == 0 { + worker.inject().pop().or_else(|| self.next_local_task()) + } else { + self.next_local_task().or_else(|| worker.inject().pop()) + } + } + + fn next_local_task(&mut self) -> Option<Notified> { + self.lifo_slot.take().or_else(|| self.run_queue.pop()) + } + + fn steal_work(&mut self, worker: &Worker) -> Option<Notified> { + if !self.transition_to_searching(worker) { + return None; + } + + let num = worker.shared.remotes.len(); + // Start from a random worker + let start = self.rand.fastrand_n(num as u32) as usize; + + for i in 0..num { + let i = (start + i) % num; + + // Don't steal from ourself! We know we don't have work. + if i == worker.index { + continue; + } + + let target = &worker.shared.remotes[i]; + if let Some(task) = target.steal.steal_into(&mut self.run_queue) { + return Some(task); + } + } + + // Fallback on checking the global queue + worker.shared.inject.pop() + } + + fn transition_to_searching(&mut self, worker: &Worker) -> bool { + if !self.is_searching { + self.is_searching = worker.shared.idle.transition_worker_to_searching(); + } + + self.is_searching + } + + fn transition_from_searching(&mut self, worker: &Worker) { + if !self.is_searching { + return; + } + + self.is_searching = false; + worker.shared.transition_worker_from_searching(); + } + + /// Prepare the worker state for parking + fn transition_to_parked(&mut self, worker: &Worker) { + // When the final worker transitions **out** of searching to parked, it + // must check all the queues one last time in case work materialized + // between the last work scan and transitioning out of searching. + let is_last_searcher = worker + .shared + .idle + .transition_worker_to_parked(worker.index, self.is_searching); + + // The worker is no longer searching. Setting this is the local cache + // only. + self.is_searching = false; + + if is_last_searcher { + worker.shared.notify_if_work_pending(); + } + } + + /// Returns `true` if the transition happened. + fn transition_from_parked(&mut self, worker: &Worker) -> bool { + // If a task is in the lifo slot, then we must unpark regardless of + // being notified + if self.lifo_slot.is_some() { + worker.shared.idle.unpark_worker_by_id(worker.index); + self.is_searching = true; + return true; + } + + if worker.shared.idle.is_parked(worker.index) { + return false; + } + + // When unparked, the worker is in the searching state. + self.is_searching = true; + true + } + + /// Runs maintenance work such as free pending tasks and check the pool's + /// state. + fn maintenance(&mut self, worker: &Worker) { + self.drain_pending_drop(worker); + + if !self.is_shutdown { + // Check if the scheduler has been shutdown + self.is_shutdown = worker.inject().is_closed(); + } + } + + // Shutdown the core + fn shutdown(&mut self, worker: &Worker) { + // Take the core + let mut park = self.park.take().expect("park missing"); + + // Signal to all tasks to shut down. + for header in self.tasks.iter() { + header.shutdown(); + } + + loop { + self.drain_pending_drop(worker); + + if self.tasks.is_empty() { + break; + } + + // Wait until signalled + park.park().expect("park failed"); + } + + // Drain the queue + while let Some(_) = self.next_local_task() {} + } + + fn drain_pending_drop(&mut self, worker: &Worker) { + use std::mem::ManuallyDrop; + + for task in worker.remote().pending_drop.drain() { + let task = ManuallyDrop::new(task); + + // safety: tasks are only pushed into the `pending_drop` stacks that + // are associated with the list they are inserted into. When a task + // is pushed into `pending_drop`, the ref-inc is skipped, so we must + // not ref-dec here. + // + // See `bind` and `release` implementations. + unsafe { + self.tasks.remove(task.header().into()); + } + } + } +} + +impl Worker { + /// Returns a reference to the scheduler's injection queue + fn inject(&self) -> &queue::Inject<Arc<Worker>> { + &self.shared.inject + } + + /// Return a reference to this worker's remote data + fn remote(&self) -> &Remote { + &self.shared.remotes[self.index] + } + + fn eq(&self, other: &Worker) -> bool { + self.shared.ptr_eq(&other.shared) && self.index == other.index + } +} + +impl task::Schedule for Arc<Worker> { + fn bind(task: Task) -> Arc<Worker> { + CURRENT.with(|maybe_cx| { + let cx = maybe_cx.expect("scheduler context missing"); + + // Track the task + cx.core + .borrow_mut() + .as_mut() + .expect("scheduler core missing") + .tasks + .push_front(task); + + // Return a clone of the worker + cx.worker.clone() + }) + } + + fn release(&self, task: &Task) -> Option<Task> { + use std::ptr::NonNull; + + CURRENT.with(|maybe_cx| { + let cx = maybe_cx.expect("scheduler context missing"); + + if self.eq(&cx.worker) { + let mut maybe_core = cx.core.borrow_mut(); + + if let Some(core) = &mut *maybe_core { + // Directly remove the task + // + // safety: the task is inserted in the list in `bind`. + unsafe { + let ptr = NonNull::from(task.header()); + return core.tasks.remove(ptr); + } + } + } + + // Track the task to be released by the worker that owns it + // + // Safety: We get a new handle without incrementing the ref-count. + // A ref-count is held by the "owned" linked list and it is only + // ever removed from that list as part of the release process: this + // method or popping the task from `pending_drop`. Thus, we can rely + // on the ref-count held by the linked-list to keep the memory + // alive. + // + // When the task is removed from the stack, it is forgotten instead + // of dropped. + let task = unsafe { Task::from_raw(task.header().into()) }; + + self.remote().pending_drop.push(task); + + if cx.core.borrow().is_some() { + return None; + } + + // The worker core has been handed off to another thread. In the + // event that the scheduler is currently shutting down, the thread + // that owns the task may be waiting on the release to complete + // shutdown. + if self.inject().is_closed() { + self.remote().unpark.unpark(); + } + + None + }) + } + + fn schedule(&self, task: Notified) { + self.shared.schedule(task, false); + } + + fn yield_now(&self, task: Notified) { + self.shared.schedule(task, true); + } +} + +impl Shared { + pub(super) fn schedule(&self, task: Notified, is_yield: bool) { + CURRENT.with(|maybe_cx| { + if let Some(cx) = maybe_cx { + // Make sure the task is part of the **current** scheduler. + if self.ptr_eq(&cx.worker.shared) { + // And the current thread still holds a core + if let Some(core) = cx.core.borrow_mut().as_mut() { + self.schedule_local(core, task, is_yield); + return; + } + } + } + + // Otherwise, use the inject queue + self.inject.push(task); + self.notify_parked(); + }); + } + + fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) { + // Spawning from the worker thread. If scheduling a "yield" then the + // task must always be pushed to the back of the queue, enabling other + // tasks to be executed. If **not** a yield, then there is more + // flexibility and the task may go to the front of the queue. + let should_notify = if is_yield { + core.run_queue.push_back(task, &self.inject); + true + } else { + // Push to the LIFO slot + let prev = core.lifo_slot.take(); + let ret = prev.is_some(); + + if let Some(prev) = prev { + core.run_queue.push_back(prev, &self.inject); + } + + core.lifo_slot = Some(task); + + ret + }; + + // Only notify if not currently parked. If `park` is `None`, then the + // scheduling is from a resource driver. As notifications often come in + // batches, the notification is delayed until the park is complete. + if should_notify && core.park.is_some() { + self.notify_parked(); + } + } + + pub(super) fn close(&self) { + if self.inject.close() { + self.notify_all(); + } + } + + fn notify_parked(&self) { + if let Some(index) = self.idle.worker_to_notify() { + self.remotes[index].unpark.unpark(); + } + } + + fn notify_all(&self) { + for remote in &self.remotes[..] { + remote.unpark.unpark(); + } + } + + fn notify_if_work_pending(&self) { + for remote in &self.remotes[..] { + if !remote.steal.is_empty() { + self.notify_parked(); + return; + } + } + + if !self.inject.is_empty() { + self.notify_parked(); + } + } + + fn transition_worker_from_searching(&self) { + if self.idle.transition_worker_from_searching() { + // We are the final searching worker. Because work was found, we + // need to notify another worker. + self.notify_parked(); + } + } + + /// Signals that a worker has observed the shutdown signal and has replaced + /// its core back into its handle. + /// + /// If all workers have reached this point, the final cleanup is performed. + fn shutdown(&self, core: Box<Core>, worker: Arc<Worker>) { + let mut workers = self.shutdown_workers.lock().unwrap(); + workers.push((core, worker)); + + if workers.len() != self.remotes.len() { + return; + } + + for (mut core, worker) in workers.drain(..) { + core.shutdown(&worker); + } + + // Drain the injection queue + while let Some(_) = self.inject.pop() {} + } + + fn ptr_eq(&self, other: &Shared) -> bool { + self as *const _ == other as *const _ + } +} diff --git a/third_party/rust/tokio/src/runtime/time.rs b/third_party/rust/tokio/src/runtime/time.rs new file mode 100644 index 0000000000..c623d9641a --- /dev/null +++ b/third_party/rust/tokio/src/runtime/time.rs @@ -0,0 +1,59 @@ +//! Abstracts out the APIs necessary to `Runtime` for integrating the time +//! driver. When the `time` feature flag is **not** enabled. These APIs are +//! shells. This isolates the complexity of dealing with conditional +//! compilation. + +pub(crate) use variant::*; + +#[cfg(feature = "time")] +mod variant { + use crate::park::Either; + use crate::runtime::io; + use crate::time::{self, driver}; + + pub(crate) type Clock = time::Clock; + pub(crate) type Driver = Either<driver::Driver<io::Driver>, io::Driver>; + pub(crate) type Handle = Option<driver::Handle>; + + pub(crate) fn create_clock() -> Clock { + Clock::new() + } + + /// Create a new timer driver / handle pair + pub(crate) fn create_driver( + enable: bool, + io_driver: io::Driver, + clock: Clock, + ) -> (Driver, Handle) { + if enable { + let driver = driver::Driver::new(io_driver, clock); + let handle = driver.handle(); + + (Either::A(driver), Some(handle)) + } else { + (Either::B(io_driver), None) + } + } +} + +#[cfg(not(feature = "time"))] +mod variant { + use crate::runtime::io; + + pub(crate) type Clock = (); + pub(crate) type Driver = io::Driver; + pub(crate) type Handle = (); + + pub(crate) fn create_clock() -> Clock { + () + } + + /// Create a new timer driver / handle pair + pub(crate) fn create_driver( + _enable: bool, + io_driver: io::Driver, + _clock: Clock, + ) -> (Driver, Handle) { + (io_driver, ()) + } +} |