diff options
Diffstat (limited to 'third_party/rust/tokio/src/runtime/tests/task.rs')
-rw-r--r-- | third_party/rust/tokio/src/runtime/tests/task.rs | 159 |
1 files changed, 159 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/runtime/tests/task.rs b/third_party/rust/tokio/src/runtime/tests/task.rs new file mode 100644 index 0000000000..82315a04ff --- /dev/null +++ b/third_party/rust/tokio/src/runtime/tests/task.rs @@ -0,0 +1,159 @@ +use crate::runtime::task::{self, Schedule, Task}; +use crate::util::linked_list::LinkedList; +use crate::util::TryLock; + +use std::collections::VecDeque; +use std::sync::Arc; + +#[test] +fn create_drop() { + let _ = task::joinable::<_, Runtime>(async { unreachable!() }); +} + +#[test] +fn schedule() { + with(|rt| { + let (task, _) = task::joinable(async { + crate::task::yield_now().await; + }); + + rt.schedule(task); + + assert_eq!(2, rt.tick()); + }) +} + +#[test] +fn shutdown() { + with(|rt| { + let (task, _) = task::joinable(async { + loop { + crate::task::yield_now().await; + } + }); + + rt.schedule(task); + rt.tick_max(1); + + rt.shutdown(); + }) +} + +fn with(f: impl FnOnce(Runtime)) { + struct Reset; + + impl Drop for Reset { + fn drop(&mut self) { + let _rt = CURRENT.try_lock().unwrap().take(); + } + } + + let _reset = Reset; + + let rt = Runtime(Arc::new(Inner { + released: task::TransferStack::new(), + core: TryLock::new(Core { + queue: VecDeque::new(), + tasks: LinkedList::new(), + }), + })); + + *CURRENT.try_lock().unwrap() = Some(rt.clone()); + f(rt) +} + +#[derive(Clone)] +struct Runtime(Arc<Inner>); + +struct Inner { + released: task::TransferStack<Runtime>, + core: TryLock<Core>, +} + +struct Core { + queue: VecDeque<task::Notified<Runtime>>, + tasks: LinkedList<Task<Runtime>>, +} + +static CURRENT: TryLock<Option<Runtime>> = TryLock::new(None); + +impl Runtime { + fn tick(&self) -> usize { + self.tick_max(usize::max_value()) + } + + fn tick_max(&self, max: usize) -> usize { + let mut n = 0; + + while !self.is_empty() && n < max { + let task = self.next_task(); + n += 1; + task.run(); + } + + self.0.maintenance(); + + n + } + + fn is_empty(&self) -> bool { + self.0.core.try_lock().unwrap().queue.is_empty() + } + + fn next_task(&self) -> task::Notified<Runtime> { + self.0.core.try_lock().unwrap().queue.pop_front().unwrap() + } + + fn shutdown(&self) { + let mut core = self.0.core.try_lock().unwrap(); + + for task in core.tasks.iter() { + task.shutdown(); + } + + while let Some(task) = core.queue.pop_back() { + task.shutdown(); + } + + drop(core); + + while !self.0.core.try_lock().unwrap().tasks.is_empty() { + self.0.maintenance(); + } + } +} + +impl Inner { + fn maintenance(&self) { + use std::mem::ManuallyDrop; + + for task in self.released.drain() { + let task = ManuallyDrop::new(task); + + // safety: see worker.rs + unsafe { + let ptr = task.header().into(); + self.core.try_lock().unwrap().tasks.remove(ptr); + } + } + } +} + +impl Schedule for Runtime { + fn bind(task: Task<Self>) -> Runtime { + let rt = CURRENT.try_lock().unwrap().as_ref().unwrap().clone(); + rt.0.core.try_lock().unwrap().tasks.push_front(task); + rt + } + + fn release(&self, task: &Task<Self>) -> Option<Task<Self>> { + // safety: copying worker.rs + let task = unsafe { Task::from_raw(task.header().into()) }; + self.0.released.push(task); + None + } + + fn schedule(&self, task: task::Notified<Self>) { + self.0.core.try_lock().unwrap().queue.push_back(task); + } +} |