From dc0db358abe19481e475e10c32149b53370f1a1c Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Thu, 30 May 2024 05:57:31 +0200 Subject: Merging upstream version 1.72.1+dfsg1. Signed-off-by: Daniel Baumann --- vendor/tokio/src/runtime/tests/task.rs | 269 +++++++++++++++++++++++++++------ 1 file changed, 221 insertions(+), 48 deletions(-) (limited to 'vendor/tokio/src/runtime/tests/task.rs') diff --git a/vendor/tokio/src/runtime/tests/task.rs b/vendor/tokio/src/runtime/tests/task.rs index 7c2012523..a79c0f50d 100644 --- a/vendor/tokio/src/runtime/tests/task.rs +++ b/vendor/tokio/src/runtime/tests/task.rs @@ -1,44 +1,235 @@ -use crate::runtime::task::{self, Schedule, Task}; -use crate::util::linked_list::{Link, LinkedList}; +use crate::runtime::task::{self, unowned, Id, JoinHandle, OwnedTasks, Schedule, Task}; +use crate::runtime::tests::NoopSchedule; use crate::util::TryLock; use std::collections::VecDeque; +use std::future::Future; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +struct AssertDropHandle { + is_dropped: Arc, +} +impl AssertDropHandle { + #[track_caller] + fn assert_dropped(&self) { + assert!(self.is_dropped.load(Ordering::SeqCst)); + } + + #[track_caller] + fn assert_not_dropped(&self) { + assert!(!self.is_dropped.load(Ordering::SeqCst)); + } +} + +struct AssertDrop { + is_dropped: Arc, +} +impl AssertDrop { + fn new() -> (Self, AssertDropHandle) { + let shared = Arc::new(AtomicBool::new(false)); + ( + AssertDrop { + is_dropped: shared.clone(), + }, + AssertDropHandle { + is_dropped: shared.clone(), + }, + ) + } +} +impl Drop for AssertDrop { + fn drop(&mut self) { + self.is_dropped.store(true, Ordering::SeqCst); + } +} + +// A Notified does not shut down on drop, but it is dropped once the ref-count +// hits zero. +#[test] +fn create_drop1() { + let (ad, handle) = AssertDrop::new(); + let (notified, join) = unowned( + async { + drop(ad); + unreachable!() + }, + NoopSchedule, + Id::next(), + ); + drop(notified); + handle.assert_not_dropped(); + drop(join); + handle.assert_dropped(); +} + #[test] -fn create_drop() { - let _ = super::joinable::<_, Runtime>(async { unreachable!() }); +fn create_drop2() { + let (ad, handle) = AssertDrop::new(); + let (notified, join) = unowned( + async { + drop(ad); + unreachable!() + }, + NoopSchedule, + Id::next(), + ); + drop(join); + handle.assert_not_dropped(); + drop(notified); + handle.assert_dropped(); +} + +#[test] +fn drop_abort_handle1() { + let (ad, handle) = AssertDrop::new(); + let (notified, join) = unowned( + async { + drop(ad); + unreachable!() + }, + NoopSchedule, + Id::next(), + ); + let abort = join.abort_handle(); + drop(join); + handle.assert_not_dropped(); + drop(notified); + handle.assert_not_dropped(); + drop(abort); + handle.assert_dropped(); +} + +#[test] +fn drop_abort_handle2() { + let (ad, handle) = AssertDrop::new(); + let (notified, join) = unowned( + async { + drop(ad); + unreachable!() + }, + NoopSchedule, + Id::next(), + ); + let abort = join.abort_handle(); + drop(notified); + handle.assert_not_dropped(); + drop(abort); + handle.assert_not_dropped(); + drop(join); + handle.assert_dropped(); +} + +// Shutting down through Notified works +#[test] +fn create_shutdown1() { + let (ad, handle) = AssertDrop::new(); + let (notified, join) = unowned( + async { + drop(ad); + unreachable!() + }, + NoopSchedule, + Id::next(), + ); + drop(join); + handle.assert_not_dropped(); + notified.shutdown(); + handle.assert_dropped(); +} + +#[test] +fn create_shutdown2() { + let (ad, handle) = AssertDrop::new(); + let (notified, join) = unowned( + async { + drop(ad); + unreachable!() + }, + NoopSchedule, + Id::next(), + ); + handle.assert_not_dropped(); + notified.shutdown(); + handle.assert_dropped(); + drop(join); +} + +#[test] +fn unowned_poll() { + let (task, _) = unowned(async {}, NoopSchedule, Id::next()); + task.run(); } #[test] fn schedule() { with(|rt| { - let (task, _) = super::joinable(async { + rt.spawn(async { crate::task::yield_now().await; }); - rt.schedule(task); - assert_eq!(2, rt.tick()); + rt.shutdown(); }) } #[test] fn shutdown() { with(|rt| { - let (task, _) = super::joinable(async { + rt.spawn(async { loop { crate::task::yield_now().await; } }); - rt.schedule(task); rt.tick_max(1); rt.shutdown(); }) } +#[test] +fn shutdown_immediately() { + with(|rt| { + rt.spawn(async { + loop { + crate::task::yield_now().await; + } + }); + + rt.shutdown(); + }) +} + +#[test] +fn spawn_during_shutdown() { + static DID_SPAWN: AtomicBool = AtomicBool::new(false); + + struct SpawnOnDrop(Runtime); + impl Drop for SpawnOnDrop { + fn drop(&mut self) { + DID_SPAWN.store(true, Ordering::SeqCst); + self.0.spawn(async {}); + } + } + + with(|rt| { + let rt2 = rt.clone(); + rt.spawn(async move { + let _spawn_on_drop = SpawnOnDrop(rt2); + + loop { + crate::task::yield_now().await; + } + }); + + rt.tick_max(1); + rt.shutdown(); + }); + + assert!(DID_SPAWN.load(Ordering::SeqCst)); +} + fn with(f: impl FnOnce(Runtime)) { struct Reset; @@ -51,10 +242,9 @@ fn with(f: impl FnOnce(Runtime)) { let _reset = Reset; let rt = Runtime(Arc::new(Inner { - released: task::TransferStack::new(), + owned: OwnedTasks::new(), core: TryLock::new(Core { queue: VecDeque::new(), - tasks: LinkedList::new(), }), })); @@ -66,18 +256,31 @@ fn with(f: impl FnOnce(Runtime)) { struct Runtime(Arc); struct Inner { - released: task::TransferStack, core: TryLock, + owned: OwnedTasks, } struct Core { queue: VecDeque>, - tasks: LinkedList, as Link>::Target>, } static CURRENT: TryLock> = TryLock::new(None); impl Runtime { + fn spawn(&self, future: T) -> JoinHandle + where + T: 'static + Send + Future, + T::Output: 'static + Send, + { + let (handle, notified) = self.0.owned.bind(future, self.clone(), Id::next()); + + if let Some(notified) = notified { + self.schedule(notified); + } + + handle + } + fn tick(&self) -> usize { self.tick_max(usize::MAX) } @@ -88,11 +291,10 @@ impl Runtime { while !self.is_empty() && n < max { let task = self.next_task(); n += 1; + let task = self.0.owned.assert_owner(task); task.run(); } - self.0.maintenance(); - n } @@ -107,50 +309,21 @@ impl Runtime { fn shutdown(&self) { let mut core = self.0.core.try_lock().unwrap(); - for task in core.tasks.iter() { - task.shutdown(); - } + self.0.owned.close_and_shutdown_all(); while let Some(task) = core.queue.pop_back() { - task.shutdown(); + drop(task); } drop(core); - while !self.0.core.try_lock().unwrap().tasks.is_empty() { - self.0.maintenance(); - } - } -} - -impl Inner { - fn maintenance(&self) { - use std::mem::ManuallyDrop; - - for task in self.released.drain() { - let task = ManuallyDrop::new(task); - - // safety: see worker.rs - unsafe { - let ptr = task.header().into(); - self.core.try_lock().unwrap().tasks.remove(ptr); - } - } + assert!(self.0.owned.is_empty()); } } impl Schedule for Runtime { - fn bind(task: Task) -> Runtime { - let rt = CURRENT.try_lock().unwrap().as_ref().unwrap().clone(); - rt.0.core.try_lock().unwrap().tasks.push_front(task); - rt - } - fn release(&self, task: &Task) -> Option> { - // safety: copying worker.rs - let task = unsafe { Task::from_raw(task.header().into()) }; - self.0.released.push(task); - None + self.0.owned.remove(task) } fn schedule(&self, task: task::Notified) { -- cgit v1.2.3