summaryrefslogtreecommitdiffstats
path: root/vendor/tokio/src/runtime/tests/task.rs
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 03:57:31 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 03:57:31 +0000
commitdc0db358abe19481e475e10c32149b53370f1a1c (patch)
treeab8ce99c4b255ce46f99ef402c27916055b899ee /vendor/tokio/src/runtime/tests/task.rs
parentReleasing progress-linux version 1.71.1+dfsg1-2~progress7.99u1. (diff)
downloadrustc-dc0db358abe19481e475e10c32149b53370f1a1c.tar.xz
rustc-dc0db358abe19481e475e10c32149b53370f1a1c.zip
Merging upstream version 1.72.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/tokio/src/runtime/tests/task.rs')
-rw-r--r--vendor/tokio/src/runtime/tests/task.rs269
1 files changed, 221 insertions, 48 deletions
diff --git a/vendor/tokio/src/runtime/tests/task.rs b/vendor/tokio/src/runtime/tests/task.rs
index 7c2012523..a79c0f50d 100644
--- a/vendor/tokio/src/runtime/tests/task.rs
+++ b/vendor/tokio/src/runtime/tests/task.rs
@@ -1,44 +1,235 @@
-use crate::runtime::task::{self, Schedule, Task};
-use crate::util::linked_list::{Link, LinkedList};
+use crate::runtime::task::{self, unowned, Id, JoinHandle, OwnedTasks, Schedule, Task};
+use crate::runtime::tests::NoopSchedule;
use crate::util::TryLock;
use std::collections::VecDeque;
+use std::future::Future;
+use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
+struct AssertDropHandle {
+ is_dropped: Arc<AtomicBool>,
+}
+impl AssertDropHandle {
+ #[track_caller]
+ fn assert_dropped(&self) {
+ assert!(self.is_dropped.load(Ordering::SeqCst));
+ }
+
+ #[track_caller]
+ fn assert_not_dropped(&self) {
+ assert!(!self.is_dropped.load(Ordering::SeqCst));
+ }
+}
+
+struct AssertDrop {
+ is_dropped: Arc<AtomicBool>,
+}
+impl AssertDrop {
+ fn new() -> (Self, AssertDropHandle) {
+ let shared = Arc::new(AtomicBool::new(false));
+ (
+ AssertDrop {
+ is_dropped: shared.clone(),
+ },
+ AssertDropHandle {
+ is_dropped: shared.clone(),
+ },
+ )
+ }
+}
+impl Drop for AssertDrop {
+ fn drop(&mut self) {
+ self.is_dropped.store(true, Ordering::SeqCst);
+ }
+}
+
+// A Notified does not shut down on drop, but it is dropped once the ref-count
+// hits zero.
+#[test]
+fn create_drop1() {
+ let (ad, handle) = AssertDrop::new();
+ let (notified, join) = unowned(
+ async {
+ drop(ad);
+ unreachable!()
+ },
+ NoopSchedule,
+ Id::next(),
+ );
+ drop(notified);
+ handle.assert_not_dropped();
+ drop(join);
+ handle.assert_dropped();
+}
+
#[test]
-fn create_drop() {
- let _ = super::joinable::<_, Runtime>(async { unreachable!() });
+fn create_drop2() {
+ let (ad, handle) = AssertDrop::new();
+ let (notified, join) = unowned(
+ async {
+ drop(ad);
+ unreachable!()
+ },
+ NoopSchedule,
+ Id::next(),
+ );
+ drop(join);
+ handle.assert_not_dropped();
+ drop(notified);
+ handle.assert_dropped();
+}
+
+#[test]
+fn drop_abort_handle1() {
+ let (ad, handle) = AssertDrop::new();
+ let (notified, join) = unowned(
+ async {
+ drop(ad);
+ unreachable!()
+ },
+ NoopSchedule,
+ Id::next(),
+ );
+ let abort = join.abort_handle();
+ drop(join);
+ handle.assert_not_dropped();
+ drop(notified);
+ handle.assert_not_dropped();
+ drop(abort);
+ handle.assert_dropped();
+}
+
+#[test]
+fn drop_abort_handle2() {
+ let (ad, handle) = AssertDrop::new();
+ let (notified, join) = unowned(
+ async {
+ drop(ad);
+ unreachable!()
+ },
+ NoopSchedule,
+ Id::next(),
+ );
+ let abort = join.abort_handle();
+ drop(notified);
+ handle.assert_not_dropped();
+ drop(abort);
+ handle.assert_not_dropped();
+ drop(join);
+ handle.assert_dropped();
+}
+
+// Shutting down through Notified works
+#[test]
+fn create_shutdown1() {
+ let (ad, handle) = AssertDrop::new();
+ let (notified, join) = unowned(
+ async {
+ drop(ad);
+ unreachable!()
+ },
+ NoopSchedule,
+ Id::next(),
+ );
+ drop(join);
+ handle.assert_not_dropped();
+ notified.shutdown();
+ handle.assert_dropped();
+}
+
+#[test]
+fn create_shutdown2() {
+ let (ad, handle) = AssertDrop::new();
+ let (notified, join) = unowned(
+ async {
+ drop(ad);
+ unreachable!()
+ },
+ NoopSchedule,
+ Id::next(),
+ );
+ handle.assert_not_dropped();
+ notified.shutdown();
+ handle.assert_dropped();
+ drop(join);
+}
+
+#[test]
+fn unowned_poll() {
+ let (task, _) = unowned(async {}, NoopSchedule, Id::next());
+ task.run();
}
#[test]
fn schedule() {
with(|rt| {
- let (task, _) = super::joinable(async {
+ rt.spawn(async {
crate::task::yield_now().await;
});
- rt.schedule(task);
-
assert_eq!(2, rt.tick());
+ rt.shutdown();
})
}
#[test]
fn shutdown() {
with(|rt| {
- let (task, _) = super::joinable(async {
+ rt.spawn(async {
loop {
crate::task::yield_now().await;
}
});
- rt.schedule(task);
rt.tick_max(1);
rt.shutdown();
})
}
+#[test]
+fn shutdown_immediately() {
+ with(|rt| {
+ rt.spawn(async {
+ loop {
+ crate::task::yield_now().await;
+ }
+ });
+
+ rt.shutdown();
+ })
+}
+
+#[test]
+fn spawn_during_shutdown() {
+ static DID_SPAWN: AtomicBool = AtomicBool::new(false);
+
+ struct SpawnOnDrop(Runtime);
+ impl Drop for SpawnOnDrop {
+ fn drop(&mut self) {
+ DID_SPAWN.store(true, Ordering::SeqCst);
+ self.0.spawn(async {});
+ }
+ }
+
+ with(|rt| {
+ let rt2 = rt.clone();
+ rt.spawn(async move {
+ let _spawn_on_drop = SpawnOnDrop(rt2);
+
+ loop {
+ crate::task::yield_now().await;
+ }
+ });
+
+ rt.tick_max(1);
+ rt.shutdown();
+ });
+
+ assert!(DID_SPAWN.load(Ordering::SeqCst));
+}
+
fn with(f: impl FnOnce(Runtime)) {
struct Reset;
@@ -51,10 +242,9 @@ fn with(f: impl FnOnce(Runtime)) {
let _reset = Reset;
let rt = Runtime(Arc::new(Inner {
- released: task::TransferStack::new(),
+ owned: OwnedTasks::new(),
core: TryLock::new(Core {
queue: VecDeque::new(),
- tasks: LinkedList::new(),
}),
}));
@@ -66,18 +256,31 @@ fn with(f: impl FnOnce(Runtime)) {
struct Runtime(Arc<Inner>);
struct Inner {
- released: task::TransferStack<Runtime>,
core: TryLock<Core>,
+ owned: OwnedTasks<Runtime>,
}
struct Core {
queue: VecDeque<task::Notified<Runtime>>,
- tasks: LinkedList<Task<Runtime>, <Task<Runtime> as Link>::Target>,
}
static CURRENT: TryLock<Option<Runtime>> = TryLock::new(None);
impl Runtime {
+ fn spawn<T>(&self, future: T) -> JoinHandle<T::Output>
+ where
+ T: 'static + Send + Future,
+ T::Output: 'static + Send,
+ {
+ let (handle, notified) = self.0.owned.bind(future, self.clone(), Id::next());
+
+ if let Some(notified) = notified {
+ self.schedule(notified);
+ }
+
+ handle
+ }
+
fn tick(&self) -> usize {
self.tick_max(usize::MAX)
}
@@ -88,11 +291,10 @@ impl Runtime {
while !self.is_empty() && n < max {
let task = self.next_task();
n += 1;
+ let task = self.0.owned.assert_owner(task);
task.run();
}
- self.0.maintenance();
-
n
}
@@ -107,50 +309,21 @@ impl Runtime {
fn shutdown(&self) {
let mut core = self.0.core.try_lock().unwrap();
- for task in core.tasks.iter() {
- task.shutdown();
- }
+ self.0.owned.close_and_shutdown_all();
while let Some(task) = core.queue.pop_back() {
- task.shutdown();
+ drop(task);
}
drop(core);
- while !self.0.core.try_lock().unwrap().tasks.is_empty() {
- self.0.maintenance();
- }
- }
-}
-
-impl Inner {
- fn maintenance(&self) {
- use std::mem::ManuallyDrop;
-
- for task in self.released.drain() {
- let task = ManuallyDrop::new(task);
-
- // safety: see worker.rs
- unsafe {
- let ptr = task.header().into();
- self.core.try_lock().unwrap().tasks.remove(ptr);
- }
- }
+ assert!(self.0.owned.is_empty());
}
}
impl Schedule for Runtime {
- fn bind(task: Task<Self>) -> Runtime {
- let rt = CURRENT.try_lock().unwrap().as_ref().unwrap().clone();
- rt.0.core.try_lock().unwrap().tasks.push_front(task);
- rt
- }
-
fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
- // safety: copying worker.rs
- let task = unsafe { Task::from_raw(task.header().into()) };
- self.0.released.push(task);
- None
+ self.0.owned.remove(task)
}
fn schedule(&self, task: task::Notified<Self>) {