author    Daniel Baumann <daniel.baumann@progress-linux.org>  2024-05-30 03:57:31 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org>  2024-05-30 03:57:31 +0000
commit    dc0db358abe19481e475e10c32149b53370f1a1c (patch)
tree      ab8ce99c4b255ce46f99ef402c27916055b899ee /vendor/tokio/src/runtime/task
parent    Releasing progress-linux version 1.71.1+dfsg1-2~progress7.99u1. (diff)
Merging upstream version 1.72.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/tokio/src/runtime/task')
-rw-r--r--  vendor/tokio/src/runtime/task/abort.rs         |  87
-rw-r--r--  vendor/tokio/src/runtime/task/core.rs          | 387
-rw-r--r--  vendor/tokio/src/runtime/task/error.rs         |  44
-rw-r--r--  vendor/tokio/src/runtime/task/harness.rs       | 610
-rw-r--r--  vendor/tokio/src/runtime/task/id.rs            |  87
-rw-r--r--  vendor/tokio/src/runtime/task/join.rs          | 198
-rw-r--r--  vendor/tokio/src/runtime/task/list.rs          | 319
-rw-r--r--  vendor/tokio/src/runtime/task/mod.rs           | 421
-rw-r--r--  vendor/tokio/src/runtime/task/raw.rs           | 214
-rw-r--r--  vendor/tokio/src/runtime/task/stack.rs         |  83
-rw-r--r--  vendor/tokio/src/runtime/task/state.rs         | 357
-rw-r--r--  vendor/tokio/src/runtime/task/trace/mod.rs     | 330
-rw-r--r--  vendor/tokio/src/runtime/task/trace/symbol.rs  |  92
-rw-r--r--  vendor/tokio/src/runtime/task/trace/tree.rs    | 126
-rw-r--r--  vendor/tokio/src/runtime/task/waker.rs         |  88
15 files changed, 2625 insertions, 818 deletions
diff --git a/vendor/tokio/src/runtime/task/abort.rs b/vendor/tokio/src/runtime/task/abort.rs
new file mode 100644
index 000000000..6edca1004
--- /dev/null
+++ b/vendor/tokio/src/runtime/task/abort.rs
@@ -0,0 +1,87 @@
+use crate::runtime::task::{Header, RawTask};
+use std::fmt;
+use std::panic::{RefUnwindSafe, UnwindSafe};
+
+/// An owned permission to abort a spawned task, without awaiting its completion.
+///
+/// Unlike a [`JoinHandle`], an `AbortHandle` does *not* represent the
+/// permission to await the task's completion, only to terminate it.
+///
+/// The task may be aborted by calling the [`AbortHandle::abort`] method.
+/// Dropping an `AbortHandle` releases the permission to terminate the task
+/// --- it does *not* abort the task.
+///
+/// [`JoinHandle`]: crate::task::JoinHandle
+#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
+pub struct AbortHandle {
+ raw: RawTask,
+}
+
+impl AbortHandle {
+ pub(super) fn new(raw: RawTask) -> Self {
+ Self { raw }
+ }
+
+ /// Abort the task associated with the handle.
+ ///
+ /// Awaiting a cancelled task might complete as usual if the task was
+ /// already completed at the time it was cancelled, but most likely it
+ /// will fail with a [cancelled] `JoinError`.
+ ///
+ /// If the task was already cancelled, such as by [`JoinHandle::abort`],
+ /// this method will do nothing.
+ ///
+ /// [cancelled]: method@super::error::JoinError::is_cancelled
+ /// [`JoinHandle::abort`]: method@super::JoinHandle::abort
+ pub fn abort(&self) {
+ self.raw.remote_abort();
+ }
+
+ /// Checks if the task associated with this `AbortHandle` has finished.
+ ///
+ /// Please note that this method can return `false` even if `abort` has been
+ /// called on the task. This is because the cancellation process may take
+ /// some time, and this method does not return `true` until it has
+ /// completed.
+ pub fn is_finished(&self) -> bool {
+ let state = self.raw.state().load();
+ state.is_complete()
+ }
+
+ /// Returns a [task ID] that uniquely identifies this task relative to other
+ /// currently spawned tasks.
+ ///
+ /// **Note**: This is an [unstable API][unstable]. The public API of this type
+ /// may break in 1.x releases. See [the documentation on unstable
+ /// features][unstable] for details.
+ ///
+ /// [task ID]: crate::task::Id
+ /// [unstable]: crate#unstable-features
+ #[cfg(tokio_unstable)]
+ #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
+ pub fn id(&self) -> super::Id {
+ // Safety: The header pointer is valid.
+ unsafe { Header::get_id(self.raw.header_ptr()) }
+ }
+}
+
+unsafe impl Send for AbortHandle {}
+unsafe impl Sync for AbortHandle {}
+
+impl UnwindSafe for AbortHandle {}
+impl RefUnwindSafe for AbortHandle {}
+
+impl fmt::Debug for AbortHandle {
+ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+ // Safety: The header pointer is valid.
+ let id_ptr = unsafe { Header::get_id_ptr(self.raw.header_ptr()) };
+ let id = unsafe { id_ptr.as_ref() };
+ fmt.debug_struct("AbortHandle").field("id", id).finish()
+ }
+}
+
+impl Drop for AbortHandle {
+ fn drop(&mut self) {
+ self.raw.drop_abort_handle();
+ }
+}
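
The new `abort.rs` above only defines the handle type itself. As a quick orientation, here is a minimal usage sketch of the `AbortHandle` API it introduces, assuming a build with the `rt`, `time`, and `macros` features; the task body and sleep duration are illustrative only.

```rust
use std::time::Duration;
use tokio::time;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // Spawn a task that would otherwise run for a long time.
    let handle = tokio::spawn(async {
        time::sleep(Duration::from_secs(10)).await;
        "finished"
    });

    // An AbortHandle carries only the permission to abort; it cannot be awaited.
    let abort_handle = handle.abort_handle();
    abort_handle.abort();

    // Awaiting the JoinHandle now most likely yields a cancelled JoinError.
    let err = handle.await.unwrap_err();
    assert!(err.is_cancelled());
}
```

Dropping `abort_handle` instead of calling `abort()` would simply release the permission; the task keeps running, which is the distinction the doc comment above draws against `JoinHandle`.
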
diff --git a/vendor/tokio/src/runtime/task/core.rs b/vendor/tokio/src/runtime/task/core.rs
index 428c921fe..110933e58 100644
--- a/vendor/tokio/src/runtime/task/core.rs
+++ b/vendor/tokio/src/runtime/task/core.rs
@@ -11,9 +11,10 @@
use crate::future::Future;
use crate::loom::cell::UnsafeCell;
+use crate::runtime::context;
use crate::runtime::task::raw::{self, Vtable};
use crate::runtime::task::state::State;
-use crate::runtime::task::{Notified, Schedule, Task};
+use crate::runtime::task::{Id, Schedule};
use crate::util::linked_list;
use std::pin::Pin;
@@ -24,6 +25,97 @@ use std::task::{Context, Poll, Waker};
///
/// It is critical for `Header` to be the first field as the task structure will
/// be referenced by both *mut Cell and *mut Header.
+///
+/// Any changes to the layout of this struct _must_ also be reflected in the
+/// const fns in raw.rs.
+///
+// # This struct should be cache padded to avoid false sharing. The cache padding rules are copied
+// from crossbeam-utils/src/cache_padded.rs
+//
+// Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache
+// lines at a time, so we have to align to 128 bytes rather than 64.
+//
+// Sources:
+// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
+// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107
+//
+// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size.
+//
+// Sources:
+// - https://www.mono-project.com/news/2016/09/12/arm64-icache/
+//
+// powerpc64 has 128-byte cache line size.
+//
+// Sources:
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9
+#[cfg_attr(
+ any(
+ target_arch = "x86_64",
+ target_arch = "aarch64",
+ target_arch = "powerpc64",
+ ),
+ repr(align(128))
+)]
+// arm, mips, mips64, riscv64, sparc, and hexagon have 32-byte cache line size.
+//
+// Sources:
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_riscv64.go#L7
+// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L17
+// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/hexagon/include/asm/cache.h#L12
+//
+// riscv32 is assumed not to exceed the cache line size of riscv64.
+#[cfg_attr(
+ any(
+ target_arch = "arm",
+ target_arch = "mips",
+ target_arch = "mips64",
+ target_arch = "riscv32",
+ target_arch = "riscv64",
+ target_arch = "sparc",
+ target_arch = "hexagon",
+ ),
+ repr(align(32))
+)]
+// m68k has 16-byte cache line size.
+//
+// Sources:
+// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/m68k/include/asm/cache.h#L9
+#[cfg_attr(target_arch = "m68k", repr(align(16)))]
+// s390x has 256-byte cache line size.
+//
+// Sources:
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7
+// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/s390/include/asm/cache.h#L13
+#[cfg_attr(target_arch = "s390x", repr(align(256)))]
+// x86, wasm, and sparc64 have 64-byte cache line size.
+//
+// Sources:
+// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9
+// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7
+// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L19
+//
+// All others are assumed to have 64-byte cache line size.
+#[cfg_attr(
+ not(any(
+ target_arch = "x86_64",
+ target_arch = "aarch64",
+ target_arch = "powerpc64",
+ target_arch = "arm",
+ target_arch = "mips",
+ target_arch = "mips64",
+ target_arch = "riscv32",
+ target_arch = "riscv64",
+ target_arch = "sparc",
+ target_arch = "hexagon",
+ target_arch = "m68k",
+ target_arch = "s390x",
+ )),
+ repr(align(64))
+)]
#[repr(C)]
pub(super) struct Cell<T: Future, S> {
/// Hot task state data
@@ -36,10 +128,6 @@ pub(super) struct Cell<T: Future, S> {
pub(super) trailer: Trailer,
}
-pub(super) struct Scheduler<S> {
- scheduler: UnsafeCell<Option<S>>,
-}
-
pub(super) struct CoreStage<T: Future> {
stage: UnsafeCell<Stage<T>>,
}
@@ -47,45 +135,71 @@ pub(super) struct CoreStage<T: Future> {
/// The core of the task.
///
/// Holds the future or output, depending on the stage of execution.
+///
+/// Any changes to the layout of this struct _must_ also be reflected in the
+/// const fns in raw.rs.
+#[repr(C)]
pub(super) struct Core<T: Future, S> {
- /// Scheduler used to drive this future
- pub(super) scheduler: Scheduler<S>,
+ /// Scheduler used to drive this future.
+ pub(super) scheduler: S,
+
+ /// The task's ID, used for populating `JoinError`s.
+ pub(super) task_id: Id,
- /// Either the future or the output
+ /// Either the future or the output.
pub(super) stage: CoreStage<T>,
}
/// Crate public as this is also needed by the pool.
#[repr(C)]
pub(crate) struct Header {
- /// Task state
+ /// Task state.
pub(super) state: State,
- pub(crate) owned: UnsafeCell<linked_list::Pointers<Header>>,
-
- /// Pointer to next task, used with the injection queue
- pub(crate) queue_next: UnsafeCell<Option<NonNull<Header>>>,
-
- /// Pointer to the next task in the transfer stack
- pub(super) stack_next: UnsafeCell<Option<NonNull<Header>>>,
+ /// Pointer to next task, used with the injection queue.
+ pub(super) queue_next: UnsafeCell<Option<NonNull<Header>>>,
/// Table of function pointers for executing actions on the task.
pub(super) vtable: &'static Vtable,
+ /// This integer contains the id of the OwnedTasks or LocalOwnedTasks that
+ /// this task is stored in. If the task is not in any list, this should be
+ /// the id of the list that it was previously in, or zero if it has never
+ /// been in any list.
+ ///
+ /// Once a task has been bound to a list, it can never be bound to another
+ /// list, even if removed from the first list.
+ ///
+ /// The id is not unset when removed from a list because we want to be able
+ /// to read the id without synchronization, even if it is concurrently being
+ /// removed from the list.
+ pub(super) owner_id: UnsafeCell<u64>,
+
/// The tracing ID for this instrumented task.
#[cfg(all(tokio_unstable, feature = "tracing"))]
- pub(super) id: Option<tracing::Id>,
+ pub(super) tracing_id: Option<tracing::Id>,
}
unsafe impl Send for Header {}
unsafe impl Sync for Header {}
-/// Cold data is stored after the future.
+/// Cold data is stored after the future. Data is considered cold if it is only
+/// used during creation or shutdown of the task.
pub(super) struct Trailer {
+ /// Pointers for the linked list in the `OwnedTasks` that owns this task.
+ pub(super) owned: linked_list::Pointers<Header>,
/// Consumer task waiting on completion of this task.
pub(super) waker: UnsafeCell<Option<Waker>>,
}
+generate_addr_of_methods! {
+ impl<> Trailer {
+ pub(super) unsafe fn addr_of_owned(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Header>> {
+ &self.owned
+ }
+ }
+}
+
/// Either the future or the output.
pub(super) enum Stage<T: Future> {
Running(T),
@@ -96,117 +210,48 @@ pub(super) enum Stage<T: Future> {
impl<T: Future, S: Schedule> Cell<T, S> {
/// Allocates a new task cell, containing the header, trailer, and core
/// structures.
- pub(super) fn new(future: T, state: State) -> Box<Cell<T, S>> {
+ pub(super) fn new(future: T, scheduler: S, state: State, task_id: Id) -> Box<Cell<T, S>> {
#[cfg(all(tokio_unstable, feature = "tracing"))]
- let id = future.id();
- Box::new(Cell {
+ let tracing_id = future.id();
+ let result = Box::new(Cell {
header: Header {
state,
- owned: UnsafeCell::new(linked_list::Pointers::new()),
queue_next: UnsafeCell::new(None),
- stack_next: UnsafeCell::new(None),
vtable: raw::vtable::<T, S>(),
+ owner_id: UnsafeCell::new(0),
#[cfg(all(tokio_unstable, feature = "tracing"))]
- id,
+ tracing_id,
},
core: Core {
- scheduler: Scheduler {
- scheduler: UnsafeCell::new(None),
- },
+ scheduler,
stage: CoreStage {
stage: UnsafeCell::new(Stage::Running(future)),
},
+ task_id,
},
trailer: Trailer {
waker: UnsafeCell::new(None),
+ owned: linked_list::Pointers::new(),
},
- })
- }
-}
-
-impl<S: Schedule> Scheduler<S> {
- pub(super) fn with_mut<R>(&self, f: impl FnOnce(*mut Option<S>) -> R) -> R {
- self.scheduler.with_mut(f)
- }
-
- /// Bind a scheduler to the task.
- ///
- /// This only happens on the first poll and must be preceded by a call to
- /// `is_bound` to determine if binding is appropriate or not.
- ///
- /// # Safety
- ///
- /// Binding must not be done concurrently since it will mutate the task
- /// core through a shared reference.
- pub(super) fn bind_scheduler(&self, task: Task<S>) {
- // This function may be called concurrently, but the __first__ time it
- // is called, the caller has unique access to this field. All subsequent
- // concurrent calls will be via the `Waker`, which will "happens after"
- // the first poll.
- //
- // In other words, it is always safe to read the field and it is safe to
- // write to the field when it is `None`.
- debug_assert!(!self.is_bound());
-
- // Bind the task to the scheduler
- let scheduler = S::bind(task);
-
- // Safety: As `scheduler` is not set, this is the first poll
- self.scheduler.with_mut(|ptr| unsafe {
- *ptr = Some(scheduler);
});
- }
- /// Returns true if the task is bound to a scheduler.
- pub(super) fn is_bound(&self) -> bool {
- // Safety: never called concurrently w/ a mutation.
- self.scheduler.with(|ptr| unsafe { (*ptr).is_some() })
- }
+ #[cfg(debug_assertions)]
+ {
+ let trailer_addr = (&result.trailer) as *const Trailer as usize;
+ let trailer_ptr = unsafe { Header::get_trailer(NonNull::from(&result.header)) };
+ assert_eq!(trailer_addr, trailer_ptr.as_ptr() as usize);
- /// Schedule the future for execution
- pub(super) fn schedule(&self, task: Notified<S>) {
- self.scheduler.with(|ptr| {
- // Safety: Can only be called after initial `poll`, which is the
- // only time the field is mutated.
- match unsafe { &*ptr } {
- Some(scheduler) => scheduler.schedule(task),
- None => panic!("no scheduler set"),
- }
- });
- }
+ let scheduler_addr = (&result.core.scheduler) as *const S as usize;
+ let scheduler_ptr =
+ unsafe { Header::get_scheduler::<S>(NonNull::from(&result.header)) };
+ assert_eq!(scheduler_addr, scheduler_ptr.as_ptr() as usize);
- /// Schedule the future for execution in the near future, yielding the
- /// thread to other tasks.
- pub(super) fn yield_now(&self, task: Notified<S>) {
- self.scheduler.with(|ptr| {
- // Safety: Can only be called after initial `poll`, which is the
- // only time the field is mutated.
- match unsafe { &*ptr } {
- Some(scheduler) => scheduler.yield_now(task),
- None => panic!("no scheduler set"),
- }
- });
- }
+ let id_addr = (&result.core.task_id) as *const Id as usize;
+ let id_ptr = unsafe { Header::get_id_ptr(NonNull::from(&result.header)) };
+ assert_eq!(id_addr, id_ptr.as_ptr() as usize);
+ }
- /// Release the task
- ///
- /// If the `Scheduler` implementation is able to, it returns the `Task`
- /// handle immediately. The caller of this function will batch a ref-dec
- /// with a state change.
- pub(super) fn release(&self, task: Task<S>) -> Option<Task<S>> {
- use std::mem::ManuallyDrop;
-
- let task = ManuallyDrop::new(task);
-
- self.scheduler.with(|ptr| {
- // Safety: Can only be called after initial `poll`, which is the
- // only time the field is mutated.
- match unsafe { &*ptr } {
- Some(scheduler) => scheduler.release(&*task),
- // Task was never polled
- None => None,
- }
- })
+ result
}
}
@@ -214,8 +259,30 @@ impl<T: Future> CoreStage<T> {
pub(super) fn with_mut<R>(&self, f: impl FnOnce(*mut Stage<T>) -> R) -> R {
self.stage.with_mut(f)
}
+}
+
+/// Set and clear the task id in the context when the future is executed or
+/// dropped, or when the output produced by the future is dropped.
+pub(crate) struct TaskIdGuard {
+ parent_task_id: Option<Id>,
+}
+
+impl TaskIdGuard {
+ fn enter(id: Id) -> Self {
+ TaskIdGuard {
+ parent_task_id: context::set_current_task_id(Some(id)),
+ }
+ }
+}
- /// Poll the future
+impl Drop for TaskIdGuard {
+ fn drop(&mut self) {
+ context::set_current_task_id(self.parent_task_id);
+ }
+}
+
+impl<T: Future, S: Schedule> Core<T, S> {
+ /// Polls the future.
///
/// # Safety
///
@@ -230,7 +297,7 @@ impl<T: Future> CoreStage<T> {
/// heap.
pub(super) fn poll(&self, mut cx: Context<'_>) -> Poll<T::Output> {
let res = {
- self.stage.with_mut(|ptr| {
+ self.stage.stage.with_mut(|ptr| {
// Safety: The caller ensures mutual exclusion to the field.
let future = match unsafe { &mut *ptr } {
Stage::Running(future) => future,
@@ -240,6 +307,7 @@ impl<T: Future> CoreStage<T> {
// Safety: The caller ensures the future is pinned.
let future = unsafe { Pin::new_unchecked(future) };
+ let _guard = TaskIdGuard::enter(self.task_id);
future.poll(&mut cx)
})
};
@@ -251,7 +319,7 @@ impl<T: Future> CoreStage<T> {
res
}
- /// Drop the future
+ /// Drops the future.
///
/// # Safety
///
@@ -263,7 +331,7 @@ impl<T: Future> CoreStage<T> {
}
}
- /// Store the task output
+ /// Stores the task output.
///
/// # Safety
///
@@ -275,7 +343,7 @@ impl<T: Future> CoreStage<T> {
}
}
- /// Take the task output
+ /// Takes the task output.
///
/// # Safety
///
@@ -283,7 +351,7 @@ impl<T: Future> CoreStage<T> {
pub(super) fn take_output(&self) -> super::Result<T::Output> {
use std::mem;
- self.stage.with_mut(|ptr| {
+ self.stage.stage.with_mut(|ptr| {
// Safety: the caller ensures mutual exclusion to the field.
match mem::replace(unsafe { &mut *ptr }, Stage::Consumed) {
Stage::Finished(output) => output,
@@ -293,38 +361,99 @@ impl<T: Future> CoreStage<T> {
}
unsafe fn set_stage(&self, stage: Stage<T>) {
- self.stage.with_mut(|ptr| *ptr = stage)
+ let _guard = TaskIdGuard::enter(self.task_id);
+ self.stage.stage.with_mut(|ptr| *ptr = stage)
}
}
-cfg_rt_multi_thread! {
- impl Header {
- pub(crate) fn shutdown(&self) {
- use crate::runtime::task::RawTask;
+impl Header {
+ pub(super) unsafe fn set_next(&self, next: Option<NonNull<Header>>) {
+ self.queue_next.with_mut(|ptr| *ptr = next);
+ }
+
+ // safety: The caller must guarantee exclusive access to this field, and
+ // must ensure that the id is either 0 or the id of the OwnedTasks
+ // containing this task.
+ pub(super) unsafe fn set_owner_id(&self, owner: u64) {
+ self.owner_id.with_mut(|ptr| *ptr = owner);
+ }
- let task = unsafe { RawTask::from_raw(self.into()) };
- task.shutdown();
- }
+ pub(super) fn get_owner_id(&self) -> u64 {
+ // safety: If there are concurrent writes, then that write has violated
+ // the safety requirements on `set_owner_id`.
+ unsafe { self.owner_id.with(|ptr| *ptr) }
+ }
- pub(crate) unsafe fn set_next(&self, next: Option<NonNull<Header>>) {
- self.queue_next.with_mut(|ptr| *ptr = next);
- }
+ /// Gets a pointer to the `Trailer` of the task containing this `Header`.
+ ///
+ /// # Safety
+ ///
+ /// The provided raw pointer must point at the header of a task.
+ pub(super) unsafe fn get_trailer(me: NonNull<Header>) -> NonNull<Trailer> {
+ let offset = me.as_ref().vtable.trailer_offset;
+ let trailer = me.as_ptr().cast::<u8>().add(offset).cast::<Trailer>();
+ NonNull::new_unchecked(trailer)
+ }
+
+ /// Gets a pointer to the scheduler of the task containing this `Header`.
+ ///
+ /// # Safety
+ ///
+ /// The provided raw pointer must point at the header of a task.
+ ///
+ /// The generic type S must be set to the correct scheduler type for this
+ /// task.
+ pub(super) unsafe fn get_scheduler<S>(me: NonNull<Header>) -> NonNull<S> {
+ let offset = me.as_ref().vtable.scheduler_offset;
+ let scheduler = me.as_ptr().cast::<u8>().add(offset).cast::<S>();
+ NonNull::new_unchecked(scheduler)
+ }
+
+ /// Gets a pointer to the id of the task containing this `Header`.
+ ///
+ /// # Safety
+ ///
+ /// The provided raw pointer must point at the header of a task.
+ pub(super) unsafe fn get_id_ptr(me: NonNull<Header>) -> NonNull<Id> {
+ let offset = me.as_ref().vtable.id_offset;
+ let id = me.as_ptr().cast::<u8>().add(offset).cast::<Id>();
+ NonNull::new_unchecked(id)
+ }
+
+ /// Gets the id of the task containing this `Header`.
+ ///
+ /// # Safety
+ ///
+ /// The provided raw pointer must point at the header of a task.
+ pub(super) unsafe fn get_id(me: NonNull<Header>) -> Id {
+ let ptr = Header::get_id_ptr(me).as_ptr();
+ *ptr
+ }
+
+ /// Gets the tracing id of the task containing this `Header`.
+ ///
+ /// # Safety
+ ///
+ /// The provided raw pointer must point at the header of a task.
+ #[cfg(all(tokio_unstable, feature = "tracing"))]
+ pub(super) unsafe fn get_tracing_id(me: &NonNull<Header>) -> Option<&tracing::Id> {
+ me.as_ref().tracing_id.as_ref()
}
}
impl Trailer {
- pub(crate) unsafe fn set_waker(&self, waker: Option<Waker>) {
+ pub(super) unsafe fn set_waker(&self, waker: Option<Waker>) {
self.waker.with_mut(|ptr| {
*ptr = waker;
});
}
- pub(crate) unsafe fn will_wake(&self, waker: &Waker) -> bool {
+ pub(super) unsafe fn will_wake(&self, waker: &Waker) -> bool {
self.waker
.with(|ptr| (*ptr).as_ref().unwrap().will_wake(waker))
}
- pub(crate) fn wake_join(&self) {
+ pub(super) fn wake_join(&self) {
self.waker.with(|ptr| match unsafe { &*ptr } {
Some(waker) => waker.wake_by_ref(),
None => panic!("waker missing"),
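
The `TaskIdGuard` introduced in this file is a save-and-restore RAII guard: it stashes the previously current task id when the future is polled (or its stage is dropped) and restores it on drop, even if the guarded code panics. Below is a standalone sketch of that pattern over a plain thread-local; the names `CURRENT_ID` and `IdGuard` are illustrative stand-ins, not Tokio's internal `context` API.

```rust
use std::cell::Cell;

thread_local! {
    // Hypothetical stand-in for the runtime context slot that Tokio manages.
    static CURRENT_ID: Cell<Option<u64>> = Cell::new(None);
}

struct IdGuard {
    parent: Option<u64>,
}

impl IdGuard {
    fn enter(id: u64) -> Self {
        // Install the new id and remember whatever was there before.
        let parent = CURRENT_ID.with(|slot| slot.replace(Some(id)));
        IdGuard { parent }
    }
}

impl Drop for IdGuard {
    fn drop(&mut self) {
        // Restore the previous id, even if the guarded scope panicked.
        CURRENT_ID.with(|slot| slot.set(self.parent));
    }
}

fn main() {
    let _outer = IdGuard::enter(1);
    {
        let _inner = IdGuard::enter(2);
        assert_eq!(CURRENT_ID.with(|slot| slot.get()), Some(2));
    }
    // Dropping the inner guard restored the outer id.
    assert_eq!(CURRENT_ID.with(|slot| slot.get()), Some(1));
}
```

In `Core::poll` and `set_stage` above, the guard plays the same role so that code running inside the future observes the correct current task id.
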
diff --git a/vendor/tokio/src/runtime/task/error.rs b/vendor/tokio/src/runtime/task/error.rs
index 177fe65e9..f7ead77b7 100644
--- a/vendor/tokio/src/runtime/task/error.rs
+++ b/vendor/tokio/src/runtime/task/error.rs
@@ -1,39 +1,43 @@
use std::any::Any;
use std::fmt;
use std::io;
-use std::sync::Mutex;
+use super::Id;
+use crate::util::SyncWrapper;
cfg_rt! {
/// Task failed to execute to completion.
pub struct JoinError {
repr: Repr,
+ id: Id,
}
}
enum Repr {
Cancelled,
- Panic(Mutex<Box<dyn Any + Send + 'static>>),
+ Panic(SyncWrapper<Box<dyn Any + Send + 'static>>),
}
impl JoinError {
- pub(crate) fn cancelled() -> JoinError {
+ pub(crate) fn cancelled(id: Id) -> JoinError {
JoinError {
repr: Repr::Cancelled,
+ id,
}
}
- pub(crate) fn panic(err: Box<dyn Any + Send + 'static>) -> JoinError {
+ pub(crate) fn panic(id: Id, err: Box<dyn Any + Send + 'static>) -> JoinError {
JoinError {
- repr: Repr::Panic(Mutex::new(err)),
+ repr: Repr::Panic(SyncWrapper::new(err)),
+ id,
}
}
- /// Returns true if the error was caused by the task being cancelled
+ /// Returns true if the error was caused by the task being cancelled.
pub fn is_cancelled(&self) -> bool {
matches!(&self.repr, Repr::Cancelled)
}
- /// Returns true if the error was caused by the task panicking
+ /// Returns true if the error was caused by the task panicking.
///
/// # Examples
///
@@ -78,6 +82,7 @@ impl JoinError {
/// }
/// }
/// ```
+ #[track_caller]
pub fn into_panic(self) -> Box<dyn Any + Send + 'static> {
self.try_into_panic()
.expect("`JoinError` reason is not a panic.")
@@ -106,17 +111,32 @@ impl JoinError {
/// ```
pub fn try_into_panic(self) -> Result<Box<dyn Any + Send + 'static>, JoinError> {
match self.repr {
- Repr::Panic(p) => Ok(p.into_inner().expect("Extracting panic from mutex")),
+ Repr::Panic(p) => Ok(p.into_inner()),
_ => Err(self),
}
}
+
+ /// Returns a [task ID] that identifies the task which errored relative to
+ /// other currently spawned tasks.
+ ///
+ /// **Note**: This is an [unstable API][unstable]. The public API of this type
+ /// may break in 1.x releases. See [the documentation on unstable
+ /// features][unstable] for details.
+ ///
+ /// [task ID]: crate::task::Id
+ /// [unstable]: crate#unstable-features
+ #[cfg(tokio_unstable)]
+ #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
+ pub fn id(&self) -> Id {
+ self.id
+ }
}
impl fmt::Display for JoinError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.repr {
- Repr::Cancelled => write!(fmt, "cancelled"),
- Repr::Panic(_) => write!(fmt, "panic"),
+ Repr::Cancelled => write!(fmt, "task {} was cancelled", self.id),
+ Repr::Panic(_) => write!(fmt, "task {} panicked", self.id),
}
}
}
@@ -124,8 +144,8 @@ impl fmt::Display for JoinError {
impl fmt::Debug for JoinError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.repr {
- Repr::Cancelled => write!(fmt, "JoinError::Cancelled"),
- Repr::Panic(_) => write!(fmt, "JoinError::Panic(...)"),
+ Repr::Cancelled => write!(fmt, "JoinError::Cancelled({:?})", self.id),
+ Repr::Panic(_) => write!(fmt, "JoinError::Panic({:?}, ...)", self.id),
}
}
}
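
With the task id now carried inside `JoinError`, the usual way a caller distinguishes cancellation from a panic is unchanged. The sketch below is illustrative (the spawned closure and panic message are made up) and simply exercises `is_cancelled`, `is_panic`, and `into_panic`.

```rust
#[tokio::main(flavor = "current_thread")]
async fn main() {
    let handle = tokio::spawn(async {
        panic!("boom");
    });

    match handle.await {
        Ok(()) => println!("task finished normally"),
        Err(err) if err.is_cancelled() => println!("task was cancelled: {err}"),
        Err(err) if err.is_panic() => {
            // Take ownership of the panic payload instead of re-raising it.
            let payload = err.into_panic();
            match payload.downcast_ref::<&str>() {
                Some(msg) => println!("task panicked: {msg}"),
                None => println!("task panicked with a non-string payload"),
            }
        }
        Err(err) => println!("task failed: {err}"),
    }
}
```

Note how the new `Display` impl above folds the task id into the message ("task {id} was cancelled" / "task {id} panicked"), which makes the plain `{err}` branches more useful in logs.
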
diff --git a/vendor/tokio/src/runtime/task/harness.rs b/vendor/tokio/src/runtime/task/harness.rs
index 8cd649dc7..8e3c3d14f 100644
--- a/vendor/tokio/src/runtime/task/harness.rs
+++ b/vendor/tokio/src/runtime/task/harness.rs
@@ -1,15 +1,16 @@
use crate::future::Future;
-use crate::runtime::task::core::{Cell, Core, CoreStage, Header, Scheduler, Trailer};
-use crate::runtime::task::state::Snapshot;
+use crate::runtime::task::core::{Cell, Core, Header, Trailer};
+use crate::runtime::task::state::{Snapshot, State};
use crate::runtime::task::waker::waker_ref;
-use crate::runtime::task::{JoinError, Notified, Schedule, Task};
+use crate::runtime::task::{JoinError, Notified, RawTask, Schedule, Task};
use std::mem;
+use std::mem::ManuallyDrop;
use std::panic;
use std::ptr::NonNull;
use std::task::{Context, Poll, Waker};
-/// Typed raw task handle
+/// Typed raw task handle.
pub(super) struct Harness<T: Future, S: 'static> {
cell: NonNull<Cell<T, S>>,
}
@@ -25,8 +26,16 @@ where
}
}
+ fn header_ptr(&self) -> NonNull<Header> {
+ self.cell.cast()
+ }
+
fn header(&self) -> &Header {
- unsafe { &self.cell.as_ref().header }
+ unsafe { &*self.header_ptr().as_ptr() }
+ }
+
+ fn state(&self) -> &State {
+ &self.header().state
}
fn trailer(&self) -> &Trailer {
@@ -36,13 +45,91 @@ where
fn core(&self) -> &Core<T, S> {
unsafe { &self.cell.as_ref().core }
}
+}
+
+/// Task operations that can be implemented without being generic over the
+/// scheduler or task. Only one version of these methods should exist in the
+/// final binary.
+impl RawTask {
+ pub(super) fn drop_reference(self) {
+ if self.state().ref_dec() {
+ self.dealloc();
+ }
+ }
+
+ /// This call consumes a ref-count and notifies the task. This will create a
+ /// new Notified and submit it if necessary.
+ ///
+ /// The caller does not need to hold a ref-count besides the one that was
+ /// passed to this call.
+ pub(super) fn wake_by_val(&self) {
+ use super::state::TransitionToNotifiedByVal;
+
+ match self.state().transition_to_notified_by_val() {
+ TransitionToNotifiedByVal::Submit => {
+ // The caller has given us a ref-count, and the transition has
+ // created a new ref-count, so we now hold two. We turn the new
+ // ref-count Notified and pass it to the call to `schedule`.
+ //
+ // The old ref-count is retained for now to ensure that the task
+ // is not dropped during the call to `schedule` if the call
+ // drops the task it was given.
+ self.schedule();
+
+ // Now that we have completed the call to schedule, we can
+ // release our ref-count.
+ self.drop_reference();
+ }
+ TransitionToNotifiedByVal::Dealloc => {
+ self.dealloc();
+ }
+ TransitionToNotifiedByVal::DoNothing => {}
+ }
+ }
+
+ /// This call notifies the task. It will not consume any ref-counts, but the
+ /// caller should hold a ref-count. This will create a new Notified and
+ /// submit it if necessary.
+ pub(super) fn wake_by_ref(&self) {
+ use super::state::TransitionToNotifiedByRef;
+
+ match self.state().transition_to_notified_by_ref() {
+ TransitionToNotifiedByRef::Submit => {
+ // The transition above incremented the ref-count for a new task
+ // and the caller also holds a ref-count. The caller's ref-count
+ // ensures that the task is not destroyed even if the new task
+ // is dropped before `schedule` returns.
+ self.schedule();
+ }
+ TransitionToNotifiedByRef::DoNothing => {}
+ }
+ }
- fn scheduler_view(&self) -> SchedulerView<'_, S> {
- SchedulerView {
- header: self.header(),
- scheduler: &self.core().scheduler,
+ /// Remotely aborts the task.
+ ///
+ /// The caller should hold a ref-count, but we do not consume it.
+ ///
+ /// This is similar to `shutdown` except that it asks the runtime to perform
+ /// the shutdown. This is necessary to avoid the shutdown happening in the
+ /// wrong thread for non-Send tasks.
+ pub(super) fn remote_abort(&self) {
+ if self.state().transition_to_notified_and_cancel() {
+ // The transition has created a new ref-count, which we turn into
+ // a Notified and pass to the task.
+ //
+ // Since the caller holds a ref-count, the task cannot be destroyed
+ // before the call to `schedule` returns even if the call drops the
+ // `Notified` internally.
+ self.schedule();
}
}
+
+ /// Tries to set the waker that is notified when the task completes. Returns
+ /// true if the task has already completed. If this call returns false, then
+ /// the waker will be notified when the task completes.
+ pub(super) fn try_set_join_waker(&self, waker: &Waker) -> bool {
+ can_read_output(self.header(), self.trailer(), waker)
+ }
}
impl<T, S> Harness<T, S>
@@ -50,43 +137,109 @@ where
T: Future,
S: Schedule,
{
- /// Polls the inner future.
+ pub(super) fn drop_reference(self) {
+ if self.state().ref_dec() {
+ self.dealloc();
+ }
+ }
+
+ /// Polls the inner future. A ref-count is consumed.
///
/// All necessary state checks and transitions are performed.
- ///
/// Panics raised while polling the future are handled.
pub(super) fn poll(self) {
+ // We pass our ref-count to `poll_inner`.
match self.poll_inner() {
PollFuture::Notified => {
- // Signal yield
- self.core().scheduler.yield_now(Notified(self.to_task()));
- // The ref-count was incremented as part of
- // `transition_to_idle`.
+ // The `poll_inner` call has given us two ref-counts back.
+ // We give one of them to a new task and call `yield_now`.
+ self.core()
+ .scheduler
+ .yield_now(Notified(self.get_new_task()));
+
+ // The remaining ref-count is now dropped. We kept the extra
+ // ref-count until now to ensure that even if the `yield_now`
+ // call drops the provided task, the task isn't deallocated
+ // before `yield_now` returns.
self.drop_reference();
}
- PollFuture::DropReference => {
- self.drop_reference();
+ PollFuture::Complete => {
+ self.complete();
}
- PollFuture::Complete(out, is_join_interested) => {
- self.complete(out, is_join_interested);
+ PollFuture::Dealloc => {
+ self.dealloc();
}
- PollFuture::None => (),
+ PollFuture::Done => (),
}
}
- fn poll_inner(&self) -> PollFuture<T::Output> {
- let snapshot = match self.scheduler_view().transition_to_running() {
- TransitionToRunning::Ok(snapshot) => snapshot,
- TransitionToRunning::DropReference => return PollFuture::DropReference,
- };
+ /// Polls the task and cancels it if necessary. This takes ownership of a
+ /// ref-count.
+ ///
+ /// If the return value is Notified, the caller is given ownership of two
+ /// ref-counts.
+ ///
+ /// If the return value is Complete, the caller is given ownership of a
+ /// single ref-count, which should be passed on to `complete`.
+ ///
+ /// If the return value is Dealloc, then this call consumed the last
+ /// ref-count and the caller should call `dealloc`.
+ ///
+ /// Otherwise the ref-count is consumed and the caller should not access
+ /// `self` again.
+ fn poll_inner(&self) -> PollFuture {
+ use super::state::{TransitionToIdle, TransitionToRunning};
+
+ match self.state().transition_to_running() {
+ TransitionToRunning::Success => {
+ let header_ptr = self.header_ptr();
+ let waker_ref = waker_ref::<T, S>(&header_ptr);
+ let cx = Context::from_waker(&waker_ref);
+ let res = poll_future(self.core(), cx);
+
+ if res == Poll::Ready(()) {
+ // The future completed. Move on to complete the task.
+ return PollFuture::Complete;
+ }
- // The transition to `Running` done above ensures that a lock on the
- // future has been obtained. This also ensures the `*mut T` pointer
- // contains the future (as opposed to the output) and is initialized.
+ match self.state().transition_to_idle() {
+ TransitionToIdle::Ok => PollFuture::Done,
+ TransitionToIdle::OkNotified => PollFuture::Notified,
+ TransitionToIdle::OkDealloc => PollFuture::Dealloc,
+ TransitionToIdle::Cancelled => {
+ // The transition to idle failed because the task was
+ // cancelled during the poll.
+ cancel_task(self.core());
+ PollFuture::Complete
+ }
+ }
+ }
+ TransitionToRunning::Cancelled => {
+ cancel_task(self.core());
+ PollFuture::Complete
+ }
+ TransitionToRunning::Failed => PollFuture::Done,
+ TransitionToRunning::Dealloc => PollFuture::Dealloc,
+ }
+ }
- let waker_ref = waker_ref::<T, S>(self.header());
- let cx = Context::from_waker(&*waker_ref);
- poll_future(self.header(), &self.core().stage, snapshot, cx)
+ /// Forcibly shuts down the task.
+ ///
+ /// Attempt to transition to `Running` in order to forcibly shutdown the
+ /// task. If the task is currently running or in a state of completion, then
+ /// there is nothing further to do. When the task completes running, it will
+ /// notice the `CANCELLED` bit and finalize the task.
+ pub(super) fn shutdown(self) {
+ if !self.state().transition_to_shutdown() {
+ // The task is concurrently running. No further work needed.
+ self.drop_reference();
+ return;
+ }
+
+ // By transitioning the lifecycle to `Running`, we have permission to
+ // drop the future.
+ cancel_task(self.core());
+ self.complete();
}
pub(super) fn dealloc(self) {
@@ -95,8 +248,20 @@ where
// Check causality
self.core().stage.with_mut(drop);
- self.core().scheduler.with_mut(drop);
+ // Safety: The caller of this method just transitioned our ref-count to
+ // zero, so it is our responsibility to release the allocation.
+ //
+ // We don't hold any references into the allocation at this point, but
+ // it is possible for another thread to still hold a `&State` into the
+ // allocation if that other thread has decremented its last ref-count,
+ // but has not yet returned from the relevant method on `State`.
+ //
+ // However, the `State` type consists of just an `AtomicUsize`, and an
+ // `AtomicUsize` wraps the entirety of its contents in an `UnsafeCell`.
+ // As explained in the documentation for `UnsafeCell`, such references
+ // are allowed to be dangling after their last use, even if the
+ // reference has not yet gone out of scope.
unsafe {
drop(Box::from_raw(self.cell.as_ptr()));
}
@@ -107,227 +272,92 @@ where
/// Read the task output into `dst`.
pub(super) fn try_read_output(self, dst: &mut Poll<super::Result<T::Output>>, waker: &Waker) {
if can_read_output(self.header(), self.trailer(), waker) {
- *dst = Poll::Ready(self.core().stage.take_output());
+ *dst = Poll::Ready(self.core().take_output());
}
}
pub(super) fn drop_join_handle_slow(self) {
- let mut maybe_panic = None;
-
// Try to unset `JOIN_INTEREST`. This must be done as a first step in
// case the task concurrently completed.
- if self.header().state.unset_join_interested().is_err() {
+ if self.state().unset_join_interested().is_err() {
// It is our responsibility to drop the output. This is critical as
// the task output may not be `Send` and as such must remain with
// the scheduler or `JoinHandle`. i.e. if the output remains in the
// task structure until the task is deallocated, it may be dropped
// by a Waker on any arbitrary thread.
- let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| {
- self.core().stage.drop_future_or_output();
+ //
+ // Panics are delivered to the user via the `JoinHandle`. Given that
+ // they are dropping the `JoinHandle`, we assume they are not
+ // interested in the panic and swallow it.
+ let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
+ self.core().drop_future_or_output();
}));
- if let Err(panic) = panic {
- maybe_panic = Some(panic);
- }
}
// Drop the `JoinHandle` reference, possibly deallocating the task
self.drop_reference();
-
- if let Some(panic) = maybe_panic {
- panic::resume_unwind(panic);
- }
- }
-
- // ===== waker behavior =====
-
- pub(super) fn wake_by_val(self) {
- self.wake_by_ref();
- self.drop_reference();
- }
-
- pub(super) fn wake_by_ref(&self) {
- if self.header().state.transition_to_notified() {
- self.core().scheduler.schedule(Notified(self.to_task()));
- }
- }
-
- pub(super) fn drop_reference(self) {
- if self.header().state.ref_dec() {
- self.dealloc();
- }
}
- #[cfg(all(tokio_unstable, feature = "tracing"))]
- pub(super) fn id(&self) -> Option<&tracing::Id> {
- self.header().id.as_ref()
- }
-
- /// Forcibly shutdown the task
- ///
- /// Attempt to transition to `Running` in order to forcibly shutdown the
- /// task. If the task is currently running or in a state of completion, then
- /// there is nothing further to do. When the task completes running, it will
- /// notice the `CANCELLED` bit and finalize the task.
- pub(super) fn shutdown(self) {
- if !self.header().state.transition_to_shutdown() {
- // The task is concurrently running. No further work needed.
- return;
- }
-
- // By transitioning the lifecycle to `Running`, we have permission to
- // drop the future.
- let err = cancel_task(&self.core().stage);
- self.complete(Err(err), true)
- }
+ // ====== internal ======
- /// Remotely abort the task
- ///
- /// This is similar to `shutdown` except that it asks the runtime to perform
- /// the shutdown. This is necessary to avoid the shutdown happening in the
- /// wrong thread for non-Send tasks.
- pub(super) fn remote_abort(self) {
- if self.header().state.transition_to_notified_and_cancel() {
- self.core().scheduler.schedule(Notified(self.to_task()));
- }
- }
+ /// Completes the task. This method assumes that the state is RUNNING.
+ fn complete(self) {
+ // The future has completed and its output has been written to the task
+ // stage. We transition from running to complete.
- // ====== internal ======
+ let snapshot = self.state().transition_to_complete();
- fn complete(self, output: super::Result<T::Output>, is_join_interested: bool) {
- // We catch panics here because dropping the output may panic.
- //
- // Dropping the output can also happen in the first branch inside
- // transition_to_complete.
+ // We catch panics here in case dropping the future or waking the
+ // JoinHandle panics.
let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
- if is_join_interested {
- // Store the output. The future has already been dropped
- //
- // Safety: Mutual exclusion is obtained by having transitioned the task
- // state -> Running
- let stage = &self.core().stage;
- stage.store_output(output);
-
- // Transition to `Complete`, notifying the `JoinHandle` if necessary.
- transition_to_complete(self.header(), stage, &self.trailer());
- } else {
- drop(output);
+ if !snapshot.is_join_interested() {
+ // The `JoinHandle` is not interested in the output of
+ // this task. It is our responsibility to drop the
+ // output.
+ self.core().drop_future_or_output();
+ } else if snapshot.is_join_waker_set() {
+ // Notify the waker. Reading the waker field is safe per rule 4
+ // in task/mod.rs, since the JOIN_WAKER bit is set and the call
+ // to transition_to_complete() above set the COMPLETE bit.
+ self.trailer().wake_join();
}
}));
// The task has completed execution and will no longer be scheduled.
- //
- // Attempts to batch a ref-dec with the state transition below.
+ let num_release = self.release();
- if self
- .scheduler_view()
- .transition_to_terminal(is_join_interested)
- {
- self.dealloc()
+ if self.state().transition_to_terminal(num_release) {
+ self.dealloc();
}
}
- fn to_task(&self) -> Task<S> {
- self.scheduler_view().to_task()
- }
-}
+ /// Releases the task from the scheduler. Returns the number of ref-counts
+ /// that should be decremented.
+ fn release(&self) -> usize {
+ // We don't actually increment the ref-count here, but the new task is
+ // never destroyed, so that's ok.
+ let me = ManuallyDrop::new(self.get_new_task());
-enum TransitionToRunning {
- Ok(Snapshot),
- DropReference,
-}
-
-struct SchedulerView<'a, S> {
- header: &'a Header,
- scheduler: &'a Scheduler<S>,
-}
-
-impl<'a, S> SchedulerView<'a, S>
-where
- S: Schedule,
-{
- fn to_task(&self) -> Task<S> {
- // SAFETY The header is from the same struct containing the scheduler `S` so the cast is safe
- unsafe { Task::from_raw(self.header.into()) }
- }
-
- /// Returns true if the task should be deallocated.
- fn transition_to_terminal(&self, is_join_interested: bool) -> bool {
- let ref_dec = if self.scheduler.is_bound() {
- if let Some(task) = self.scheduler.release(self.to_task()) {
- mem::forget(task);
- true
- } else {
- false
- }
+ if let Some(task) = self.core().scheduler.release(&me) {
+ mem::forget(task);
+ 2
} else {
- false
- };
-
- // This might deallocate
- let snapshot = self
- .header
- .state
- .transition_to_terminal(!is_join_interested, ref_dec);
-
- snapshot.ref_count() == 0
- }
-
- fn transition_to_running(&self) -> TransitionToRunning {
- // If this is the first time the task is polled, the task will be bound
- // to the scheduler, in which case the task ref count must be
- // incremented.
- let is_not_bound = !self.scheduler.is_bound();
-
- // Transition the task to the running state.
- //
- // A failure to transition here indicates the task has been cancelled
- // while in the run queue pending execution.
- let snapshot = match self.header.state.transition_to_running(is_not_bound) {
- Ok(snapshot) => snapshot,
- Err(_) => {
- // The task was shutdown while in the run queue. At this point,
- // we just hold a ref counted reference. Since we do not have access to it here
- // return `DropReference` so the caller drops it.
- return TransitionToRunning::DropReference;
- }
- };
-
- if is_not_bound {
- // Ensure the task is bound to a scheduler instance. Since this is
- // the first time polling the task, a scheduler instance is pulled
- // from the local context and assigned to the task.
- //
- // The scheduler maintains ownership of the task and responds to
- // `wake` calls.
- //
- // The task reference count has been incremented.
- //
- // Safety: Since we have unique access to the task so that we can
- // safely call `bind_scheduler`.
- self.scheduler.bind_scheduler(self.to_task());
+ 1
}
- TransitionToRunning::Ok(snapshot)
}
-}
-/// Transitions the task's lifecycle to `Complete`. Notifies the
-/// `JoinHandle` if it still has interest in the completion.
-fn transition_to_complete<T>(header: &Header, stage: &CoreStage<T>, trailer: &Trailer)
-where
- T: Future,
-{
- // Transition the task's lifecycle to `Complete` and get a snapshot of
- // the task's sate.
- let snapshot = header.state.transition_to_complete();
-
- if !snapshot.is_join_interested() {
- // The `JoinHandle` is not interested in the output of this task. It
- // is our responsibility to drop the output.
- stage.drop_future_or_output();
- } else if snapshot.has_join_waker() {
- // Notify the join handle. The previous transition obtains the
- // lock on the waker cell.
- trailer.wake_join();
+ /// Creates a new task that holds its own ref-count.
+ ///
+ /// # Safety
+ ///
+ /// Any use of `self` after this call must ensure that a ref-count to the
+ /// task holds the task alive until after the use of `self`. Passing the
+ /// returned Task to any method on `self` is unsound if dropping the Task
+ /// could drop `self` before the call on `self` returned.
+ fn get_new_task(&self) -> Task<S> {
+ // safety: The header is at the beginning of the cell, so this cast is
+ // safe.
+ unsafe { Task::from_raw(self.cell.cast()) }
}
}
@@ -338,36 +368,30 @@ fn can_read_output(header: &Header, trailer: &Trailer, waker: &Waker) -> bool {
debug_assert!(snapshot.is_join_interested());
if !snapshot.is_complete() {
- // The waker must be stored in the task struct.
- let res = if snapshot.has_join_waker() {
- // There already is a waker stored in the struct. If it matches
- // the provided waker, then there is no further work to do.
- // Otherwise, the waker must be swapped.
- let will_wake = unsafe {
- // Safety: when `JOIN_INTEREST` is set, only `JOIN_HANDLE`
- // may mutate the `waker` field.
- trailer.will_wake(waker)
- };
-
- if will_wake {
- // The task is not complete **and** the waker is up to date,
- // there is nothing further that needs to be done.
+ // If the task is not complete, try storing the provided waker in the
+ // task's waker field.
+
+ let res = if snapshot.is_join_waker_set() {
+ // If JOIN_WAKER is set, then JoinHandle has previously stored a
+ // waker in the waker field per step (iii) of rule 5 in task/mod.rs.
+
+ // Optimization: if the stored waker and the provided waker wake the
+ // same task, then return without touching the waker field. (Reading
+ // the waker field below is safe per rule 3 in task/mod.rs.)
+ if unsafe { trailer.will_wake(waker) } {
return false;
}
- // Unset the `JOIN_WAKER` to gain mutable access to the `waker`
- // field then update the field with the new join worker.
- //
- // This requires two atomic operations, unsetting the bit and
- // then resetting it. If the task transitions to complete
- // concurrently to either one of those operations, then setting
- // the join waker fails and we proceed to reading the task
- // output.
+ // Otherwise swap the stored waker with the provided waker by
+ // following the rule 5 in task/mod.rs.
header
.state
.unset_waker()
.and_then(|snapshot| set_join_waker(header, trailer, waker.clone(), snapshot))
} else {
+ // If JOIN_WAKER is unset, then JoinHandle has mutable access to the
+ // waker field per rule 2 in task/mod.rs; therefore, skip step (i)
+ // of rule 5 and try to store the provided waker in the waker field.
set_join_waker(header, trailer, waker.clone(), snapshot)
};
@@ -388,7 +412,7 @@ fn set_join_waker(
snapshot: Snapshot,
) -> Result<Snapshot, Snapshot> {
assert!(snapshot.is_join_interested());
- assert!(!snapshot.has_join_waker());
+ assert!(!snapshot.is_join_waker_set());
// Safety: Only the `JoinHandle` may set the `waker` field. When
// `JOIN_INTEREST` is **not** set, nothing else will touch the field.
@@ -409,73 +433,69 @@ fn set_join_waker(
res
}
-enum PollFuture<T> {
- Complete(Result<T, JoinError>, bool),
- DropReference,
+enum PollFuture {
+ Complete,
Notified,
- None,
+ Done,
+ Dealloc,
}
-fn cancel_task<T: Future>(stage: &CoreStage<T>) -> JoinError {
+/// Cancels the task and stores the appropriate error in the stage field.
+fn cancel_task<T: Future, S: Schedule>(core: &Core<T, S>) {
// Drop the future from a panic guard.
let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
- stage.drop_future_or_output();
+ core.drop_future_or_output();
}));
- if let Err(err) = res {
- // Dropping the future panicked, complete the join
- // handle with the panic to avoid dropping the panic
- // on the ground.
- JoinError::panic(err)
- } else {
- JoinError::cancelled()
+ match res {
+ Ok(()) => {
+ core.store_output(Err(JoinError::cancelled(core.task_id)));
+ }
+ Err(panic) => {
+ core.store_output(Err(JoinError::panic(core.task_id, panic)));
+ }
}
}
-fn poll_future<T: Future>(
- header: &Header,
- core: &CoreStage<T>,
- snapshot: Snapshot,
- cx: Context<'_>,
-) -> PollFuture<T::Output> {
- if snapshot.is_cancelled() {
- PollFuture::Complete(Err(cancel_task(core)), snapshot.is_join_interested())
- } else {
- let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
- struct Guard<'a, T: Future> {
- core: &'a CoreStage<T>,
- }
-
- impl<T: Future> Drop for Guard<'_, T> {
- fn drop(&mut self) {
- self.core.drop_future_or_output();
- }
+/// Polls the future. If the future completes, the output is written to the
+/// stage field.
+fn poll_future<T: Future, S: Schedule>(core: &Core<T, S>, cx: Context<'_>) -> Poll<()> {
+ // Poll the future.
+ let output = panic::catch_unwind(panic::AssertUnwindSafe(|| {
+ struct Guard<'a, T: Future, S: Schedule> {
+ core: &'a Core<T, S>,
+ }
+ impl<'a, T: Future, S: Schedule> Drop for Guard<'a, T, S> {
+ fn drop(&mut self) {
+ // If the future panics on poll, we drop it inside the panic
+ // guard.
+ self.core.drop_future_or_output();
}
+ }
+ let guard = Guard { core };
+ let res = guard.core.poll(cx);
+ mem::forget(guard);
+ res
+ }));
- let guard = Guard { core };
-
- let res = guard.core.poll(cx);
+ // Prepare output for being placed in the core stage.
+ let output = match output {
+ Ok(Poll::Pending) => return Poll::Pending,
+ Ok(Poll::Ready(output)) => Ok(output),
+ Err(panic) => {
+ core.scheduler.unhandled_panic();
+ Err(JoinError::panic(core.task_id, panic))
+ }
+ };
- // prevent the guard from dropping the future
- mem::forget(guard);
+ // Catch and ignore panics if the future panics on drop.
+ let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
+ core.store_output(output);
+ }));
- res
- }));
- match res {
- Ok(Poll::Pending) => match header.state.transition_to_idle() {
- Ok(snapshot) => {
- if snapshot.is_notified() {
- PollFuture::Notified
- } else {
- PollFuture::None
- }
- }
- Err(_) => PollFuture::Complete(Err(cancel_task(core)), true),
- },
- Ok(Poll::Ready(ok)) => PollFuture::Complete(Ok(ok), snapshot.is_join_interested()),
- Err(err) => {
- PollFuture::Complete(Err(JoinError::panic(err)), snapshot.is_join_interested())
- }
- }
+ if res.is_err() {
+ core.scheduler.unhandled_panic();
}
+
+ Poll::Ready(())
}
diff --git a/vendor/tokio/src/runtime/task/id.rs b/vendor/tokio/src/runtime/task/id.rs
new file mode 100644
index 000000000..2b0d95c02
--- /dev/null
+++ b/vendor/tokio/src/runtime/task/id.rs
@@ -0,0 +1,87 @@
+use crate::runtime::context;
+
+use std::fmt;
+
+/// An opaque ID that uniquely identifies a task relative to all other currently
+/// running tasks.
+///
+/// # Notes
+///
+/// - Task IDs are unique relative to other *currently running* tasks. When a
+/// task completes, the same ID may be used for another task.
+/// - Task IDs are *not* sequential, and do not indicate the order in which
+/// tasks are spawned, what runtime a task is spawned on, or any other data.
+/// - The task ID of the currently running task can be obtained from inside the
+/// task via the [`task::try_id()`](crate::task::try_id()) and
+/// [`task::id()`](crate::task::id()) functions and from outside the task via
+/// the [`JoinHandle::id()`](crate::task::JoinHandle::id()) function.
+///
+/// **Note**: This is an [unstable API][unstable]. The public API of this type
+/// may break in 1.x releases. See [the documentation on unstable
+/// features][unstable] for details.
+///
+/// [unstable]: crate#unstable-features
+#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
+#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
+#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)]
+pub struct Id(u64);
+
+/// Returns the [`Id`] of the currently running task.
+///
+/// # Panics
+///
+/// This function panics if called from outside a task. Please note that calls
+/// to `block_on` do not have task IDs, so the method will panic if called from
+/// within a call to `block_on`. For a version of this function that doesn't
+/// panic, see [`task::try_id()`](crate::runtime::task::try_id()).
+///
+/// **Note**: This is an [unstable API][unstable]. The public API of this type
+/// may break in 1.x releases. See [the documentation on unstable
+/// features][unstable] for details.
+///
+/// [task ID]: crate::task::Id
+/// [unstable]: crate#unstable-features
+#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
+#[track_caller]
+pub fn id() -> Id {
+ context::current_task_id().expect("Can't get a task id when not inside a task")
+}
+
+/// Returns the [`Id`] of the currently running task, or `None` if called outside
+/// of a task.
+///
+/// This function is similar to [`task::id()`](crate::runtime::task::id()), except
+/// that it returns `None` rather than panicking if called outside of a task
+/// context.
+///
+/// **Note**: This is an [unstable API][unstable]. The public API of this type
+/// may break in 1.x releases. See [the documentation on unstable
+/// features][unstable] for details.
+///
+/// [task ID]: crate::task::Id
+/// [unstable]: crate#unstable-features
+#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
+#[track_caller]
+pub fn try_id() -> Option<Id> {
+ context::current_task_id()
+}
+
+impl fmt::Display for Id {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.0.fmt(f)
+ }
+}
+
+impl Id {
+ pub(crate) fn next() -> Self {
+ use crate::loom::sync::atomic::{Ordering::Relaxed, StaticAtomicU64};
+
+ static NEXT_ID: StaticAtomicU64 = StaticAtomicU64::new(1);
+
+ Self(NEXT_ID.fetch_add(1, Relaxed))
+ }
+
+ pub(crate) fn as_u64(&self) -> u64 {
+ self.0
+ }
+}
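
The functions added in `id.rs` are gated behind `tokio_unstable`. Below is a minimal sketch of how they would be exercised, assuming a build with `RUSTFLAGS="--cfg tokio_unstable"`; the assertions mirror the documented behaviour above.

```rust
#[tokio::main(flavor = "current_thread")]
async fn main() {
    // block_on (and therefore the body of #[tokio::main]) has no task id.
    assert!(tokio::task::try_id().is_none());

    let handle = tokio::spawn(async {
        // Inside a task, id() and try_id() report that task's own id.
        let me = tokio::task::id();
        assert_eq!(Some(me), tokio::task::try_id());
        me
    });

    // The JoinHandle reports the same id from outside the task.
    let expected = handle.id();
    assert_eq!(handle.await.unwrap(), expected);
}
```

Calling `tokio::task::id()` in the main body instead of `try_id()` would panic, which is why `try_id()` exists.
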
diff --git a/vendor/tokio/src/runtime/task/join.rs b/vendor/tokio/src/runtime/task/join.rs
index 2fe40a721..ee3925884 100644
--- a/vendor/tokio/src/runtime/task/join.rs
+++ b/vendor/tokio/src/runtime/task/join.rs
@@ -1,16 +1,19 @@
-use crate::runtime::task::RawTask;
+use crate::runtime::task::{Header, RawTask};
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
+use std::panic::{RefUnwindSafe, UnwindSafe};
use std::pin::Pin;
-use std::task::{Context, Poll};
+use std::task::{Context, Poll, Waker};
cfg_rt! {
/// An owned permission to join on a task (await its termination).
///
- /// This can be thought of as the equivalent of [`std::thread::JoinHandle`] for
- /// a task rather than a thread.
+ /// This can be thought of as the equivalent of [`std::thread::JoinHandle`]
+ /// for a Tokio task rather than a thread. Note that the background task
+ /// associated with this `JoinHandle` started running immediately when you
+ /// called spawn, even if you have not yet awaited the `JoinHandle`.
///
/// A `JoinHandle` *detaches* the associated task when it is dropped, which
/// means that there is no longer any handle to the task, and no way to `join`
@@ -19,6 +22,15 @@ cfg_rt! {
/// This `struct` is created by the [`task::spawn`] and [`task::spawn_blocking`]
/// functions.
///
+ /// # Cancel safety
+ ///
+ /// The `&mut JoinHandle<T>` type is cancel safe. If it is used as the event
+ /// in a `tokio::select!` statement and some other branch completes first,
+ /// then it is guaranteed that the output of the task is not lost.
+ ///
+ /// If a `JoinHandle` is dropped, then the task continues running in the
+ /// background and its return value is lost.
+ ///
/// # Examples
///
/// Creation from [`task::spawn`]:
@@ -142,7 +154,7 @@ cfg_rt! {
/// [`std::thread::JoinHandle`]: std::thread::JoinHandle
/// [`JoinError`]: crate::task::JoinError
pub struct JoinHandle<T> {
- raw: Option<RawTask>,
+ raw: RawTask,
_p: PhantomData<T>,
}
}
@@ -150,10 +162,13 @@ cfg_rt! {
unsafe impl<T: Send> Send for JoinHandle<T> {}
unsafe impl<T: Send> Sync for JoinHandle<T> {}
+impl<T> UnwindSafe for JoinHandle<T> {}
+impl<T> RefUnwindSafe for JoinHandle<T> {}
+
impl<T> JoinHandle<T> {
pub(super) fn new(raw: RawTask) -> JoinHandle<T> {
JoinHandle {
- raw: Some(raw),
+ raw,
_p: PhantomData,
}
}
@@ -162,39 +177,134 @@ impl<T> JoinHandle<T> {
///
/// Awaiting a cancelled task might complete as usual if the task was
/// already completed at the time it was cancelled, but most likely it
- /// will complete with a `Err(JoinError::Cancelled)`.
+ /// will fail with a [cancelled] `JoinError`.
///
/// ```rust
/// use tokio::time;
///
- /// #[tokio::main]
- /// async fn main() {
- /// let mut handles = Vec::new();
- ///
- /// handles.push(tokio::spawn(async {
- /// time::sleep(time::Duration::from_secs(10)).await;
- /// true
- /// }));
- ///
- /// handles.push(tokio::spawn(async {
- /// time::sleep(time::Duration::from_secs(10)).await;
- /// false
- /// }));
- ///
- /// for handle in &handles {
- /// handle.abort();
- /// }
- ///
- /// for handle in handles {
- /// assert!(handle.await.unwrap_err().is_cancelled());
- /// }
+ /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
+ /// # async fn main() {
+ /// let mut handles = Vec::new();
+ ///
+ /// handles.push(tokio::spawn(async {
+ /// time::sleep(time::Duration::from_secs(10)).await;
+ /// true
+ /// }));
+ ///
+ /// handles.push(tokio::spawn(async {
+ /// time::sleep(time::Duration::from_secs(10)).await;
+ /// false
+ /// }));
+ ///
+ /// for handle in &handles {
+ /// handle.abort();
+ /// }
+ ///
+ /// for handle in handles {
+ /// assert!(handle.await.unwrap_err().is_cancelled());
/// }
+ /// # }
/// ```
+ /// [cancelled]: method@super::error::JoinError::is_cancelled
pub fn abort(&self) {
- if let Some(raw) = self.raw {
- raw.remote_abort();
+ self.raw.remote_abort();
+ }
+
+ /// Checks if the task associated with this `JoinHandle` has finished.
+ ///
+ /// Please note that this method can return `false` even if [`abort`] has been
+ /// called on the task. This is because the cancellation process may take
+ /// some time, and this method does not return `true` until it has
+ /// completed.
+ ///
+ /// ```rust
+ /// use tokio::time;
+ ///
+ /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
+ /// # async fn main() {
+ /// let handle1 = tokio::spawn(async {
+ /// // do some stuff here
+ /// });
+ /// let handle2 = tokio::spawn(async {
+ /// // do some other stuff here
+ /// time::sleep(time::Duration::from_secs(10)).await;
+ /// });
+ /// // Abort the second task, then give both tasks time to finish
+ /// handle2.abort();
+ /// time::sleep(time::Duration::from_secs(1)).await;
+ /// assert!(handle1.is_finished());
+ /// assert!(handle2.is_finished());
+ /// # }
+ /// ```
+ /// [`abort`]: method@JoinHandle::abort
+ pub fn is_finished(&self) -> bool {
+ let state = self.raw.header().state.load();
+ state.is_complete()
+ }
+
+ /// Set the waker that is notified when the task completes.
+ pub(crate) fn set_join_waker(&mut self, waker: &Waker) {
+ if self.raw.try_set_join_waker(waker) {
+ // In this case the task has already completed. We wake the waker immediately.
+ waker.wake_by_ref();
}
}
+
+ /// Returns a new `AbortHandle` that can be used to remotely abort this task.
+ ///
+ /// Awaiting a task cancelled by the `AbortHandle` might complete as usual if the task was
+ /// already completed at the time it was cancelled, but most likely it
+ /// will fail with a [cancelled] `JoinError`.
+ ///
+ /// ```rust
+ /// use tokio::{time, task};
+ ///
+ /// # #[tokio::main(flavor = "current_thread", start_paused = true)]
+ /// # async fn main() {
+ /// let mut handles = Vec::new();
+ ///
+ /// handles.push(tokio::spawn(async {
+ /// time::sleep(time::Duration::from_secs(10)).await;
+ /// true
+ /// }));
+ ///
+ /// handles.push(tokio::spawn(async {
+ /// time::sleep(time::Duration::from_secs(10)).await;
+ /// false
+ /// }));
+ ///
+ /// let abort_handles: Vec<task::AbortHandle> = handles.iter().map(|h| h.abort_handle()).collect();
+ ///
+ /// for handle in abort_handles {
+ /// handle.abort();
+ /// }
+ ///
+ /// for handle in handles {
+ /// assert!(handle.await.unwrap_err().is_cancelled());
+ /// }
+ /// # }
+ /// ```
+ /// [cancelled]: method@super::error::JoinError::is_cancelled
+ pub fn abort_handle(&self) -> super::AbortHandle {
+ self.raw.ref_inc();
+ super::AbortHandle::new(self.raw)
+ }
+
+ /// Returns a [task ID] that uniquely identifies this task relative to other
+ /// currently spawned tasks.
+ ///
+ /// **Note**: This is an [unstable API][unstable]. The public API of this type
+ /// may break in 1.x releases. See [the documentation on unstable
+ /// features][unstable] for details.
+ ///
+ /// [task ID]: crate::task::Id
+ /// [unstable]: crate#unstable-features
+ #[cfg(tokio_unstable)]
+ #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
+ pub fn id(&self) -> super::Id {
+ // Safety: The header pointer is valid.
+ unsafe { Header::get_id(self.raw.header_ptr()) }
+ }
}
impl<T> Unpin for JoinHandle<T> {}
@@ -203,17 +313,11 @@ impl<T> Future for JoinHandle<T> {
type Output = super::Result<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ ready!(crate::trace::trace_leaf(cx));
let mut ret = Poll::Pending;
// Keep track of task budget
- let coop = ready!(crate::coop::poll_proceed(cx));
-
- // Raw should always be set. If it is not, this is due to polling after
- // completion
- let raw = self
- .raw
- .as_ref()
- .expect("polling after `JoinHandle` already completed");
+ let coop = ready!(crate::runtime::coop::poll_proceed(cx));
// Try to read the task output. If the task is not yet complete, the
// waker is stored and is notified once the task does complete.
@@ -227,7 +331,8 @@ impl<T> Future for JoinHandle<T> {
//
// The type of `T` must match the task's output type.
unsafe {
- raw.try_read_output(&mut ret as *mut _ as *mut (), cx.waker());
+ self.raw
+ .try_read_output(&mut ret as *mut _ as *mut (), cx.waker());
}
if ret.is_ready() {
@@ -240,13 +345,11 @@ impl<T> Future for JoinHandle<T> {
impl<T> Drop for JoinHandle<T> {
fn drop(&mut self) {
- if let Some(raw) = self.raw.take() {
- if raw.header().state.drop_join_handle_fast().is_ok() {
- return;
- }
-
- raw.drop_join_handle_slow();
+ if self.raw.state().drop_join_handle_fast().is_ok() {
+ return;
}
+
+ self.raw.drop_join_handle_slow();
}
}
@@ -255,6 +358,9 @@ where
T: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("JoinHandle").finish()
+ // Safety: The header pointer is valid.
+ let id_ptr = unsafe { Header::get_id_ptr(self.raw.header_ptr()) };
+ let id = unsafe { id_ptr.as_ref() };
+ fmt.debug_struct("JoinHandle").field("id", id).finish()
}
}
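The new `Drop` impl above first tries `drop_join_handle_fast`, a single compare-exchange that only succeeds while the task is still in its freshly spawned state, and falls back to `drop_join_handle_slow` otherwise. A rough sketch of that fast/slow-path shape follows, with hypothetical state values standing in for the real packed state word.

```rust
use std::sync::atomic::{AtomicU8, Ordering};

// Hypothetical states; the real task packs these bits into a usize.
const FRESH: u8 = 0;    // spawned, never polled, JoinHandle still attached
const POLLED: u8 = 1;   // task has made progress
const DETACHED: u8 = 2; // JoinHandle dropped

struct State(AtomicU8);

impl State {
    /// Fast path: a single compare-exchange that only succeeds if the task
    /// is still in its initial state.
    fn drop_join_handle_fast(&self) -> Result<(), ()> {
        self.0
            .compare_exchange(FRESH, DETACHED, Ordering::AcqRel, Ordering::Relaxed)
            .map(|_| ())
            .map_err(|_| ())
    }

    /// Slow path: unconditionally detach.
    fn drop_join_handle_slow(&self) {
        self.0.store(DETACHED, Ordering::Release);
    }
}

fn drop_handle(state: &State) {
    // Mirrors the shape of `impl Drop for JoinHandle` above.
    if state.drop_join_handle_fast().is_ok() {
        return;
    }
    state.drop_join_handle_slow();
}

fn main() {
    // Fast path succeeds on a fresh task.
    let fresh = State(AtomicU8::new(FRESH));
    drop_handle(&fresh);
    assert_eq!(fresh.0.load(Ordering::Acquire), DETACHED);

    // Fast path fails once the task has been polled; the slow path runs.
    let polled = State(AtomicU8::new(POLLED));
    drop_handle(&polled);
    assert_eq!(polled.0.load(Ordering::Acquire), DETACHED);
}
```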
diff --git a/vendor/tokio/src/runtime/task/list.rs b/vendor/tokio/src/runtime/task/list.rs
new file mode 100644
index 000000000..fb7dbdc1d
--- /dev/null
+++ b/vendor/tokio/src/runtime/task/list.rs
@@ -0,0 +1,319 @@
+//! This module has containers for storing the tasks spawned on a scheduler. The
+//! `OwnedTasks` container is thread-safe but can only store tasks that
+//! implement Send. The `LocalOwnedTasks` container is not thread safe, but can
+//! store non-Send tasks.
+//!
+//! The collections can be closed to prevent adding new tasks during shutdown of
+//! the scheduler with the collection.
+
+use crate::future::Future;
+use crate::loom::cell::UnsafeCell;
+use crate::loom::sync::Mutex;
+use crate::runtime::task::{JoinHandle, LocalNotified, Notified, Schedule, Task};
+use crate::util::linked_list::{CountedLinkedList, Link, LinkedList};
+
+use std::marker::PhantomData;
+
+// The id generated by get_next_id below is used to verify whether a given task
+// is stored in this OwnedTasks list or in some other one. The counter starts at
+// one so we can use zero for tasks not owned by any list.
+//
+// The safety checks in this file can technically be violated if the counter is
+// overflowed, but the checks are not supposed to ever fail unless there is a
+// bug in Tokio, so we accept that certain bugs would not be caught if the two
+// mixed up runtimes happen to have the same id.
+
+cfg_has_atomic_u64! {
+ use std::sync::atomic::{AtomicU64, Ordering};
+
+ static NEXT_OWNED_TASKS_ID: AtomicU64 = AtomicU64::new(1);
+
+ fn get_next_id() -> u64 {
+ loop {
+ let id = NEXT_OWNED_TASKS_ID.fetch_add(1, Ordering::Relaxed);
+ if id != 0 {
+ return id;
+ }
+ }
+ }
+}
+
+cfg_not_has_atomic_u64! {
+ use std::sync::atomic::{AtomicU32, Ordering};
+
+ static NEXT_OWNED_TASKS_ID: AtomicU32 = AtomicU32::new(1);
+
+ fn get_next_id() -> u64 {
+ loop {
+ let id = NEXT_OWNED_TASKS_ID.fetch_add(1, Ordering::Relaxed);
+ if id != 0 {
+ return u64::from(id);
+ }
+ }
+ }
+}
+
+pub(crate) struct OwnedTasks<S: 'static> {
+ inner: Mutex<CountedOwnedTasksInner<S>>,
+ id: u64,
+}
+struct CountedOwnedTasksInner<S: 'static> {
+ list: CountedLinkedList<Task<S>, <Task<S> as Link>::Target>,
+ closed: bool,
+}
+pub(crate) struct LocalOwnedTasks<S: 'static> {
+ inner: UnsafeCell<OwnedTasksInner<S>>,
+ id: u64,
+ _not_send_or_sync: PhantomData<*const ()>,
+}
+struct OwnedTasksInner<S: 'static> {
+ list: LinkedList<Task<S>, <Task<S> as Link>::Target>,
+ closed: bool,
+}
+
+impl<S: 'static> OwnedTasks<S> {
+ pub(crate) fn new() -> Self {
+ Self {
+ inner: Mutex::new(CountedOwnedTasksInner {
+ list: CountedLinkedList::new(),
+ closed: false,
+ }),
+ id: get_next_id(),
+ }
+ }
+
+ /// Binds the provided task to this OwnedTasks instance. This fails if the
+ /// OwnedTasks has been closed.
+ pub(crate) fn bind<T>(
+ &self,
+ task: T,
+ scheduler: S,
+ id: super::Id,
+ ) -> (JoinHandle<T::Output>, Option<Notified<S>>)
+ where
+ S: Schedule,
+ T: Future + Send + 'static,
+ T::Output: Send + 'static,
+ {
+ let (task, notified, join) = super::new_task(task, scheduler, id);
+
+ unsafe {
+ // safety: We just created the task, so we have exclusive access
+ // to the field.
+ task.header().set_owner_id(self.id);
+ }
+
+ let mut lock = self.inner.lock();
+ if lock.closed {
+ drop(lock);
+ drop(notified);
+ task.shutdown();
+ (join, None)
+ } else {
+ lock.list.push_front(task);
+ (join, Some(notified))
+ }
+ }
+
+ /// Asserts that the given task is owned by this OwnedTasks and converts it to
+ /// a LocalNotified, giving the thread permission to poll this task.
+ #[inline]
+ pub(crate) fn assert_owner(&self, task: Notified<S>) -> LocalNotified<S> {
+ assert_eq!(task.header().get_owner_id(), self.id);
+
+ // safety: All tasks bound to this OwnedTasks are Send, so it is safe
+ // to poll it on this thread no matter what thread we are on.
+ LocalNotified {
+ task: task.0,
+ _not_send: PhantomData,
+ }
+ }
+
+ /// Shuts down all tasks in the collection. This call also closes the
+ /// collection, preventing new items from being added.
+ pub(crate) fn close_and_shutdown_all(&self)
+ where
+ S: Schedule,
+ {
+ // The first iteration of the loop was unrolled so it can set the
+ // closed bool.
+ let first_task = {
+ let mut lock = self.inner.lock();
+ lock.closed = true;
+ lock.list.pop_back()
+ };
+ match first_task {
+ Some(task) => task.shutdown(),
+ None => return,
+ }
+
+ loop {
+ let task = match self.inner.lock().list.pop_back() {
+ Some(task) => task,
+ None => return,
+ };
+
+ task.shutdown();
+ }
+ }
+
+ pub(crate) fn active_tasks_count(&self) -> usize {
+ self.inner.lock().list.count()
+ }
+
+ pub(crate) fn remove(&self, task: &Task<S>) -> Option<Task<S>> {
+ let task_id = task.header().get_owner_id();
+ if task_id == 0 {
+ // The task is unowned.
+ return None;
+ }
+
+ assert_eq!(task_id, self.id);
+
+ // safety: We just checked that the provided task is not in some other
+ // linked list.
+ unsafe { self.inner.lock().list.remove(task.header_ptr()) }
+ }
+
+ pub(crate) fn is_empty(&self) -> bool {
+ self.inner.lock().list.is_empty()
+ }
+}
+
+cfg_taskdump! {
+ impl<S: 'static> OwnedTasks<S> {
+ /// Locks the tasks, and calls `f` on an iterator over them.
+ pub(crate) fn for_each<F>(&self, f: F)
+ where
+ F: FnMut(&Task<S>)
+ {
+ self.inner.lock().list.for_each(f)
+ }
+ }
+}
+
+impl<S: 'static> LocalOwnedTasks<S> {
+ pub(crate) fn new() -> Self {
+ Self {
+ inner: UnsafeCell::new(OwnedTasksInner {
+ list: LinkedList::new(),
+ closed: false,
+ }),
+ id: get_next_id(),
+ _not_send_or_sync: PhantomData,
+ }
+ }
+
+ pub(crate) fn bind<T>(
+ &self,
+ task: T,
+ scheduler: S,
+ id: super::Id,
+ ) -> (JoinHandle<T::Output>, Option<Notified<S>>)
+ where
+ S: Schedule,
+ T: Future + 'static,
+ T::Output: 'static,
+ {
+ let (task, notified, join) = super::new_task(task, scheduler, id);
+
+ unsafe {
+ // safety: We just created the task, so we have exclusive access
+ // to the field.
+ task.header().set_owner_id(self.id);
+ }
+
+ if self.is_closed() {
+ drop(notified);
+ task.shutdown();
+ (join, None)
+ } else {
+ self.with_inner(|inner| {
+ inner.list.push_front(task);
+ });
+ (join, Some(notified))
+ }
+ }
+
+ /// Shuts down all tasks in the collection. This call also closes the
+ /// collection, preventing new items from being added.
+ pub(crate) fn close_and_shutdown_all(&self)
+ where
+ S: Schedule,
+ {
+ self.with_inner(|inner| inner.closed = true);
+
+ while let Some(task) = self.with_inner(|inner| inner.list.pop_back()) {
+ task.shutdown();
+ }
+ }
+
+ pub(crate) fn remove(&self, task: &Task<S>) -> Option<Task<S>> {
+ let task_id = task.header().get_owner_id();
+ if task_id == 0 {
+ // The task is unowned.
+ return None;
+ }
+
+ assert_eq!(task_id, self.id);
+
+ self.with_inner(|inner|
+ // safety: We just checked that the provided task is not in some
+ // other linked list.
+ unsafe { inner.list.remove(task.header_ptr()) })
+ }
+
+ /// Asserts that the given task is owned by this LocalOwnedTasks and converts
+ /// it to a LocalNotified, giving the thread permission to poll this task.
+ #[inline]
+ pub(crate) fn assert_owner(&self, task: Notified<S>) -> LocalNotified<S> {
+ assert_eq!(task.header().get_owner_id(), self.id);
+
+ // safety: The task was bound to this LocalOwnedTasks, and the
+ // LocalOwnedTasks is not Send or Sync, so we are on the right thread
+ // for polling this task.
+ LocalNotified {
+ task: task.0,
+ _not_send: PhantomData,
+ }
+ }
+
+ #[inline]
+ fn with_inner<F, T>(&self, f: F) -> T
+ where
+ F: FnOnce(&mut OwnedTasksInner<S>) -> T,
+ {
+ // safety: This type is not Sync, so concurrent calls of this method
+ // can't happen. Furthermore, all uses of this method in this file make
+ // sure that they don't call `with_inner` recursively.
+ self.inner.with_mut(|ptr| unsafe { f(&mut *ptr) })
+ }
+
+ pub(crate) fn is_closed(&self) -> bool {
+ self.with_inner(|inner| inner.closed)
+ }
+
+ pub(crate) fn is_empty(&self) -> bool {
+ self.with_inner(|inner| inner.list.is_empty())
+ }
+}
+
+#[cfg(all(test))]
+mod tests {
+ use super::*;
+
+ // This test may run in parallel with other tests, so we only test that ids
+ // come in increasing order.
+ #[test]
+ fn test_id_not_broken() {
+ let mut last_id = get_next_id();
+ assert_ne!(last_id, 0);
+
+ for _ in 0..1000 {
+ let next_id = get_next_id();
+ assert_ne!(next_id, 0);
+ assert!(last_id < next_id);
+ last_id = next_id;
+ }
+ }
+}
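The per-list `id` in `OwnedTasks` and `LocalOwnedTasks` above exists purely as a safety check: every task records the id of the list it was bound to, zero means unowned, and removal asserts that the ids match. The following simplified, hypothetical sketch reproduces that scheme with a plain `Vec` instead of the intrusive linked list and mutex used by the real code.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Each list gets a unique non-zero id; zero means "not owned by any list".
static NEXT_LIST_ID: AtomicU64 = AtomicU64::new(1);

struct Item {
    owner_id: u64,
}

struct List {
    id: u64,
    items: Vec<Item>,
}

impl List {
    fn new() -> Self {
        List {
            id: NEXT_LIST_ID.fetch_add(1, Ordering::Relaxed),
            items: Vec::new(),
        }
    }

    /// Analogue of `bind`: the item remembers which list owns it.
    fn bind(&mut self) -> usize {
        self.items.push(Item { owner_id: self.id });
        self.items.len() - 1
    }

    /// Analogue of `remove`: refuse unowned items, assert matching owners.
    fn remove(&mut self, index: usize) -> Option<Item> {
        let item = self.items.get(index)?;
        if item.owner_id == 0 {
            return None; // unowned, nothing to do
        }
        // Same check as `assert_eq!(task_id, self.id)` in OwnedTasks::remove.
        assert_eq!(item.owner_id, self.id);
        Some(self.items.remove(index))
    }
}

fn main() {
    let mut list = List::new();
    let idx = list.bind();
    assert!(list.remove(idx).is_some());
}
```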
diff --git a/vendor/tokio/src/runtime/task/mod.rs b/vendor/tokio/src/runtime/task/mod.rs
index 58b8c2a15..932552fb9 100644
--- a/vendor/tokio/src/runtime/task/mod.rs
+++ b/vendor/tokio/src/runtime/task/mod.rs
@@ -1,29 +1,209 @@
+//! The task module.
+//!
+//! The task module contains the code that manages spawned tasks and provides a
+//! safe API for the rest of the runtime to use. Each task in a runtime is
+//! stored in an OwnedTasks or LocalOwnedTasks object.
+//!
+//! # Task reference types
+//!
+//! A task is usually referenced by multiple handles, and there are several
+//! types of handles.
+//!
+//! * OwnedTask - tasks stored in an OwnedTasks or LocalOwnedTasks are of this
+//! reference type.
+//!
+//! * JoinHandle - each task has a JoinHandle that allows access to the output
+//! of the task.
+//!
+//! * Waker - every waker for a task has this reference type. There can be any
+//! number of waker references.
+//!
+//! * Notified - tracks whether the task is notified.
+//!
+//! * Unowned - this task reference type is used for tasks not stored in any
+//! runtime. Mainly used for blocking tasks, but also in tests.
+//!
+//! The task uses a reference count to keep track of how many active references
+//! exist. The Unowned reference type takes up two ref-counts. All other
+//! reference types take up a single ref-count.
+//!
+//! Besides the waker type, each task has at most one of each reference type.
+//!
+//! # State
+//!
+//! The task stores its state in an atomic usize with various bitfields for the
+//! necessary information. The state has the following bitfields:
+//!
+//! * RUNNING - Tracks whether the task is currently being polled or cancelled.
+//! This bit functions as a lock around the task.
+//!
+//! * COMPLETE - Is one once the future has fully completed and has been
+//! dropped. Never unset once set. Never set together with RUNNING.
+//!
+//! * NOTIFIED - Tracks whether a Notified object currently exists.
+//!
+//! * CANCELLED - Is set to one for tasks that should be cancelled as soon as
+//! possible. May take any value for completed tasks.
+//!
+//! * JOIN_INTEREST - Is set to one if there exists a JoinHandle.
+//!
+//! * JOIN_WAKER - Acts as an access control bit for the join handle waker. The
+//! protocol for its usage is described below.
+//!
+//! The rest of the bits are used for the ref-count.
+//!
+//! # Fields in the task
+//!
+//! The task has various fields. This section describes how and when it is safe
+//! to access a field.
+//!
+//! * The state field is accessed with atomic instructions.
+//!
+//! * The OwnedTask reference has exclusive access to the `owned` field.
+//!
+//! * The Notified reference has exclusive access to the `queue_next` field.
+//!
+//! * The `owner_id` field can be set as part of construction of the task, but
+//! is otherwise immutable and anyone can access the field immutably without
+//! synchronization.
+//!
+//! * If COMPLETE is one, then the JoinHandle has exclusive access to the
+//! stage field. If COMPLETE is zero, then the RUNNING bitfield functions as
+//! a lock for the stage field, and it can be accessed only by the thread
+//! that set RUNNING to one.
+//!
+//! * The waker field may be concurrently accessed by different threads: in one
+//! thread the runtime may complete a task and *read* the waker field to
+//! invoke the waker, and in another thread the task's JoinHandle may be
+//! polled, and if the task hasn't yet completed, the JoinHandle may *write*
+//! a waker to the waker field. The JOIN_WAKER bit ensures safe access by
+//! multiple threads to the waker field using the following rules:
+//!
+//! 1. JOIN_WAKER is initialized to zero.
+//!
+//! 2. If JOIN_WAKER is zero, then the JoinHandle has exclusive (mutable)
+//! access to the waker field.
+//!
+//! 3. If JOIN_WAKER is one, then the JoinHandle has shared (read-only)
+//! access to the waker field.
+//!
+//! 4. If JOIN_WAKER is one and COMPLETE is one, then the runtime has shared
+//! (read-only) access to the waker field.
+//!
+//! 5. If the JoinHandle needs to write to the waker field, then the
+//! JoinHandle needs to (i) successfully set JOIN_WAKER to zero if it is
+//! not already zero to gain exclusive access to the waker field per rule
+//! 2, (ii) write a waker, and (iii) successfully set JOIN_WAKER to one.
+//!
+//! 6. The JoinHandle can change JOIN_WAKER only if COMPLETE is zero (i.e.
+//! the task hasn't yet completed).
+//!
+//! Rule 6 implies that the steps (i) or (iii) of rule 5 may fail due to a
+//! race. If step (i) fails, then the attempt to write a waker is aborted. If
+//! step (iii) fails because COMPLETE is set to one by another thread after
+//! step (i), then the waker field is cleared. Once COMPLETE is one (i.e.
+//! task has completed), the JoinHandle will not modify JOIN_WAKER. After the
+//! runtime sets COMPLETE to one, it invokes the waker if there is one.
+//!
+//! All other fields are immutable and can be accessed immutably without
+//! synchronization by anyone.
+//!
+//! # Safety
+//!
+//! This section goes through various situations and explains why the API is
+//! safe in that situation.
+//!
+//! ## Polling or dropping the future
+//!
+//! Any mutable access to the future happens after obtaining a lock by modifying
+//! the RUNNING field, so exclusive access is ensured.
+//!
+//! When the task completes, exclusive access to the output is transferred to
+//! the JoinHandle. If the JoinHandle is already dropped when the transition to
+//! complete happens, the thread performing that transition retains exclusive
+//! access to the output and should immediately drop it.
+//!
+//! ## Non-Send futures
+//!
+//! If a future is not Send, then it is bound to a LocalOwnedTasks. The future
+//! will only ever be polled or dropped given a LocalNotified or inside a call
+//! to LocalOwnedTasks::shutdown_all. In either case, it is guaranteed that the
+//! future is on the right thread.
+//!
+//! If the task is never removed from the LocalOwnedTasks, then it is leaked, so
+//! there is no risk that the task is dropped on some other thread when the last
+//! ref-count drops.
+//!
+//! ## Non-Send output
+//!
+//! When a task completes, the output is placed in the stage of the task. Then,
+//! a transition that sets COMPLETE to true is performed, and the value of
+//! JOIN_INTEREST when this transition happens is read.
+//!
+//! If JOIN_INTEREST is zero when the transition to COMPLETE happens, then the
+//! output is immediately dropped.
+//!
+//! If JOIN_INTEREST is one when the transition to COMPLETE happens, then the
+//! JoinHandle is responsible for cleaning up the output. If the output is not
+//! Send, then this happens:
+//!
+//! 1. The output is created on the thread that the future was polled on. Since
+//! only non-Send futures can have non-Send output, the future was polled on
+//! the thread that the future was spawned from.
+//! 2. Since `JoinHandle<Output>` is not Send if Output is not Send, the
+//! JoinHandle is also on the thread that the future was spawned from.
+//! 3. Thus, the JoinHandle will not move the output across threads when it
+//! takes or drops the output.
+//!
+//! ## Recursive poll/shutdown
+//!
+//! Calling poll from inside a shutdown call or vice-versa is not prevented by
+//! the API exposed by the task module, so this has to be safe. In either case,
+//! the lock in the RUNNING bitfield makes the inner call return immediately. If
+//! the inner call is a `shutdown` call, then the CANCELLED bit is set, and the
+//! poll call will notice it when the poll finishes, and the task is cancelled
+//! at that point.
+
+// Some task infrastructure is here to support `JoinSet`, which is currently
+// unstable. This should be removed once `JoinSet` is stabilized.
+#![cfg_attr(not(tokio_unstable), allow(dead_code))]
+
mod core;
use self::core::Cell;
-pub(crate) use self::core::Header;
+use self::core::Header;
mod error;
-#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::error::JoinError;
mod harness;
use self::harness::Harness;
+mod id;
+#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
+pub use id::{id, try_id, Id};
+
+#[cfg(feature = "rt")]
+mod abort;
mod join;
-#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+
+#[cfg(feature = "rt")]
+pub use self::abort::AbortHandle;
+
pub use self::join::JoinHandle;
+mod list;
+pub(crate) use self::list::{LocalOwnedTasks, OwnedTasks};
+
mod raw;
-use self::raw::RawTask;
+pub(crate) use self::raw::RawTask;
mod state;
use self::state::State;
mod waker;
-cfg_rt_multi_thread! {
- mod stack;
- pub(crate) use self::stack::TransferStack;
+cfg_taskdump! {
+ pub(crate) mod trace;
}
use crate::future::Future;
@@ -33,7 +213,7 @@ use std::marker::PhantomData;
use std::ptr::NonNull;
use std::{fmt, mem};
-/// An owned handle to the task, tracked by ref count
+/// An owned handle to the task, tracked by ref count.
#[repr(transparent)]
pub(crate) struct Task<S: 'static> {
raw: RawTask,
@@ -43,30 +223,43 @@ pub(crate) struct Task<S: 'static> {
unsafe impl<S> Send for Task<S> {}
unsafe impl<S> Sync for Task<S> {}
-/// A task was notified
+/// A task was notified.
#[repr(transparent)]
pub(crate) struct Notified<S: 'static>(Task<S>);
+// safety: This type cannot be used to touch the task without first verifying
+// that the value is on a thread where it is safe to poll the task.
unsafe impl<S: Schedule> Send for Notified<S> {}
unsafe impl<S: Schedule> Sync for Notified<S> {}
-/// Task result sent back
+/// A non-Send variant of Notified with the invariant that it is on a thread
+/// where it is safe to poll it.
+#[repr(transparent)]
+pub(crate) struct LocalNotified<S: 'static> {
+ task: Task<S>,
+ _not_send: PhantomData<*const ()>,
+}
+
+/// A task that is not owned by any OwnedTasks. Used for blocking tasks.
+/// This type holds two ref-counts.
+pub(crate) struct UnownedTask<S: 'static> {
+ raw: RawTask,
+ _p: PhantomData<S>,
+}
+
+// safety: This type can only be created given a Send task.
+unsafe impl<S> Send for UnownedTask<S> {}
+unsafe impl<S> Sync for UnownedTask<S> {}
+
+/// Task result sent back.
pub(crate) type Result<T> = std::result::Result<T, JoinError>;
pub(crate) trait Schedule: Sync + Sized + 'static {
- /// Bind a task to the executor.
- ///
- /// Guaranteed to be called from the thread that called `poll` on the task.
- /// The returned `Schedule` instance is associated with the task and is used
- /// as `&self` in the other methods on this trait.
- fn bind(task: Task<Self>) -> Self;
-
/// The task has completed work and is ready to be released. The scheduler
- /// is free to drop it whenever.
+ /// should release it immediately and return it. The task module will batch
+ /// the ref-dec with setting other options.
///
- /// If the scheduler can immediately release the task, it should return
- /// it as part of the function. This enables the task module to batch
- /// the ref-dec with other options.
+ /// If the scheduler has already released the task, then None is returned.
fn release(&self, task: &Task<Self>) -> Option<Task<Self>>;
/// Schedule the task
@@ -77,104 +270,177 @@ pub(crate) trait Schedule: Sync + Sized + 'static {
fn yield_now(&self, task: Notified<Self>) {
self.schedule(task);
}
+
+ /// Polling the task resulted in a panic. Should the runtime shut down?
+ fn unhandled_panic(&self) {
+ // By default, do nothing. This maintains the 1.0 behavior.
+ }
}
cfg_rt! {
- /// Create a new task with an associated join handle
- pub(crate) fn joinable<T, S>(task: T) -> (Notified<S>, JoinHandle<T::Output>)
+ /// This is the constructor for a new task. Three references to the task are
+ /// created. The first task reference is usually put into an OwnedTasks
+ /// immediately. The Notified is sent to the scheduler as an ordinary
+ /// notification.
+ fn new_task<T, S>(
+ task: T,
+ scheduler: S,
+ id: Id,
+ ) -> (Task<S>, Notified<S>, JoinHandle<T::Output>)
where
- T: Future + Send + 'static,
S: Schedule,
+ T: Future + 'static,
+ T::Output: 'static,
{
- let raw = RawTask::new::<_, S>(task);
-
+ let raw = RawTask::new::<T, S>(task, scheduler, id);
let task = Task {
raw,
_p: PhantomData,
};
-
+ let notified = Notified(Task {
+ raw,
+ _p: PhantomData,
+ });
let join = JoinHandle::new(raw);
- (Notified(task), join)
+ (task, notified, join)
}
-}
-cfg_rt! {
- /// Create a new `!Send` task with an associated join handle
- pub(crate) unsafe fn joinable_local<T, S>(task: T) -> (Notified<S>, JoinHandle<T::Output>)
+ /// Creates a new task with an associated join handle. This method is used
+ /// only when the task is not going to be stored in an `OwnedTasks` list.
+ ///
+ /// Currently only blocking tasks use this method.
+ pub(crate) fn unowned<T, S>(task: T, scheduler: S, id: Id) -> (UnownedTask<S>, JoinHandle<T::Output>)
where
- T: Future + 'static,
S: Schedule,
+ T: Send + Future + 'static,
+ T::Output: Send + 'static,
{
- let raw = RawTask::new::<_, S>(task);
+ let (task, notified, join) = new_task(task, scheduler, id);
- let task = Task {
- raw,
+ // This transfers the ref-count of task and notified into an UnownedTask.
+ // This is valid because an UnownedTask holds two ref-counts.
+ let unowned = UnownedTask {
+ raw: task.raw,
_p: PhantomData,
};
+ std::mem::forget(task);
+ std::mem::forget(notified);
- let join = JoinHandle::new(raw);
-
- (Notified(task), join)
+ (unowned, join)
}
}
impl<S: 'static> Task<S> {
- pub(crate) unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> {
+ unsafe fn new(raw: RawTask) -> Task<S> {
Task {
- raw: RawTask::from_raw(ptr),
+ raw,
_p: PhantomData,
}
}
- pub(crate) fn header(&self) -> &Header {
+ unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> {
+ Task::new(RawTask::from_raw(ptr))
+ }
+
+ #[cfg(all(
+ tokio_unstable,
+ tokio_taskdump,
+ feature = "rt",
+ target_os = "linux",
+ any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
+ ))]
+ pub(super) fn as_raw(&self) -> RawTask {
+ self.raw
+ }
+
+ fn header(&self) -> &Header {
self.raw.header()
}
-}
-cfg_rt_multi_thread! {
- impl<S: 'static> Notified<S> {
- pub(crate) unsafe fn from_raw(ptr: NonNull<Header>) -> Notified<S> {
- Notified(Task::from_raw(ptr))
- }
+ fn header_ptr(&self) -> NonNull<Header> {
+ self.raw.header_ptr()
+ }
+}
- pub(crate) fn header(&self) -> &Header {
- self.0.header()
- }
+impl<S: 'static> Notified<S> {
+ fn header(&self) -> &Header {
+ self.0.header()
}
+}
- impl<S: 'static> Task<S> {
- pub(crate) fn into_raw(self) -> NonNull<Header> {
- let ret = self.header().into();
- mem::forget(self);
- ret
- }
+impl<S: 'static> Notified<S> {
+ pub(crate) unsafe fn from_raw(ptr: RawTask) -> Notified<S> {
+ Notified(Task::new(ptr))
}
+}
- impl<S: 'static> Notified<S> {
- pub(crate) fn into_raw(self) -> NonNull<Header> {
- self.0.into_raw()
- }
+impl<S: 'static> Notified<S> {
+ pub(crate) fn into_raw(self) -> RawTask {
+ let raw = self.0.raw;
+ mem::forget(self);
+ raw
}
}
impl<S: Schedule> Task<S> {
- /// Pre-emptively cancel the task as part of the shutdown process.
- pub(crate) fn shutdown(&self) {
- self.raw.shutdown();
+ /// Preemptively cancels the task as part of the shutdown process.
+ pub(crate) fn shutdown(self) {
+ let raw = self.raw;
+ mem::forget(self);
+ raw.shutdown();
}
}
-impl<S: Schedule> Notified<S> {
- /// Run the task
+impl<S: Schedule> LocalNotified<S> {
+ /// Runs the task.
+ pub(crate) fn run(self) {
+ let raw = self.task.raw;
+ mem::forget(self);
+ raw.poll();
+ }
+}
+
+impl<S: Schedule> UnownedTask<S> {
+ // Used in tests of the inject queue.
+ #[cfg(test)]
+ #[cfg_attr(tokio_wasm, allow(dead_code))]
+ pub(super) fn into_notified(self) -> Notified<S> {
+ Notified(self.into_task())
+ }
+
+ fn into_task(self) -> Task<S> {
+ // Convert into a task.
+ let task = Task {
+ raw: self.raw,
+ _p: PhantomData,
+ };
+ mem::forget(self);
+
+ // Drop a ref-count since an UnownedTask holds two.
+ task.header().state.ref_dec();
+
+ task
+ }
+
pub(crate) fn run(self) {
- self.0.raw.poll();
+ let raw = self.raw;
mem::forget(self);
+
+ // Transfer one ref-count to a Task object.
+ let task = Task::<S> {
+ raw,
+ _p: PhantomData,
+ };
+
+ // Use the other ref-count to poll the task.
+ raw.poll();
+ // Decrement our extra ref-count
+ drop(task);
}
- /// Pre-emptively cancel the task as part of the shutdown process.
pub(crate) fn shutdown(self) {
- self.0.shutdown();
+ self.into_task().shutdown()
}
}
@@ -188,6 +454,16 @@ impl<S: 'static> Drop for Task<S> {
}
}
+impl<S: 'static> Drop for UnownedTask<S> {
+ fn drop(&mut self) {
+ // Decrement the ref count
+ if self.raw.header().state.ref_dec_twice() {
+ // Deallocate if this is the final ref count
+ self.raw.dealloc();
+ }
+ }
+}
+
impl<S> fmt::Debug for Task<S> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "Task({:p})", self.header())
@@ -202,13 +478,13 @@ impl<S> fmt::Debug for Notified<S> {
/// # Safety
///
-/// Tasks are pinned
+/// Tasks are pinned.
unsafe impl<S> linked_list::Link for Task<S> {
type Handle = Task<S>;
type Target = Header;
fn as_raw(handle: &Task<S>) -> NonNull<Header> {
- handle.header().into()
+ handle.raw.header_ptr()
}
unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> {
@@ -216,7 +492,6 @@ unsafe impl<S> linked_list::Link for Task<S> {
}
unsafe fn pointers(target: NonNull<Header>) -> NonNull<linked_list::Pointers<Header>> {
- // Not super great as it avoids some of looms checking...
- NonNull::from(target.as_ref().owned.with_mut(|ptr| &mut *ptr))
+ self::core::Trailer::addr_of_owned(Header::get_trailer(target))
}
}
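The module documentation added above says a task starts with three references: one stored in the `OwnedTasks`/`LocalOwnedTasks` list, one `Notified` sent to the scheduler, and one `JoinHandle`. The real code folds that count into the task's state word (see `INITIAL_STATE` in state.rs further down); the sketch below only illustrates the bookkeeping, using an `Arc` as a stand-in for the hand-rolled ref count.

```rust
use std::sync::Arc;

struct TaskCell {
    name: &'static str,
}

// Illustrative analogue of `new_task` above: three handles to one allocation.
fn new_task(name: &'static str) -> (Arc<TaskCell>, Arc<TaskCell>, Arc<TaskCell>) {
    let cell = Arc::new(TaskCell { name });
    // 1. stored in OwnedTasks, 2. Notified sent to the scheduler, 3. JoinHandle.
    (cell.clone(), cell.clone(), cell)
}

fn main() {
    let (owned, notified, join) = new_task("demo");
    assert_eq!(Arc::strong_count(&owned), 3); // matches the three initial refs

    drop(notified); // the scheduler polled the task and dropped its notification
    drop(join);     // the user dropped the JoinHandle
    assert_eq!(Arc::strong_count(&owned), 1); // only the OwnedTasks entry is left

    println!("last reference to {} still held by the owner list", owned.name);
}
```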
diff --git a/vendor/tokio/src/runtime/task/raw.rs b/vendor/tokio/src/runtime/task/raw.rs
index 56d65d5a6..807885928 100644
--- a/vendor/tokio/src/runtime/task/raw.rs
+++ b/vendor/tokio/src/runtime/task/raw.rs
@@ -1,53 +1,167 @@
use crate::future::Future;
-use crate::runtime::task::{Cell, Harness, Header, Schedule, State};
+use crate::runtime::task::core::{Core, Trailer};
+use crate::runtime::task::{Cell, Harness, Header, Id, Schedule, State};
use std::ptr::NonNull;
use std::task::{Poll, Waker};
/// Raw task handle
-pub(super) struct RawTask {
+pub(crate) struct RawTask {
ptr: NonNull<Header>,
}
pub(super) struct Vtable {
- /// Poll the future
+ /// Polls the future.
pub(super) poll: unsafe fn(NonNull<Header>),
- /// Deallocate the memory
+ /// Schedules the task for execution on the runtime.
+ pub(super) schedule: unsafe fn(NonNull<Header>),
+
+ /// Deallocates the memory.
pub(super) dealloc: unsafe fn(NonNull<Header>),
- /// Read the task output, if complete
+ /// Reads the task output, if complete.
pub(super) try_read_output: unsafe fn(NonNull<Header>, *mut (), &Waker),
- /// The join handle has been dropped
+ /// The join handle has been dropped.
pub(super) drop_join_handle_slow: unsafe fn(NonNull<Header>),
- /// The task is remotely aborted
- pub(super) remote_abort: unsafe fn(NonNull<Header>),
+ /// An abort handle has been dropped.
+ pub(super) drop_abort_handle: unsafe fn(NonNull<Header>),
- /// Scheduler is being shutdown
+ /// Scheduler is being shutdown.
pub(super) shutdown: unsafe fn(NonNull<Header>),
+
+ /// The number of bytes that the `trailer` field is offset from the header.
+ pub(super) trailer_offset: usize,
+
+ /// The number of bytes that the `scheduler` field is offset from the header.
+ pub(super) scheduler_offset: usize,
+
+ /// The number of bytes that the `id` field is offset from the header.
+ pub(super) id_offset: usize,
}
/// Get the vtable for the requested `T` and `S` generics.
pub(super) fn vtable<T: Future, S: Schedule>() -> &'static Vtable {
&Vtable {
poll: poll::<T, S>,
+ schedule: schedule::<S>,
dealloc: dealloc::<T, S>,
try_read_output: try_read_output::<T, S>,
drop_join_handle_slow: drop_join_handle_slow::<T, S>,
- remote_abort: remote_abort::<T, S>,
+ drop_abort_handle: drop_abort_handle::<T, S>,
shutdown: shutdown::<T, S>,
+ trailer_offset: OffsetHelper::<T, S>::TRAILER_OFFSET,
+ scheduler_offset: OffsetHelper::<T, S>::SCHEDULER_OFFSET,
+ id_offset: OffsetHelper::<T, S>::ID_OFFSET,
}
}
+/// Calling `get_trailer_offset` directly in vtable doesn't work because it
+/// prevents the vtable from being promoted to a static reference.
+///
+/// See this thread for more info:
+/// <https://users.rust-lang.org/t/custom-vtables-with-integers/78508>
+struct OffsetHelper<T, S>(T, S);
+impl<T: Future, S: Schedule> OffsetHelper<T, S> {
+ // Pass `size_of`/`align_of` as arguments rather than calling them directly
+ // inside `get_trailer_offset` because trait bounds on generic parameters
+ // of const fn are unstable on our MSRV.
+ const TRAILER_OFFSET: usize = get_trailer_offset(
+ std::mem::size_of::<Header>(),
+ std::mem::size_of::<Core<T, S>>(),
+ std::mem::align_of::<Core<T, S>>(),
+ std::mem::align_of::<Trailer>(),
+ );
+
+ // The `scheduler` is the first field of `Core`, so it has the same
+ // offset as `Core`.
+ const SCHEDULER_OFFSET: usize = get_core_offset(
+ std::mem::size_of::<Header>(),
+ std::mem::align_of::<Core<T, S>>(),
+ );
+
+ const ID_OFFSET: usize = get_id_offset(
+ std::mem::size_of::<Header>(),
+ std::mem::align_of::<Core<T, S>>(),
+ std::mem::size_of::<S>(),
+ std::mem::align_of::<Id>(),
+ );
+}
+
+/// Compute the offset of the `Trailer` field in `Cell<T, S>` using the
+/// `#[repr(C)]` algorithm.
+///
+/// Pseudo-code for the `#[repr(C)]` algorithm can be found here:
+/// <https://doc.rust-lang.org/reference/type-layout.html#reprc-structs>
+const fn get_trailer_offset(
+ header_size: usize,
+ core_size: usize,
+ core_align: usize,
+ trailer_align: usize,
+) -> usize {
+ let mut offset = header_size;
+
+ let core_misalign = offset % core_align;
+ if core_misalign > 0 {
+ offset += core_align - core_misalign;
+ }
+ offset += core_size;
+
+ let trailer_misalign = offset % trailer_align;
+ if trailer_misalign > 0 {
+ offset += trailer_align - trailer_misalign;
+ }
+
+ offset
+}
+
+/// Compute the offset of the `Core<T, S>` field in `Cell<T, S>` using the
+/// `#[repr(C)]` algorithm.
+///
+/// Pseudo-code for the `#[repr(C)]` algorithm can be found here:
+/// <https://doc.rust-lang.org/reference/type-layout.html#reprc-structs>
+const fn get_core_offset(header_size: usize, core_align: usize) -> usize {
+ let mut offset = header_size;
+
+ let core_misalign = offset % core_align;
+ if core_misalign > 0 {
+ offset += core_align - core_misalign;
+ }
+
+ offset
+}
+
+/// Compute the offset of the `Id` field in `Cell<T, S>` using the
+/// `#[repr(C)]` algorithm.
+///
+/// Pseudo-code for the `#[repr(C)]` algorithm can be found here:
+/// <https://doc.rust-lang.org/reference/type-layout.html#reprc-structs>
+const fn get_id_offset(
+ header_size: usize,
+ core_align: usize,
+ scheduler_size: usize,
+ id_align: usize,
+) -> usize {
+ let mut offset = get_core_offset(header_size, core_align);
+ offset += scheduler_size;
+
+ let id_misalign = offset % id_align;
+ if id_misalign > 0 {
+ offset += id_align - id_misalign;
+ }
+
+ offset
+}
+
impl RawTask {
- pub(super) fn new<T, S>(task: T) -> RawTask
+ pub(super) fn new<T, S>(task: T, scheduler: S, id: Id) -> RawTask
where
T: Future,
S: Schedule,
{
- let ptr = Box::into_raw(Cell::<_, S>::new(task, State::new()));
+ let ptr = Box::into_raw(Cell::<_, S>::new(task, scheduler, State::new(), id));
let ptr = unsafe { NonNull::new_unchecked(ptr as *mut Header) };
RawTask { ptr }
@@ -57,19 +171,40 @@ impl RawTask {
RawTask { ptr }
}
- /// Returns a reference to the task's meta structure.
- ///
- /// Safe as `Header` is `Sync`.
+ pub(super) fn header_ptr(&self) -> NonNull<Header> {
+ self.ptr
+ }
+
+ pub(super) fn trailer_ptr(&self) -> NonNull<Trailer> {
+ unsafe { Header::get_trailer(self.ptr) }
+ }
+
+ /// Returns a reference to the task's header.
pub(super) fn header(&self) -> &Header {
unsafe { self.ptr.as_ref() }
}
+ /// Returns a reference to the task's trailer.
+ pub(super) fn trailer(&self) -> &Trailer {
+ unsafe { &*self.trailer_ptr().as_ptr() }
+ }
+
+ /// Returns a reference to the task's state.
+ pub(super) fn state(&self) -> &State {
+ &self.header().state
+ }
+
/// Safety: mutual exclusion is required to call this function.
- pub(super) fn poll(self) {
+ pub(crate) fn poll(self) {
let vtable = self.header().vtable;
unsafe { (vtable.poll)(self.ptr) }
}
+ pub(super) fn schedule(self) {
+ let vtable = self.header().vtable;
+ unsafe { (vtable.schedule)(self.ptr) }
+ }
+
pub(super) fn dealloc(self) {
let vtable = self.header().vtable;
unsafe {
@@ -89,14 +224,42 @@ impl RawTask {
unsafe { (vtable.drop_join_handle_slow)(self.ptr) }
}
+ pub(super) fn drop_abort_handle(self) {
+ let vtable = self.header().vtable;
+ unsafe { (vtable.drop_abort_handle)(self.ptr) }
+ }
+
pub(super) fn shutdown(self) {
let vtable = self.header().vtable;
unsafe { (vtable.shutdown)(self.ptr) }
}
- pub(super) fn remote_abort(self) {
- let vtable = self.header().vtable;
- unsafe { (vtable.remote_abort)(self.ptr) }
+ /// Increment the task's reference count.
+ ///
+ /// Currently, this is used only when creating an `AbortHandle`.
+ pub(super) fn ref_inc(self) {
+ self.header().state.ref_inc();
+ }
+
+ /// Gets the queue-next pointer
+ ///
+ /// This is for usage by the injection queue
+ ///
+ /// Safety: make sure only one queue uses this and access is synchronized.
+ pub(crate) unsafe fn get_queue_next(self) -> Option<RawTask> {
+ self.header()
+ .queue_next
+ .with(|ptr| *ptr)
+ .map(|p| RawTask::from_raw(p))
+ }
+
+ /// Sets the queue-next pointer
+ ///
+ /// This is for usage by the injection queue
+ ///
+ /// Safety: make sure only one queue uses this and access is synchronized.
+ pub(crate) unsafe fn set_queue_next(self, val: Option<RawTask>) {
+ self.header().set_next(val.map(|task| task.ptr));
}
}
@@ -113,6 +276,15 @@ unsafe fn poll<T: Future, S: Schedule>(ptr: NonNull<Header>) {
harness.poll();
}
+unsafe fn schedule<S: Schedule>(ptr: NonNull<Header>) {
+ use crate::runtime::task::{Notified, Task};
+
+ let scheduler = Header::get_scheduler::<S>(ptr);
+ scheduler
+ .as_ref()
+ .schedule(Notified(Task::from_raw(ptr.cast())));
+}
+
unsafe fn dealloc<T: Future, S: Schedule>(ptr: NonNull<Header>) {
let harness = Harness::<T, S>::from_raw(ptr);
harness.dealloc();
@@ -134,9 +306,9 @@ unsafe fn drop_join_handle_slow<T: Future, S: Schedule>(ptr: NonNull<Header>) {
harness.drop_join_handle_slow()
}
-unsafe fn remote_abort<T: Future, S: Schedule>(ptr: NonNull<Header>) {
+unsafe fn drop_abort_handle<T: Future, S: Schedule>(ptr: NonNull<Header>) {
let harness = Harness::<T, S>::from_raw(ptr);
- harness.remote_abort()
+ harness.drop_reference();
}
unsafe fn shutdown<T: Future, S: Schedule>(ptr: NonNull<Header>) {
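The `OffsetHelper` and `get_*_offset` functions added above recompute `#[repr(C)]` field offsets by hand so that they can be stored in the vtable as plain integers. The sketch below, with made-up field types standing in for `Header`, `Core<T, S>` and `Trailer`, checks the same round-up arithmetic against the layout the compiler actually produces.

```rust
// Same align-up step as get_trailer_offset/get_core_offset above.
const fn align_up(offset: usize, align: usize) -> usize {
    let misalign = offset % align;
    if misalign > 0 {
        offset + (align - misalign)
    } else {
        offset
    }
}

// Stand-ins for Header, Core<T, S> and Trailer; the real types differ.
#[repr(C)]
struct Cell {
    header: [u64; 2],
    core: [u32; 3],
    trailer: u16,
}

fn main() {
    use std::mem::{align_of, size_of};

    // Hand-computed offset of `trailer`, mirroring get_trailer_offset.
    let mut offset = size_of::<[u64; 2]>();
    offset = align_up(offset, align_of::<[u32; 3]>());
    offset += size_of::<[u32; 3]>();
    offset = align_up(offset, align_of::<u16>());

    // Offset reported by the compiler for the same field.
    let cell = Cell { header: [0; 2], core: [0; 3], trailer: 0 };
    let base = &cell as *const Cell as usize;
    let trailer = &cell.trailer as *const u16 as usize;

    assert_eq!(offset, trailer - base);
    println!("trailer offset = {offset} bytes");
}
```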
diff --git a/vendor/tokio/src/runtime/task/stack.rs b/vendor/tokio/src/runtime/task/stack.rs
deleted file mode 100644
index 9dd8d3f43..000000000
--- a/vendor/tokio/src/runtime/task/stack.rs
+++ /dev/null
@@ -1,83 +0,0 @@
-use crate::loom::sync::atomic::AtomicPtr;
-use crate::runtime::task::{Header, Task};
-
-use std::marker::PhantomData;
-use std::ptr::{self, NonNull};
-use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
-
-/// Concurrent stack of tasks, used to pass ownership of a task from one worker
-/// to another.
-pub(crate) struct TransferStack<T: 'static> {
- head: AtomicPtr<Header>,
- _p: PhantomData<T>,
-}
-
-impl<T: 'static> TransferStack<T> {
- pub(crate) fn new() -> TransferStack<T> {
- TransferStack {
- head: AtomicPtr::new(ptr::null_mut()),
- _p: PhantomData,
- }
- }
-
- pub(crate) fn push(&self, task: Task<T>) {
- let task = task.into_raw();
-
- // We don't care about any memory associated w/ setting the `head`
- // field, just the current value.
- //
- // The compare-exchange creates a release sequence.
- let mut curr = self.head.load(Relaxed);
-
- loop {
- unsafe {
- task.as_ref()
- .stack_next
- .with_mut(|ptr| *ptr = NonNull::new(curr))
- };
-
- let res = self
- .head
- .compare_exchange(curr, task.as_ptr() as *mut _, Release, Relaxed);
-
- match res {
- Ok(_) => return,
- Err(actual) => {
- curr = actual;
- }
- }
- }
- }
-
- pub(crate) fn drain(&self) -> impl Iterator<Item = Task<T>> {
- struct Iter<T: 'static>(Option<NonNull<Header>>, PhantomData<T>);
-
- impl<T: 'static> Iterator for Iter<T> {
- type Item = Task<T>;
-
- fn next(&mut self) -> Option<Task<T>> {
- let task = self.0?;
-
- // Move the cursor forward
- self.0 = unsafe { task.as_ref().stack_next.with(|ptr| *ptr) };
-
- // Return the task
- unsafe { Some(Task::from_raw(task)) }
- }
- }
-
- impl<T: 'static> Drop for Iter<T> {
- fn drop(&mut self) {
- use std::process;
-
- if self.0.is_some() {
- // we have bugs
- process::abort();
- }
- }
- }
-
- let ptr = self.head.swap(ptr::null_mut(), Acquire);
- Iter(NonNull::new(ptr), PhantomData)
- }
-}
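The deleted `TransferStack` was a lock-free "push from many threads, drain from one" stack built directly on the task headers. Below is a rough standalone sketch of the same compare-exchange loop, using owned `Box` nodes instead of intrusive headers; it shows the shape of the removed code rather than the code itself.

```rust
use std::ptr;
use std::sync::atomic::{
    AtomicPtr,
    Ordering::{Acquire, Relaxed, Release},
};

struct Node {
    value: u32,
    next: *mut Node,
}

struct TransferStack {
    head: AtomicPtr<Node>,
}

impl TransferStack {
    fn new() -> Self {
        Self { head: AtomicPtr::new(ptr::null_mut()) }
    }

    fn push(&self, value: u32) {
        let node = Box::into_raw(Box::new(Node { value, next: ptr::null_mut() }));
        let mut curr = self.head.load(Relaxed);
        loop {
            // Link the new node in front of the current head, then try to
            // publish it; Release on success forms the release sequence.
            unsafe { (*node).next = curr };
            match self.head.compare_exchange(curr, node, Release, Relaxed) {
                Ok(_) => return,
                Err(actual) => curr = actual,
            }
        }
    }

    fn drain(&self) -> Vec<u32> {
        // Take the whole stack in one Acquire swap, then walk it privately.
        let mut curr = self.head.swap(ptr::null_mut(), Acquire);
        let mut out = Vec::new();
        while !curr.is_null() {
            let node = unsafe { Box::from_raw(curr) };
            out.push(node.value);
            curr = node.next;
        }
        out
    }
}

fn main() {
    let stack = TransferStack::new();
    stack.push(1);
    stack.push(2);
    assert_eq!(stack.drain(), vec![2, 1]); // LIFO, like the removed code
}
```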
diff --git a/vendor/tokio/src/runtime/task/state.rs b/vendor/tokio/src/runtime/task/state.rs
index 603772162..12f544918 100644
--- a/vendor/tokio/src/runtime/task/state.rs
+++ b/vendor/tokio/src/runtime/task/state.rs
@@ -8,7 +8,7 @@ pub(super) struct State {
val: AtomicUsize,
}
-/// Current state value
+/// Current state value.
#[derive(Copy, Clone)]
pub(super) struct Snapshot(usize);
@@ -19,20 +19,20 @@ const RUNNING: usize = 0b0001;
/// The task is complete.
///
-/// Once this bit is set, it is never unset
+/// Once this bit is set, it is never unset.
const COMPLETE: usize = 0b0010;
-/// Extracts the task's lifecycle value from the state
+/// Extracts the task's lifecycle value from the state.
const LIFECYCLE_MASK: usize = 0b11;
/// Flag tracking if the task has been pushed into a run queue.
const NOTIFIED: usize = 0b100;
-/// The join handle is still around
+/// The join handle is still around.
#[allow(clippy::unusual_byte_groupings)] // https://github.com/rust-lang/rust-clippy/issues/6556
const JOIN_INTEREST: usize = 0b1_000;
-/// A join handle waker has been set
+/// A join handle waker has been set.
#[allow(clippy::unusual_byte_groupings)] // https://github.com/rust-lang/rust-clippy/issues/6556
const JOIN_WAKER: usize = 0b10_000;
@@ -40,36 +40,66 @@ const JOIN_WAKER: usize = 0b10_000;
#[allow(clippy::unusual_byte_groupings)] // https://github.com/rust-lang/rust-clippy/issues/6556
const CANCELLED: usize = 0b100_000;
-/// All bits
+/// All bits.
const STATE_MASK: usize = LIFECYCLE_MASK | NOTIFIED | JOIN_INTEREST | JOIN_WAKER | CANCELLED;
/// Bits used by the ref count portion of the state.
const REF_COUNT_MASK: usize = !STATE_MASK;
-/// Number of positions to shift the ref count
+/// Number of positions to shift the ref count.
const REF_COUNT_SHIFT: usize = REF_COUNT_MASK.count_zeros() as usize;
-/// One ref count
+/// One ref count.
const REF_ONE: usize = 1 << REF_COUNT_SHIFT;
-/// State a task is initialized with
+/// State a task is initialized with.
///
-/// A task is initialized with two references: one for the scheduler and one for
-/// the `JoinHandle`. As the task starts with a `JoinHandle`, `JOIN_INTEREST` is
-/// set. A new task is immediately pushed into the run queue for execution and
-/// starts with the `NOTIFIED` flag set.
-const INITIAL_STATE: usize = (REF_ONE * 2) | JOIN_INTEREST | NOTIFIED;
+/// A task is initialized with three references:
+///
+/// * A reference that will be stored in an OwnedTasks or LocalOwnedTasks.
+/// * A reference that will be sent to the scheduler as an ordinary notification.
+/// * A reference for the JoinHandle.
+///
+/// As the task starts with a `JoinHandle`, `JOIN_INTEREST` is set.
+/// As the task starts with a `Notified`, `NOTIFIED` is set.
+const INITIAL_STATE: usize = (REF_ONE * 3) | JOIN_INTEREST | NOTIFIED;
+
+#[must_use]
+pub(super) enum TransitionToRunning {
+ Success,
+ Cancelled,
+ Failed,
+ Dealloc,
+}
+
+#[must_use]
+pub(super) enum TransitionToIdle {
+ Ok,
+ OkNotified,
+ OkDealloc,
+ Cancelled,
+}
+
+#[must_use]
+pub(super) enum TransitionToNotifiedByVal {
+ DoNothing,
+ Submit,
+ Dealloc,
+}
+
+#[must_use]
+pub(crate) enum TransitionToNotifiedByRef {
+ DoNothing,
+ Submit,
+}
/// All transitions are performed via RMW operations. This establishes an
/// unambiguous modification order.
impl State {
- /// Return a task's initial state
+ /// Returns a task's initial state.
pub(super) fn new() -> State {
- // A task is initialized with three references: one for the scheduler,
- // one for the `JoinHandle`, one for the task handle made available in
- // release. As the task starts with a `JoinHandle`, `JOIN_INTEREST` is
- // set. A new task is immediately pushed into the run queue for
- // execution and starts with the `NOTIFIED` flag set.
+ // The raw task returned by this method has a ref-count of three. See
+ // the comment on INITIAL_STATE for more.
State {
val: AtomicUsize::new(INITIAL_STATE),
}
@@ -80,57 +110,72 @@ impl State {
Snapshot(self.val.load(Acquire))
}
- /// Attempt to transition the lifecycle to `Running`.
- ///
- /// If `ref_inc` is set, the reference count is also incremented.
- ///
- /// The `NOTIFIED` bit is always unset.
- pub(super) fn transition_to_running(&self, ref_inc: bool) -> UpdateResult {
- self.fetch_update(|curr| {
- assert!(curr.is_notified());
-
- let mut next = curr;
+ /// Attempts to transition the lifecycle to `Running`. This sets the
+ /// notified bit to false so notifications during the poll can be detected.
+ pub(super) fn transition_to_running(&self) -> TransitionToRunning {
+ self.fetch_update_action(|mut next| {
+ let action;
+ assert!(next.is_notified());
if !next.is_idle() {
- return None;
- }
-
- if ref_inc {
- next.ref_inc();
+ // This happens if the task is either currently running or if it
+ // has already completed, e.g. if it was cancelled during
+ // shutdown. Consume the ref-count and return.
+ next.ref_dec();
+ if next.ref_count() == 0 {
+ action = TransitionToRunning::Dealloc;
+ } else {
+ action = TransitionToRunning::Failed;
+ }
+ } else {
+ // We are able to lock the RUNNING bit.
+ next.set_running();
+ next.unset_notified();
+
+ if next.is_cancelled() {
+ action = TransitionToRunning::Cancelled;
+ } else {
+ action = TransitionToRunning::Success;
+ }
}
-
- next.set_running();
- next.unset_notified();
- Some(next)
+ (action, Some(next))
})
}
/// Transitions the task from `Running` -> `Idle`.
///
- /// Returns `Ok` if the transition to `Idle` is successful, `Err` otherwise.
- /// In both cases, a snapshot of the state from **after** the transition is
- /// returned.
- ///
+ /// Returns `true` if the transition to `Idle` is successful, `false` otherwise.
/// The transition to `Idle` fails if the task has been flagged to be
/// cancelled.
- pub(super) fn transition_to_idle(&self) -> UpdateResult {
- self.fetch_update(|curr| {
+ pub(super) fn transition_to_idle(&self) -> TransitionToIdle {
+ self.fetch_update_action(|curr| {
assert!(curr.is_running());
if curr.is_cancelled() {
- return None;
+ return (TransitionToIdle::Cancelled, None);
}
let mut next = curr;
+ let action;
next.unset_running();
- if next.is_notified() {
- // The caller needs to schedule the task. To do this, it needs a
- // waker. The waker requires a ref count.
+ if !next.is_notified() {
+ // Polling the future consumes the ref-count of the Notified.
+ next.ref_dec();
+ if next.ref_count() == 0 {
+ action = TransitionToIdle::OkDealloc;
+ } else {
+ action = TransitionToIdle::Ok;
+ }
+ } else {
+ // The caller will schedule a new notification, so we create a
+ // new ref-count for the notification. Our own ref-count is kept
+ // for now, and the caller will drop it shortly.
next.ref_inc();
+ action = TransitionToIdle::OkNotified;
}
- Some(next)
+ (action, Some(next))
})
}
@@ -145,51 +190,139 @@ impl State {
Snapshot(prev.0 ^ DELTA)
}
- /// Transition from `Complete` -> `Terminal`, decrementing the reference
- /// count by 1.
+ /// Transitions from `Complete` -> `Terminal`, decrementing the reference
+ /// count the specified number of times.
///
- /// When `ref_dec` is set, an additional ref count decrement is performed.
- /// This is used to batch atomic ops when possible.
- pub(super) fn transition_to_terminal(&self, complete: bool, ref_dec: bool) -> Snapshot {
- self.fetch_update(|mut snapshot| {
- if complete {
- snapshot.set_complete();
- } else {
- assert!(snapshot.is_complete());
- }
+ /// Returns true if the task should be deallocated.
+ pub(super) fn transition_to_terminal(&self, count: usize) -> bool {
+ let prev = Snapshot(self.val.fetch_sub(count * REF_ONE, AcqRel));
+ assert!(
+ prev.ref_count() >= count,
+ "current: {}, sub: {}",
+ prev.ref_count(),
+ count
+ );
+ prev.ref_count() == count
+ }
+
+ /// Transitions the state to `NOTIFIED`.
+ ///
+ /// If no task needs to be submitted, a ref-count is consumed.
+ ///
+ /// If a task needs to be submitted, the ref-count is incremented for the
+ /// new Notified.
+ pub(super) fn transition_to_notified_by_val(&self) -> TransitionToNotifiedByVal {
+ self.fetch_update_action(|mut snapshot| {
+ let action;
+
+ if snapshot.is_running() {
+ // If the task is running, we mark it as notified, but we should
+ // not submit anything as the thread currently running the
+ // future is responsible for that.
+ snapshot.set_notified();
+ snapshot.ref_dec();
- // Decrement the primary handle
- snapshot.ref_dec();
+ // The thread that set the running bit also holds a ref-count.
+ assert!(snapshot.ref_count() > 0);
- if ref_dec {
- // Decrement a second time
+ action = TransitionToNotifiedByVal::DoNothing;
+ } else if snapshot.is_complete() || snapshot.is_notified() {
+ // We do not need to submit any notifications, but we have to
+ // decrement the ref-count.
snapshot.ref_dec();
+
+ if snapshot.ref_count() == 0 {
+ action = TransitionToNotifiedByVal::Dealloc;
+ } else {
+ action = TransitionToNotifiedByVal::DoNothing;
+ }
+ } else {
+ // We create a new notified that we can submit. The caller
+ // retains ownership of the ref-count they passed in.
+ snapshot.set_notified();
+ snapshot.ref_inc();
+ action = TransitionToNotifiedByVal::Submit;
}
- Some(snapshot)
+ (action, Some(snapshot))
})
- .unwrap()
}
/// Transitions the state to `NOTIFIED`.
- ///
- /// Returns `true` if the task needs to be submitted to the pool for
- /// execution
- pub(super) fn transition_to_notified(&self) -> bool {
- let prev = Snapshot(self.val.fetch_or(NOTIFIED, AcqRel));
- prev.will_need_queueing()
+ pub(super) fn transition_to_notified_by_ref(&self) -> TransitionToNotifiedByRef {
+ self.fetch_update_action(|mut snapshot| {
+ if snapshot.is_complete() || snapshot.is_notified() {
+ // There is nothing to do in this case.
+ (TransitionToNotifiedByRef::DoNothing, None)
+ } else if snapshot.is_running() {
+ // If the task is running, we mark it as notified, but we should
+ // not submit as the thread currently running the future is
+ // responsible for that.
+ snapshot.set_notified();
+ (TransitionToNotifiedByRef::DoNothing, Some(snapshot))
+ } else {
+ // The task is idle and not notified. We should submit a
+ // notification.
+ snapshot.set_notified();
+ snapshot.ref_inc();
+ (TransitionToNotifiedByRef::Submit, Some(snapshot))
+ }
+ })
}
- /// Set the cancelled bit and transition the state to `NOTIFIED`.
+ /// Transitions the state to `NOTIFIED`, unconditionally increasing the ref count.
+ #[cfg(all(
+ tokio_unstable,
+ tokio_taskdump,
+ feature = "rt",
+ target_os = "linux",
+ any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
+ ))]
+ pub(super) fn transition_to_notified_for_tracing(&self) {
+ self.fetch_update_action(|mut snapshot| {
+ snapshot.set_notified();
+ snapshot.ref_inc();
+ ((), Some(snapshot))
+ });
+ }
+
+ /// Sets the cancelled bit and transitions the state to `NOTIFIED` if idle.
///
/// Returns `true` if the task needs to be submitted to the pool for
- /// execution
+ /// execution.
pub(super) fn transition_to_notified_and_cancel(&self) -> bool {
- let prev = Snapshot(self.val.fetch_or(NOTIFIED | CANCELLED, AcqRel));
- prev.will_need_queueing()
+ self.fetch_update_action(|mut snapshot| {
+ if snapshot.is_cancelled() || snapshot.is_complete() {
+ // Aborts to completed or cancelled tasks are no-ops.
+ (false, None)
+ } else if snapshot.is_running() {
+ // If the task is running, we mark it as cancelled. The thread
+ // running the task will notice the cancelled bit when it
+ // stops polling and it will kill the task.
+ //
+ // The set_notified() call is not strictly necessary but it will
+ // in some cases let a wake_by_ref call return without having
+ // to perform a compare_exchange.
+ snapshot.set_notified();
+ snapshot.set_cancelled();
+ (false, Some(snapshot))
+ } else {
+ // The task is idle. We set the cancelled and notified bits and
+ // submit a notification if the notified bit was not already
+ // set.
+ snapshot.set_cancelled();
+ if !snapshot.is_notified() {
+ snapshot.set_notified();
+ snapshot.ref_inc();
+ (true, Some(snapshot))
+ } else {
+ (false, Some(snapshot))
+ }
+ }
+ })
}
- /// Set the `CANCELLED` bit and attempt to transition to `Running`.
+ /// Sets the `CANCELLED` bit and attempts to transition to `Running`.
///
/// Returns `true` if the transition to `Running` succeeded.
pub(super) fn transition_to_shutdown(&self) -> bool {
@@ -200,17 +333,11 @@ impl State {
if snapshot.is_idle() {
snapshot.set_running();
-
- if snapshot.is_notified() {
- // If the task is idle and notified, this indicates the task is
- // in the run queue and is considered owned by the scheduler.
- // The shutdown operation claims ownership of the task, which
- // means we need to assign an additional ref-count to the task
- // in the queue.
- snapshot.ref_inc();
- }
}
+ // If the task was not idle, the thread currently running the task
+ // will notice the cancelled bit and cancel it once the poll
+ // completes.
snapshot.set_cancelled();
Some(snapshot)
});
@@ -219,7 +346,7 @@ impl State {
}
/// Optimistically tries to swap the state assuming the join handle is
- /// __immediately__ dropped on spawn
+ /// __immediately__ dropped on spawn.
pub(super) fn drop_join_handle_fast(&self) -> Result<(), ()> {
use std::sync::atomic::Ordering::Relaxed;
@@ -241,7 +368,7 @@ impl State {
.map_err(|_| ())
}
- /// Try to unset the JOIN_INTEREST flag.
+ /// Tries to unset the JOIN_INTEREST flag.
///
/// Returns `Ok` if the operation happens before the task transitions to a
/// completed state, `Err` otherwise.
@@ -260,14 +387,14 @@ impl State {
})
}
- /// Set the `JOIN_WAKER` bit.
+ /// Sets the `JOIN_WAKER` bit.
///
/// Returns `Ok` if the bit is set, `Err` otherwise. This operation fails if
/// the task has completed.
pub(super) fn set_join_waker(&self) -> UpdateResult {
self.fetch_update(|curr| {
assert!(curr.is_join_interested());
- assert!(!curr.has_join_waker());
+ assert!(!curr.is_join_waker_set());
if curr.is_complete() {
return None;
@@ -287,7 +414,7 @@ impl State {
pub(super) fn unset_waker(&self) -> UpdateResult {
self.fetch_update(|curr| {
assert!(curr.is_join_interested());
- assert!(curr.has_join_waker());
+ assert!(curr.is_join_waker_set());
if curr.is_complete() {
return None;
@@ -326,9 +453,39 @@ impl State {
/// Returns `true` if the task should be released.
pub(super) fn ref_dec(&self) -> bool {
let prev = Snapshot(self.val.fetch_sub(REF_ONE, AcqRel));
+ assert!(prev.ref_count() >= 1);
prev.ref_count() == 1
}
+ /// Returns `true` if the task should be released.
+ pub(super) fn ref_dec_twice(&self) -> bool {
+ let prev = Snapshot(self.val.fetch_sub(2 * REF_ONE, AcqRel));
+ assert!(prev.ref_count() >= 2);
+ prev.ref_count() == 2
+ }
+
+ fn fetch_update_action<F, T>(&self, mut f: F) -> T
+ where
+ F: FnMut(Snapshot) -> (T, Option<Snapshot>),
+ {
+ let mut curr = self.load();
+
+ loop {
+ let (output, next) = f(curr);
+ let next = match next {
+ Some(next) => next,
+ None => return output,
+ };
+
+ let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire);
+
+ match res {
+ Ok(_) => return output,
+ Err(actual) => curr = Snapshot(actual),
+ }
+ }
+ }
+
fn fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot>
where
F: FnMut(Snapshot) -> Option<Snapshot>,
@@ -368,6 +525,10 @@ impl Snapshot {
self.0 &= !NOTIFIED
}
+ fn set_notified(&mut self) {
+ self.0 |= NOTIFIED
+ }
+
pub(super) fn is_running(self) -> bool {
self.0 & RUNNING == RUNNING
}
@@ -388,10 +549,6 @@ impl Snapshot {
self.0 |= CANCELLED;
}
- fn set_complete(&mut self) {
- self.0 |= COMPLETE;
- }
-
/// Returns `true` if the task's future has completed execution.
pub(super) fn is_complete(self) -> bool {
self.0 & COMPLETE == COMPLETE
@@ -405,7 +562,7 @@ impl Snapshot {
self.0 &= !JOIN_INTEREST
}
- pub(super) fn has_join_waker(self) -> bool {
+ pub(super) fn is_join_waker_set(self) -> bool {
self.0 & JOIN_WAKER == JOIN_WAKER
}
@@ -430,10 +587,6 @@ impl Snapshot {
assert!(self.ref_count() > 0);
self.0 -= REF_ONE
}
-
- fn will_need_queueing(self) -> bool {
- !self.is_notified() && self.is_idle()
- }
}
impl fmt::Debug for State {
@@ -451,7 +604,7 @@ impl fmt::Debug for Snapshot {
.field("is_notified", &self.is_notified())
.field("is_cancelled", &self.is_cancelled())
.field("is_join_interested", &self.is_join_interested())
- .field("has_join_waker", &self.has_join_waker())
+ .field("is_join_waker_set", &self.is_join_waker_set())
.field("ref_count", &self.ref_count())
.finish()
}
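
The `fetch_update_action` helper added above factors out the optimistic read-modify-write loop that the new transitions (`transition_to_notified_and_cancel`, `transition_to_notified_for_tracing`, and friends) are built on: compute a candidate next state from a snapshot, try to install it with `compare_exchange`, and retry against the observed value on contention, carrying an output value alongside. A minimal standalone sketch of that shape, assuming a bare `AtomicUsize` and a made-up `NOTIFIED` bit rather than the crate's real state layout:

    use std::sync::atomic::{AtomicUsize, Ordering::{AcqRel, Acquire}};

    const NOTIFIED: usize = 0b01; // illustrative bit, not tokio's actual layout

    /// Sets NOTIFIED and reports whether this call was the one that set it.
    fn set_notified(state: &AtomicUsize) -> bool {
        let mut curr = state.load(Acquire);
        loop {
            if curr & NOTIFIED != 0 {
                // Nothing to write: return the action's output without a CAS.
                return false;
            }
            let next = curr | NOTIFIED;
            match state.compare_exchange(curr, next, AcqRel, Acquire) {
                Ok(_) => return true,
                // Another thread won the race; retry against the fresh value.
                Err(actual) => curr = actual,
            }
        }
    }

Returning `(output, Option<next>)` from the closure, as `fetch_update_action` does, generalizes the early-return branch above: `None` means "stop without writing".
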
diff --git a/vendor/tokio/src/runtime/task/trace/mod.rs b/vendor/tokio/src/runtime/task/trace/mod.rs
new file mode 100644
index 000000000..543b7eee9
--- /dev/null
+++ b/vendor/tokio/src/runtime/task/trace/mod.rs
@@ -0,0 +1,330 @@
+use crate::loom::sync::Arc;
+use crate::runtime::context;
+use crate::runtime::scheduler::{self, current_thread, Inject};
+
+use backtrace::BacktraceFrame;
+use std::cell::Cell;
+use std::collections::VecDeque;
+use std::ffi::c_void;
+use std::fmt;
+use std::future::Future;
+use std::pin::Pin;
+use std::ptr::{self, NonNull};
+use std::task::{self, Poll};
+
+mod symbol;
+mod tree;
+
+use symbol::Symbol;
+use tree::Tree;
+
+use super::{Notified, OwnedTasks};
+
+type Backtrace = Vec<BacktraceFrame>;
+type SymbolTrace = Vec<Symbol>;
+
+/// The ambient backtracing context.
+pub(crate) struct Context {
+ /// The address of [`Trace::root`] establishes an upper unwinding bound on
+ /// the backtraces in `Trace`.
+ active_frame: Cell<Option<NonNull<Frame>>>,
+ /// The place to stash backtraces.
+ collector: Cell<Option<Trace>>,
+}
+
+/// A [`Frame`] in an intrusive, doubly-linked tree of [`Frame`]s.
+struct Frame {
+ /// The location associated with this frame.
+ inner_addr: *const c_void,
+
+ /// The parent frame, if any.
+ parent: Option<NonNull<Frame>>,
+}
+
+/// A tree execution trace.
+///
+/// Traces are captured with [`Trace::capture`], rooted with [`Trace::root`]
+/// and leaved with [`trace_leaf`].
+#[derive(Clone, Debug)]
+pub(crate) struct Trace {
+ // The linear backtraces that comprise this trace. These linear traces can
+ // be re-knitted into a tree.
+ backtraces: Vec<Backtrace>,
+}
+
+pin_project_lite::pin_project! {
+ #[derive(Debug, Clone)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub(crate) struct Root<T> {
+ #[pin]
+ future: T,
+ }
+}
+
+const FAIL_NO_THREAD_LOCAL: &str = "The Tokio thread-local has been destroyed \
+ as part of shutting down the current \
+ thread, so collecting a taskdump is not \
+ possible.";
+
+impl Context {
+ pub(crate) const fn new() -> Self {
+ Context {
+ active_frame: Cell::new(None),
+ collector: Cell::new(None),
+ }
+ }
+
+ /// SAFETY: Callers of this function must ensure that trace frames always
+ /// form a valid linked list.
+ unsafe fn try_with_current<F, R>(f: F) -> Option<R>
+ where
+ F: FnOnce(&Self) -> R,
+ {
+ crate::runtime::context::with_trace(f)
+ }
+
+ unsafe fn with_current_frame<F, R>(f: F) -> R
+ where
+ F: FnOnce(&Cell<Option<NonNull<Frame>>>) -> R,
+ {
+ Self::try_with_current(|context| f(&context.active_frame)).expect(FAIL_NO_THREAD_LOCAL)
+ }
+
+ fn with_current_collector<F, R>(f: F) -> R
+ where
+ F: FnOnce(&Cell<Option<Trace>>) -> R,
+ {
+ // SAFETY: This call can only access the collector field, so it cannot
+ // break the trace frame linked list.
+ unsafe {
+ Self::try_with_current(|context| f(&context.collector)).expect(FAIL_NO_THREAD_LOCAL)
+ }
+ }
+}
+
+impl Trace {
+ /// Invokes `f`, returning both its result and the collection of backtraces
+ /// captured at each sub-invocation of [`trace_leaf`].
+ #[inline(never)]
+ pub(crate) fn capture<F, R>(f: F) -> (R, Trace)
+ where
+ F: FnOnce() -> R,
+ {
+ let collector = Trace { backtraces: vec![] };
+
+ let previous = Context::with_current_collector(|current| current.replace(Some(collector)));
+
+ let result = f();
+
+ let collector =
+ Context::with_current_collector(|current| current.replace(previous)).unwrap();
+
+ (result, collector)
+ }
+
+ /// The root of a trace.
+ #[inline(never)]
+ pub(crate) fn root<F>(future: F) -> Root<F> {
+ Root { future }
+ }
+}
+
+/// If this is a sub-invocation of [`Trace::capture`], capture a backtrace.
+///
+/// The captured backtrace will be returned by [`Trace::capture`].
+///
+/// Invoking this function does nothing when it is not a sub-invocation of
+/// [`Trace::capture`].
+// This function is marked `#[inline(never)]` to ensure that it gets a distinct `Frame` in the
+// backtrace, below which frames should not be included in the backtrace (since they reflect the
+// internal implementation details of this crate).
+#[inline(never)]
+pub(crate) fn trace_leaf(cx: &mut task::Context<'_>) -> Poll<()> {
+ // Safety: We don't manipulate the current context's active frame.
+ let did_trace = unsafe {
+ Context::try_with_current(|context_cell| {
+ if let Some(mut collector) = context_cell.collector.take() {
+ let mut frames = vec![];
+ let mut above_leaf = false;
+
+ if let Some(active_frame) = context_cell.active_frame.get() {
+ let active_frame = active_frame.as_ref();
+
+ backtrace::trace(|frame| {
+ let below_root = !ptr::eq(frame.symbol_address(), active_frame.inner_addr);
+
+ // only capture frames above `trace_leaf` and below
+ // `Trace::root`.
+ if above_leaf && below_root {
+ frames.push(frame.to_owned().into());
+ }
+
+ if ptr::eq(frame.symbol_address(), trace_leaf as *const _) {
+ above_leaf = true;
+ }
+
+ // only continue unwinding if we're below `Trace::root`
+ below_root
+ });
+ }
+ collector.backtraces.push(frames);
+ context_cell.collector.set(Some(collector));
+ true
+ } else {
+ false
+ }
+ })
+ .unwrap_or(false)
+ };
+
+ if did_trace {
+ // Use the same logic that `yield_now` uses to send out wakeups after
+ // the task yields.
+ context::with_scheduler(|scheduler| {
+ if let Some(scheduler) = scheduler {
+ match scheduler {
+ scheduler::Context::CurrentThread(s) => s.defer.defer(cx.waker()),
+ #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
+ scheduler::Context::MultiThread(s) => s.defer.defer(cx.waker()),
+ }
+ }
+ });
+
+ Poll::Pending
+ } else {
+ Poll::Ready(())
+ }
+}
+
+impl fmt::Display for Trace {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ Tree::from_trace(self.clone()).fmt(f)
+ }
+}
+
+fn defer<F: FnOnce() -> R, R>(f: F) -> impl Drop {
+ use std::mem::ManuallyDrop;
+
+ struct Defer<F: FnOnce() -> R, R>(ManuallyDrop<F>);
+
+ impl<F: FnOnce() -> R, R> Drop for Defer<F, R> {
+ #[inline(always)]
+ fn drop(&mut self) {
+ unsafe {
+ ManuallyDrop::take(&mut self.0)();
+ }
+ }
+ }
+
+ Defer(ManuallyDrop::new(f))
+}
+
+impl<T: Future> Future for Root<T> {
+ type Output = T::Output;
+
+ #[inline(never)]
+ fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
+ // SAFETY: The context's current frame is restored to its original state
+ // before `frame` is dropped.
+ unsafe {
+ let mut frame = Frame {
+ inner_addr: Self::poll as *const c_void,
+ parent: None,
+ };
+
+ Context::with_current_frame(|current| {
+ frame.parent = current.take();
+ current.set(Some(NonNull::from(&frame)));
+ });
+
+ let _restore = defer(|| {
+ Context::with_current_frame(|current| {
+ current.set(frame.parent);
+ });
+ });
+
+ let this = self.project();
+ this.future.poll(cx)
+ }
+ }
+}
+
+/// Trace and poll all tasks of the current_thread runtime.
+pub(in crate::runtime) fn trace_current_thread(
+ owned: &OwnedTasks<Arc<current_thread::Handle>>,
+ local: &mut VecDeque<Notified<Arc<current_thread::Handle>>>,
+ injection: &Inject<Arc<current_thread::Handle>>,
+) -> Vec<Trace> {
+ // clear the local and injection queues
+ local.clear();
+
+ while let Some(task) = injection.pop() {
+ drop(task);
+ }
+
+ // notify each task
+ let mut tasks = vec![];
+ owned.for_each(|task| {
+ // set the notified bit
+ task.as_raw().state().transition_to_notified_for_tracing();
+ // store the raw tasks into a vec
+ tasks.push(task.as_raw());
+ });
+
+ tasks
+ .into_iter()
+ .map(|task| {
+ let ((), trace) = Trace::capture(|| task.poll());
+ trace
+ })
+ .collect()
+}
+
+cfg_rt_multi_thread! {
+ use crate::loom::sync::Mutex;
+ use crate::runtime::scheduler::multi_thread;
+ use crate::runtime::scheduler::multi_thread::Synced;
+ use crate::runtime::scheduler::inject::Shared;
+
+ /// Trace and poll all tasks of the multi_thread runtime.
+ ///
+ /// ## Safety
+ ///
+ /// Must be called with the same `synced` that `injection` was created with.
+ pub(in crate::runtime) unsafe fn trace_multi_thread(
+ owned: &OwnedTasks<Arc<multi_thread::Handle>>,
+ local: &mut multi_thread::queue::Local<Arc<multi_thread::Handle>>,
+ synced: &Mutex<Synced>,
+ injection: &Shared<Arc<multi_thread::Handle>>,
+ ) -> Vec<Trace> {
+ // clear the local queue
+ while let Some(notified) = local.pop() {
+ drop(notified);
+ }
+
+ // clear the injection queue
+ let mut synced = synced.lock();
+ while let Some(notified) = injection.pop(&mut synced.inject) {
+ drop(notified);
+ }
+
+ drop(synced);
+
+ // notify each task
+ let mut traces = vec![];
+ owned.for_each(|task| {
+ // set the notified bit
+ task.as_raw().state().transition_to_notified_for_tracing();
+
+ // trace the task
+ let ((), trace) = Trace::capture(|| task.as_raw().poll());
+ traces.push(trace);
+
+ // reschedule the task
+ let _ = task.as_raw().state().transition_to_notified_by_ref();
+ task.as_raw().schedule();
+ });
+
+ traces
+ }
+}
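
`Root::poll` above pushes a `Frame` onto the thread-local intrusive list and leans on the `defer` guard to pop it again on every exit path, including panics, since the restore runs in `Drop`. A standalone sketch of that restore-on-drop pattern, with an illustrative `u32` thread-local standing in for the real `active_frame` list:

    use std::cell::Cell;
    use std::mem::ManuallyDrop;

    // Illustrative scope guard with the same shape as the `defer` helper above.
    struct Defer<F: FnOnce()>(ManuallyDrop<F>);

    impl<F: FnOnce()> Drop for Defer<F> {
        fn drop(&mut self) {
            // SAFETY: `drop` runs at most once, so the closure is taken at most once.
            unsafe { ManuallyDrop::take(&mut self.0)() }
        }
    }

    thread_local! {
        // Stand-in for the real `Context::active_frame` thread-local.
        static ACTIVE: Cell<Option<u32>> = Cell::new(None);
    }

    fn with_frame(frame: u32, body: impl FnOnce()) {
        let previous = ACTIVE.with(|cell| cell.replace(Some(frame)));
        // Restore the previous value on every exit path, including panics.
        let _restore = Defer(ManuallyDrop::new(move || {
            ACTIVE.with(|cell| cell.set(previous));
        }));
        body();
    }

    fn main() {
        with_frame(1, || assert_eq!(ACTIVE.with(|c| c.get()), Some(1)));
        assert_eq!(ACTIVE.with(|c| c.get()), None);
    }

The `ManuallyDrop` mirrors the `defer` helper above; an `Option<F>` taken in `drop` would work just as well, at the cost of a discriminant check.
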
diff --git a/vendor/tokio/src/runtime/task/trace/symbol.rs b/vendor/tokio/src/runtime/task/trace/symbol.rs
new file mode 100644
index 000000000..49d7ba37f
--- /dev/null
+++ b/vendor/tokio/src/runtime/task/trace/symbol.rs
@@ -0,0 +1,92 @@
+use backtrace::BacktraceSymbol;
+use std::fmt;
+use std::hash::{Hash, Hasher};
+use std::ptr;
+
+/// A symbol in a backtrace.
+///
+/// This wrapper type serves two purposes. The first is that it provides a
+/// representation of a symbol that can be inserted into hashmaps and hashsets;
+/// the [`backtrace`] crate does not define [`Hash`], [`PartialEq`], or [`Eq`]
+/// on [`BacktraceSymbol`], and recommends that users define their own wrapper
+/// which implements these traits.
+///
+/// Second, this wrapper includes a `parent_hash` field that uniquely
+/// identifies this symbol's position in its trace. Otherwise, e.g., our code
+/// would not be able to distinguish between recursive calls of a function at
+/// different depths.
+#[derive(Clone)]
+pub(super) struct Symbol {
+ pub(super) symbol: BacktraceSymbol,
+ pub(super) parent_hash: u64,
+}
+
+impl Hash for Symbol {
+ fn hash<H>(&self, state: &mut H)
+ where
+ H: Hasher,
+ {
+ if let Some(name) = self.symbol.name() {
+ name.as_bytes().hash(state);
+ }
+
+ if let Some(addr) = self.symbol.addr() {
+ ptr::hash(addr, state);
+ }
+
+ self.symbol.filename().hash(state);
+ self.symbol.lineno().hash(state);
+ self.symbol.colno().hash(state);
+ self.parent_hash.hash(state);
+ }
+}
+
+impl PartialEq for Symbol {
+ fn eq(&self, other: &Self) -> bool {
+ (self.parent_hash == other.parent_hash)
+ && match (self.symbol.name(), other.symbol.name()) {
+ (None, None) => true,
+ (Some(lhs_name), Some(rhs_name)) => lhs_name.as_bytes() == rhs_name.as_bytes(),
+ _ => false,
+ }
+ && match (self.symbol.addr(), other.symbol.addr()) {
+ (None, None) => true,
+ (Some(lhs_addr), Some(rhs_addr)) => ptr::eq(lhs_addr, rhs_addr),
+ _ => false,
+ }
+ && (self.symbol.filename() == other.symbol.filename())
+ && (self.symbol.lineno() == other.symbol.lineno())
+ && (self.symbol.colno() == other.symbol.colno())
+ }
+}
+
+impl Eq for Symbol {}
+
+impl fmt::Display for Symbol {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ if let Some(name) = self.symbol.name() {
+ let name = name.to_string();
+ let name = if let Some((name, _)) = name.rsplit_once("::") {
+ name
+ } else {
+ &name
+ };
+ fmt::Display::fmt(&name, f)?;
+ }
+
+ if let Some(filename) = self.symbol.filename() {
+ f.write_str(" at ")?;
+ filename.to_string_lossy().fmt(f)?;
+ if let Some(lineno) = self.symbol.lineno() {
+ f.write_str(":")?;
+ fmt::Display::fmt(&lineno, f)?;
+ if let Some(colno) = self.symbol.colno() {
+ f.write_str(":")?;
+ fmt::Display::fmt(&colno, f)?;
+ }
+ }
+ }
+
+ Ok(())
+ }
+}
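
The `parent_hash` field is what keeps recursive frames distinct: each symbol is folded into a running hasher, and the hasher's value just before a symbol is folded in becomes that symbol's `parent_hash` (see `to_symboltrace` in `tree.rs` below). A standalone sketch of the chaining idea, using plain `&str` names in place of `BacktraceSymbol`:

    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    // Chain each frame's key with the running hash of its ancestors, so the same
    // name at different call depths yields a distinct (name, parent_hash) pair.
    fn chained_keys(frames: &[&str]) -> Vec<(String, u64)> {
        let mut state = DefaultHasher::new();
        let mut keys = Vec::new();
        for name in frames {
            let parent_hash = state.finish(); // hash of everything above this frame
            keys.push((name.to_string(), parent_hash));
            name.hash(&mut state); // fold this frame into the running state
        }
        keys
    }

    fn main() {
        let keys = chained_keys(&["main", "recurse", "recurse", "leaf"]);
        // The two `recurse` frames are distinguished by their parent hashes.
        assert_ne!(keys[1].1, keys[2].1);
    }
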
diff --git a/vendor/tokio/src/runtime/task/trace/tree.rs b/vendor/tokio/src/runtime/task/trace/tree.rs
new file mode 100644
index 000000000..7e6f8efec
--- /dev/null
+++ b/vendor/tokio/src/runtime/task/trace/tree.rs
@@ -0,0 +1,126 @@
+use std::collections::{hash_map::DefaultHasher, HashMap, HashSet};
+use std::fmt;
+use std::hash::{Hash, Hasher};
+
+use super::{Backtrace, Symbol, SymbolTrace, Trace};
+
+/// An adjacency list representation of an execution tree.
+///
+/// This tree provides a convenient intermediate representation for formatting
+/// [`Trace`] as a tree.
+pub(super) struct Tree {
+ /// The roots of the trees.
+ ///
+ /// There should only be one root, but the code is robust to multiple roots.
+ roots: HashSet<Symbol>,
+
+ /// The adjacency list of symbols in the execution tree(s).
+ edges: HashMap<Symbol, HashSet<Symbol>>,
+}
+
+impl Tree {
+ /// Constructs a [`Tree`] from a [`Trace`].
+ pub(super) fn from_trace(trace: Trace) -> Self {
+ let mut roots: HashSet<Symbol> = HashSet::default();
+ let mut edges: HashMap<Symbol, HashSet<Symbol>> = HashMap::default();
+
+ for trace in trace.backtraces {
+ let trace = to_symboltrace(trace);
+
+ if let Some(first) = trace.first() {
+ roots.insert(first.to_owned());
+ }
+
+ let mut trace = trace.into_iter().peekable();
+ while let Some(frame) = trace.next() {
+ let subframes = edges.entry(frame).or_default();
+ if let Some(subframe) = trace.peek() {
+ subframes.insert(subframe.clone());
+ }
+ }
+ }
+
+ Tree { roots, edges }
+ }
+
+ /// Produces the sub-symbols of a given symbol.
+ fn consequences(&self, frame: &Symbol) -> Option<impl ExactSizeIterator<Item = &Symbol>> {
+ Some(self.edges.get(frame)?.iter())
+ }
+
+ /// Format this [`Tree`] as a textual tree.
+ fn display<W: fmt::Write>(
+ &self,
+ f: &mut W,
+ root: &Symbol,
+ is_last: bool,
+ prefix: &str,
+ ) -> fmt::Result {
+ let root_fmt = format!("{}", root);
+
+ let current;
+ let next;
+
+ if is_last {
+ current = format!("{prefix}└╼\u{a0}{root_fmt}");
+ next = format!("{}\u{a0}\u{a0}\u{a0}", prefix);
+ } else {
+ current = format!("{prefix}├╼\u{a0}{root_fmt}");
+ next = format!("{}│\u{a0}\u{a0}", prefix);
+ }
+
+ write!(f, "{}", {
+ let mut current = current.chars();
+ current.next().unwrap();
+ current.next().unwrap();
+ &current.as_str()
+ })?;
+
+ if let Some(consequences) = self.consequences(root) {
+ let len = consequences.len();
+ for (i, consequence) in consequences.enumerate() {
+ let is_last = i == len - 1;
+ writeln!(f)?;
+ self.display(f, consequence, is_last, &next)?;
+ }
+ }
+
+ Ok(())
+ }
+}
+
+impl fmt::Display for Tree {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ for root in &self.roots {
+ self.display(f, root, true, " ")?;
+ }
+ Ok(())
+ }
+}
+
+/// Resolve a sequence of [`backtrace::BacktraceFrame`]s into a sequence of
+/// [`Symbol`]s.
+fn to_symboltrace(backtrace: Backtrace) -> SymbolTrace {
+ // Resolve the backtrace frames to symbols.
+ let backtrace: Backtrace = {
+ let mut backtrace = backtrace::Backtrace::from(backtrace);
+ backtrace.resolve();
+ backtrace.into()
+ };
+
+ // Accumulate the symbols in descending order into `symboltrace`.
+ let mut symboltrace: SymbolTrace = vec![];
+ let mut state = DefaultHasher::new();
+ for frame in backtrace.into_iter().rev() {
+ for symbol in frame.symbols().iter().rev() {
+ let symbol = Symbol {
+ symbol: symbol.clone(),
+ parent_hash: state.finish(),
+ };
+ symbol.hash(&mut state);
+ symboltrace.push(symbol);
+ }
+ }
+
+ symboltrace
+}
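
`Tree::from_trace` re-knits the linear traces by walking each symbol sequence with a peekable iterator and recording parent→child edges, so shared prefixes of different backtraces collapse into a single path. A standalone sketch of that step, with `&str` frames standing in for `Symbol`s:

    use std::collections::{HashMap, HashSet};

    // Turn linear root-to-leaf paths into a parent -> children adjacency list.
    fn build_edges<'a>(paths: &[Vec<&'a str>]) -> HashMap<&'a str, HashSet<&'a str>> {
        let mut edges: HashMap<&'a str, HashSet<&'a str>> = HashMap::new();
        for path in paths {
            let mut frames = path.iter().copied().peekable();
            while let Some(frame) = frames.next() {
                let children = edges.entry(frame).or_default();
                if let Some(child) = frames.peek() {
                    children.insert(*child);
                }
            }
        }
        edges
    }

    fn main() {
        let paths = vec![
            vec!["root", "select", "read"],
            vec!["root", "select", "write"],
        ];
        let edges = build_edges(&paths);
        // "select" has two children because the traces diverge below it.
        assert_eq!(edges["select"].len(), 2);
    }
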
diff --git a/vendor/tokio/src/runtime/task/waker.rs b/vendor/tokio/src/runtime/task/waker.rs
index b7313b4c5..b5f5ace9e 100644
--- a/vendor/tokio/src/runtime/task/waker.rs
+++ b/vendor/tokio/src/runtime/task/waker.rs
@@ -1,6 +1,5 @@
use crate::future::Future;
-use crate::runtime::task::harness::Harness;
-use crate::runtime::task::{Header, Schedule};
+use crate::runtime::task::{Header, RawTask, Schedule};
use std::marker::PhantomData;
use std::mem::ManuallyDrop;
@@ -13,9 +12,9 @@ pub(super) struct WakerRef<'a, S: 'static> {
_p: PhantomData<(&'a Header, S)>,
}
-/// Returns a `WakerRef` which avoids having to pre-emptively increase the
+/// Returns a `WakerRef` which avoids having to preemptively increase the
/// refcount if there is no need to do so.
-pub(super) fn waker_ref<T, S>(header: &Header) -> WakerRef<'_, S>
+pub(super) fn waker_ref<T, S>(header: &NonNull<Header>) -> WakerRef<'_, S>
where
T: Future,
S: Schedule,
@@ -28,7 +27,7 @@ where
// point and not an *owned* waker, we must ensure that `drop` is never
// called on this waker instance. This is done by wrapping it with
// `ManuallyDrop` and then never calling drop.
- let waker = unsafe { ManuallyDrop::new(Waker::from_raw(raw_waker::<T, S>(header))) };
+ let waker = unsafe { ManuallyDrop::new(Waker::from_raw(raw_waker(*header))) };
WakerRef {
waker,
@@ -46,8 +45,8 @@ impl<S> ops::Deref for WakerRef<'_, S> {
cfg_trace! {
macro_rules! trace {
- ($harness:expr, $op:expr) => {
- if let Some(id) = $harness.id() {
+ ($header:expr, $op:expr) => {
+ if let Some(id) = Header::get_tracing_id(&$header) {
tracing::trace!(
target: "tokio::task::waker",
op = $op,
@@ -60,71 +59,46 @@ cfg_trace! {
cfg_not_trace! {
macro_rules! trace {
- ($harness:expr, $op:expr) => {
+ ($header:expr, $op:expr) => {
// noop
- let _ = &$harness;
+ let _ = &$header;
}
}
}
-unsafe fn clone_waker<T, S>(ptr: *const ()) -> RawWaker
-where
- T: Future,
- S: Schedule,
-{
- let header = ptr as *const Header;
- let ptr = NonNull::new_unchecked(ptr as *mut Header);
- let harness = Harness::<T, S>::from_raw(ptr);
- trace!(harness, "waker.clone");
- (*header).state.ref_inc();
- raw_waker::<T, S>(header)
+unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
+ let header = NonNull::new_unchecked(ptr as *mut Header);
+ trace!(header, "waker.clone");
+ header.as_ref().state.ref_inc();
+ raw_waker(header)
}
-unsafe fn drop_waker<T, S>(ptr: *const ())
-where
- T: Future,
- S: Schedule,
-{
+unsafe fn drop_waker(ptr: *const ()) {
let ptr = NonNull::new_unchecked(ptr as *mut Header);
- let harness = Harness::<T, S>::from_raw(ptr);
- trace!(harness, "waker.drop");
- harness.drop_reference();
+ trace!(ptr, "waker.drop");
+ let raw = RawTask::from_raw(ptr);
+ raw.drop_reference();
}
-unsafe fn wake_by_val<T, S>(ptr: *const ())
-where
- T: Future,
- S: Schedule,
-{
+unsafe fn wake_by_val(ptr: *const ()) {
let ptr = NonNull::new_unchecked(ptr as *mut Header);
- let harness = Harness::<T, S>::from_raw(ptr);
- trace!(harness, "waker.wake");
- harness.wake_by_val();
+ trace!(ptr, "waker.wake");
+ let raw = RawTask::from_raw(ptr);
+ raw.wake_by_val();
}
// Wake without consuming the waker
-unsafe fn wake_by_ref<T, S>(ptr: *const ())
-where
- T: Future,
- S: Schedule,
-{
+unsafe fn wake_by_ref(ptr: *const ()) {
let ptr = NonNull::new_unchecked(ptr as *mut Header);
- let harness = Harness::<T, S>::from_raw(ptr);
- trace!(harness, "waker.wake_by_ref");
- harness.wake_by_ref();
+ trace!(ptr, "waker.wake_by_ref");
+ let raw = RawTask::from_raw(ptr);
+ raw.wake_by_ref();
}
-fn raw_waker<T, S>(header: *const Header) -> RawWaker
-where
- T: Future,
- S: Schedule,
-{
- let ptr = header as *const ();
- let vtable = &RawWakerVTable::new(
- clone_waker::<T, S>,
- wake_by_val::<T, S>,
- wake_by_ref::<T, S>,
- drop_waker::<T, S>,
- );
- RawWaker::new(ptr, vtable)
+static WAKER_VTABLE: RawWakerVTable =
+ RawWakerVTable::new(clone_waker, wake_by_val, wake_by_ref, drop_waker);
+
+fn raw_waker(header: NonNull<Header>) -> RawWaker {
+ let ptr = header.as_ptr() as *const ();
+ RawWaker::new(ptr, &WAKER_VTABLE)
}