summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio/src/runtime/tests
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio/src/runtime/tests')
-rw-r--r--third_party/rust/tokio/src/runtime/tests/loom_blocking.rs31
-rw-r--r--third_party/rust/tokio/src/runtime/tests/loom_oneshot.rs49
-rw-r--r--third_party/rust/tokio/src/runtime/tests/loom_pool.rs380
-rw-r--r--third_party/rust/tokio/src/runtime/tests/loom_queue.rs216
-rw-r--r--third_party/rust/tokio/src/runtime/tests/mod.rs13
-rw-r--r--third_party/rust/tokio/src/runtime/tests/queue.rs202
-rw-r--r--third_party/rust/tokio/src/runtime/tests/task.rs159
7 files changed, 1050 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/runtime/tests/loom_blocking.rs b/third_party/rust/tokio/src/runtime/tests/loom_blocking.rs
new file mode 100644
index 0000000000..db7048e3f9
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/tests/loom_blocking.rs
@@ -0,0 +1,31 @@
+use crate::runtime::{self, Runtime};
+
+use std::sync::Arc;
+
+#[test]
+fn blocking_shutdown() {
+ loom::model(|| {
+ let v = Arc::new(());
+
+ let rt = mk_runtime(1);
+ rt.enter(|| {
+ for _ in 0..2 {
+ let v = v.clone();
+ crate::task::spawn_blocking(move || {
+ assert!(1 < Arc::strong_count(&v));
+ });
+ }
+ });
+
+ drop(rt);
+ assert_eq!(1, Arc::strong_count(&v));
+ });
+}
+
+fn mk_runtime(num_threads: usize) -> Runtime {
+ runtime::Builder::new()
+ .threaded_scheduler()
+ .core_threads(num_threads)
+ .build()
+ .unwrap()
+}
diff --git a/third_party/rust/tokio/src/runtime/tests/loom_oneshot.rs b/third_party/rust/tokio/src/runtime/tests/loom_oneshot.rs
new file mode 100644
index 0000000000..c126fe479a
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/tests/loom_oneshot.rs
@@ -0,0 +1,49 @@
+use loom::sync::Notify;
+
+use std::sync::{Arc, Mutex};
+
+pub(crate) fn channel<T>() -> (Sender<T>, Receiver<T>) {
+ let inner = Arc::new(Inner {
+ notify: Notify::new(),
+ value: Mutex::new(None),
+ });
+
+ let tx = Sender {
+ inner: inner.clone(),
+ };
+ let rx = Receiver { inner };
+
+ (tx, rx)
+}
+
+pub(crate) struct Sender<T> {
+ inner: Arc<Inner<T>>,
+}
+
+pub(crate) struct Receiver<T> {
+ inner: Arc<Inner<T>>,
+}
+
+struct Inner<T> {
+ notify: Notify,
+ value: Mutex<Option<T>>,
+}
+
+impl<T> Sender<T> {
+ pub(crate) fn send(self, value: T) {
+ *self.inner.value.lock().unwrap() = Some(value);
+ self.inner.notify.notify();
+ }
+}
+
+impl<T> Receiver<T> {
+ pub(crate) fn recv(self) -> T {
+ loop {
+ if let Some(v) = self.inner.value.lock().unwrap().take() {
+ return v;
+ }
+
+ self.inner.notify.wait();
+ }
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/tests/loom_pool.rs b/third_party/rust/tokio/src/runtime/tests/loom_pool.rs
new file mode 100644
index 0000000000..c08658cde8
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/tests/loom_pool.rs
@@ -0,0 +1,380 @@
+/// Full runtime loom tests. These are heavy tests and take significant time to
+/// run on CI.
+///
+/// Use `LOOM_MAX_PREEMPTIONS=1` to do a "quick" run as a smoke test.
+///
+/// In order to speed up the C
+use crate::future::poll_fn;
+use crate::runtime::tests::loom_oneshot as oneshot;
+use crate::runtime::{self, Runtime};
+use crate::{spawn, task};
+use tokio_test::assert_ok;
+
+use loom::sync::atomic::{AtomicBool, AtomicUsize};
+use loom::sync::{Arc, Mutex};
+
+use pin_project_lite::pin_project;
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::atomic::Ordering::{Relaxed, SeqCst};
+use std::task::{Context, Poll};
+
+/// Tests are divided into groups to make the runs faster on CI.
+mod group_a {
+ use super::*;
+
+ #[test]
+ fn racy_shutdown() {
+ loom::model(|| {
+ let pool = mk_pool(1);
+
+ // here's the case we want to exercise:
+ //
+ // a worker that still has tasks in its local queue gets sent to the blocking pool (due to
+ // block_in_place). the blocking pool is shut down, so drops the worker. the worker's
+ // shutdown method never gets run.
+ //
+ // we do this by spawning two tasks on one worker, the first of which does block_in_place,
+ // and then immediately drop the pool.
+
+ pool.spawn(track(async {
+ crate::task::block_in_place(|| {});
+ }));
+ pool.spawn(track(async {}));
+ drop(pool);
+ });
+ }
+
+ #[test]
+ fn pool_multi_spawn() {
+ loom::model(|| {
+ let pool = mk_pool(2);
+ let c1 = Arc::new(AtomicUsize::new(0));
+
+ let (tx, rx) = oneshot::channel();
+ let tx1 = Arc::new(Mutex::new(Some(tx)));
+
+ // Spawn a task
+ let c2 = c1.clone();
+ let tx2 = tx1.clone();
+ pool.spawn(track(async move {
+ spawn(track(async move {
+ if 1 == c1.fetch_add(1, Relaxed) {
+ tx1.lock().unwrap().take().unwrap().send(());
+ }
+ }));
+ }));
+
+ // Spawn a second task
+ pool.spawn(track(async move {
+ spawn(track(async move {
+ if 1 == c2.fetch_add(1, Relaxed) {
+ tx2.lock().unwrap().take().unwrap().send(());
+ }
+ }));
+ }));
+
+ rx.recv();
+ });
+ }
+
+ fn only_blocking_inner(first_pending: bool) {
+ loom::model(move || {
+ let pool = mk_pool(1);
+ let (block_tx, block_rx) = oneshot::channel();
+
+ pool.spawn(track(async move {
+ crate::task::block_in_place(move || {
+ block_tx.send(());
+ });
+ if first_pending {
+ task::yield_now().await
+ }
+ }));
+
+ block_rx.recv();
+ drop(pool);
+ });
+ }
+
+ #[test]
+ fn only_blocking_without_pending() {
+ only_blocking_inner(false)
+ }
+
+ #[test]
+ fn only_blocking_with_pending() {
+ only_blocking_inner(true)
+ }
+}
+
+mod group_b {
+ use super::*;
+
+ fn blocking_and_regular_inner(first_pending: bool) {
+ const NUM: usize = 3;
+ loom::model(move || {
+ let pool = mk_pool(1);
+ let cnt = Arc::new(AtomicUsize::new(0));
+
+ let (block_tx, block_rx) = oneshot::channel();
+ let (done_tx, done_rx) = oneshot::channel();
+ let done_tx = Arc::new(Mutex::new(Some(done_tx)));
+
+ pool.spawn(track(async move {
+ crate::task::block_in_place(move || {
+ block_tx.send(());
+ });
+ if first_pending {
+ task::yield_now().await
+ }
+ }));
+
+ for _ in 0..NUM {
+ let cnt = cnt.clone();
+ let done_tx = done_tx.clone();
+
+ pool.spawn(track(async move {
+ if NUM == cnt.fetch_add(1, Relaxed) + 1 {
+ done_tx.lock().unwrap().take().unwrap().send(());
+ }
+ }));
+ }
+
+ done_rx.recv();
+ block_rx.recv();
+
+ drop(pool);
+ });
+ }
+
+ #[test]
+ fn blocking_and_regular() {
+ blocking_and_regular_inner(false);
+ }
+
+ #[test]
+ fn blocking_and_regular_with_pending() {
+ blocking_and_regular_inner(true);
+ }
+
+ #[test]
+ fn pool_shutdown() {
+ loom::model(|| {
+ let pool = mk_pool(2);
+
+ pool.spawn(track(async move {
+ gated2(true).await;
+ }));
+
+ pool.spawn(track(async move {
+ gated2(false).await;
+ }));
+
+ drop(pool);
+ });
+ }
+
+ #[test]
+ fn join_output() {
+ loom::model(|| {
+ let mut rt = mk_pool(1);
+
+ rt.block_on(async {
+ let t = crate::spawn(track(async { "hello" }));
+
+ let out = assert_ok!(t.await);
+ assert_eq!("hello", out.into_inner());
+ });
+ });
+ }
+
+ #[test]
+ fn poll_drop_handle_then_drop() {
+ loom::model(|| {
+ let mut rt = mk_pool(1);
+
+ rt.block_on(async move {
+ let mut t = crate::spawn(track(async { "hello" }));
+
+ poll_fn(|cx| {
+ let _ = Pin::new(&mut t).poll(cx);
+ Poll::Ready(())
+ })
+ .await;
+ });
+ })
+ }
+
+ #[test]
+ fn complete_block_on_under_load() {
+ loom::model(|| {
+ let mut pool = mk_pool(1);
+
+ pool.block_on(async {
+ // Trigger a re-schedule
+ crate::spawn(track(async {
+ for _ in 0..2 {
+ task::yield_now().await;
+ }
+ }));
+
+ gated2(true).await
+ });
+ });
+ }
+}
+
+mod group_c {
+ use super::*;
+
+ #[test]
+ fn shutdown_with_notification() {
+ use crate::sync::oneshot;
+
+ loom::model(|| {
+ let rt = mk_pool(2);
+ let (done_tx, done_rx) = oneshot::channel::<()>();
+
+ rt.spawn(track(async move {
+ let (tx, rx) = oneshot::channel::<()>();
+
+ crate::spawn(async move {
+ crate::task::spawn_blocking(move || {
+ let _ = tx.send(());
+ });
+
+ let _ = done_rx.await;
+ });
+
+ let _ = rx.await;
+
+ let _ = done_tx.send(());
+ }));
+ });
+ }
+}
+
+mod group_d {
+ use super::*;
+
+ #[test]
+ fn pool_multi_notify() {
+ loom::model(|| {
+ let pool = mk_pool(2);
+
+ let c1 = Arc::new(AtomicUsize::new(0));
+
+ let (done_tx, done_rx) = oneshot::channel();
+ let done_tx1 = Arc::new(Mutex::new(Some(done_tx)));
+
+ // Spawn a task
+ let c2 = c1.clone();
+ let done_tx2 = done_tx1.clone();
+ pool.spawn(track(async move {
+ gated().await;
+ gated().await;
+
+ if 1 == c1.fetch_add(1, Relaxed) {
+ done_tx1.lock().unwrap().take().unwrap().send(());
+ }
+ }));
+
+ // Spawn a second task
+ pool.spawn(track(async move {
+ gated().await;
+ gated().await;
+
+ if 1 == c2.fetch_add(1, Relaxed) {
+ done_tx2.lock().unwrap().take().unwrap().send(());
+ }
+ }));
+
+ done_rx.recv();
+ });
+ }
+}
+
+fn mk_pool(num_threads: usize) -> Runtime {
+ runtime::Builder::new()
+ .threaded_scheduler()
+ .core_threads(num_threads)
+ .build()
+ .unwrap()
+}
+
+fn gated() -> impl Future<Output = &'static str> {
+ gated2(false)
+}
+
+fn gated2(thread: bool) -> impl Future<Output = &'static str> {
+ use loom::thread;
+ use std::sync::Arc;
+
+ let gate = Arc::new(AtomicBool::new(false));
+ let mut fired = false;
+
+ poll_fn(move |cx| {
+ if !fired {
+ let gate = gate.clone();
+ let waker = cx.waker().clone();
+
+ if thread {
+ thread::spawn(move || {
+ gate.store(true, SeqCst);
+ waker.wake_by_ref();
+ });
+ } else {
+ spawn(track(async move {
+ gate.store(true, SeqCst);
+ waker.wake_by_ref();
+ }));
+ }
+
+ fired = true;
+
+ return Poll::Pending;
+ }
+
+ if gate.load(SeqCst) {
+ Poll::Ready("hello world")
+ } else {
+ Poll::Pending
+ }
+ })
+}
+
+fn track<T: Future>(f: T) -> Track<T> {
+ Track {
+ inner: f,
+ arc: Arc::new(()),
+ }
+}
+
+pin_project! {
+ struct Track<T> {
+ #[pin]
+ inner: T,
+ // Arc is used to hook into loom's leak tracking.
+ arc: Arc<()>,
+ }
+}
+
+impl<T> Track<T> {
+ fn into_inner(self) -> T {
+ self.inner
+ }
+}
+
+impl<T: Future> Future for Track<T> {
+ type Output = Track<T::Output>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let me = self.project();
+
+ Poll::Ready(Track {
+ inner: ready!(me.inner.poll(cx)),
+ arc: me.arc.clone(),
+ })
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/tests/loom_queue.rs b/third_party/rust/tokio/src/runtime/tests/loom_queue.rs
new file mode 100644
index 0000000000..de02610db0
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/tests/loom_queue.rs
@@ -0,0 +1,216 @@
+use crate::runtime::queue;
+use crate::runtime::task::{self, Schedule, Task};
+
+use loom::thread;
+
+#[test]
+fn basic() {
+ loom::model(|| {
+ let (steal, mut local) = queue::local();
+ let inject = queue::Inject::new();
+
+ let th = thread::spawn(move || {
+ let (_, mut local) = queue::local();
+ let mut n = 0;
+
+ for _ in 0..3 {
+ if steal.steal_into(&mut local).is_some() {
+ n += 1;
+ }
+
+ while local.pop().is_some() {
+ n += 1;
+ }
+ }
+
+ n
+ });
+
+ let mut n = 0;
+
+ for _ in 0..2 {
+ for _ in 0..2 {
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ local.push_back(task, &inject);
+ }
+
+ if local.pop().is_some() {
+ n += 1;
+ }
+
+ // Push another task
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ local.push_back(task, &inject);
+
+ while local.pop().is_some() {
+ n += 1;
+ }
+ }
+
+ while inject.pop().is_some() {
+ n += 1;
+ }
+
+ n += th.join().unwrap();
+
+ assert_eq!(6, n);
+ });
+}
+
+#[test]
+fn steal_overflow() {
+ loom::model(|| {
+ let (steal, mut local) = queue::local();
+ let inject = queue::Inject::new();
+
+ let th = thread::spawn(move || {
+ let (_, mut local) = queue::local();
+ let mut n = 0;
+
+ if steal.steal_into(&mut local).is_some() {
+ n += 1;
+ }
+
+ while local.pop().is_some() {
+ n += 1;
+ }
+
+ n
+ });
+
+ let mut n = 0;
+
+ // push a task, pop a task
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ local.push_back(task, &inject);
+
+ if local.pop().is_some() {
+ n += 1;
+ }
+
+ for _ in 0..6 {
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ local.push_back(task, &inject);
+ }
+
+ n += th.join().unwrap();
+
+ while local.pop().is_some() {
+ n += 1;
+ }
+
+ while inject.pop().is_some() {
+ n += 1;
+ }
+
+ assert_eq!(7, n);
+ });
+}
+
+#[test]
+fn multi_stealer() {
+ const NUM_TASKS: usize = 5;
+
+ fn steal_tasks(steal: queue::Steal<Runtime>) -> usize {
+ let (_, mut local) = queue::local();
+
+ if steal.steal_into(&mut local).is_none() {
+ return 0;
+ }
+
+ let mut n = 1;
+
+ while local.pop().is_some() {
+ n += 1;
+ }
+
+ n
+ }
+
+ loom::model(|| {
+ let (steal, mut local) = queue::local();
+ let inject = queue::Inject::new();
+
+ // Push work
+ for _ in 0..NUM_TASKS {
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ local.push_back(task, &inject);
+ }
+
+ let th1 = {
+ let steal = steal.clone();
+ thread::spawn(move || steal_tasks(steal))
+ };
+
+ let th2 = thread::spawn(move || steal_tasks(steal));
+
+ let mut n = 0;
+
+ while local.pop().is_some() {
+ n += 1;
+ }
+
+ while inject.pop().is_some() {
+ n += 1;
+ }
+
+ n += th1.join().unwrap();
+ n += th2.join().unwrap();
+
+ assert_eq!(n, NUM_TASKS);
+ });
+}
+
+#[test]
+fn chained_steal() {
+ loom::model(|| {
+ let (s1, mut l1) = queue::local();
+ let (s2, mut l2) = queue::local();
+ let inject = queue::Inject::new();
+
+ // Load up some tasks
+ for _ in 0..4 {
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ l1.push_back(task, &inject);
+
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ l2.push_back(task, &inject);
+ }
+
+ // Spawn a task to steal from **our** queue
+ let th = thread::spawn(move || {
+ let (_, mut local) = queue::local();
+ s1.steal_into(&mut local);
+
+ while local.pop().is_some() {}
+ });
+
+ // Drain our tasks, then attempt to steal
+ while l1.pop().is_some() {}
+
+ s2.steal_into(&mut l1);
+
+ th.join().unwrap();
+
+ while l1.pop().is_some() {}
+ while l2.pop().is_some() {}
+ while inject.pop().is_some() {}
+ });
+}
+
+struct Runtime;
+
+impl Schedule for Runtime {
+ fn bind(task: Task<Self>) -> Runtime {
+ std::mem::forget(task);
+ Runtime
+ }
+
+ fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
+ None
+ }
+
+ fn schedule(&self, _task: task::Notified<Self>) {
+ unreachable!();
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/tests/mod.rs b/third_party/rust/tokio/src/runtime/tests/mod.rs
new file mode 100644
index 0000000000..123a7e35a3
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/tests/mod.rs
@@ -0,0 +1,13 @@
+cfg_loom! {
+ mod loom_blocking;
+ mod loom_oneshot;
+ mod loom_pool;
+ mod loom_queue;
+}
+
+cfg_not_loom! {
+ mod queue;
+
+ #[cfg(miri)]
+ mod task;
+}
diff --git a/third_party/rust/tokio/src/runtime/tests/queue.rs b/third_party/rust/tokio/src/runtime/tests/queue.rs
new file mode 100644
index 0000000000..d228d5dcc7
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/tests/queue.rs
@@ -0,0 +1,202 @@
+use crate::runtime::queue;
+use crate::runtime::task::{self, Schedule, Task};
+
+use std::thread;
+use std::time::Duration;
+
+#[test]
+fn fits_256() {
+ let (_, mut local) = queue::local();
+ let inject = queue::Inject::new();
+
+ for _ in 0..256 {
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ local.push_back(task, &inject);
+ }
+
+ assert!(inject.pop().is_none());
+
+ while local.pop().is_some() {}
+}
+
+#[test]
+fn overflow() {
+ let (_, mut local) = queue::local();
+ let inject = queue::Inject::new();
+
+ for _ in 0..257 {
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ local.push_back(task, &inject);
+ }
+
+ let mut n = 0;
+
+ while inject.pop().is_some() {
+ n += 1;
+ }
+
+ while local.pop().is_some() {
+ n += 1;
+ }
+
+ assert_eq!(n, 257);
+}
+
+#[test]
+fn steal_batch() {
+ let (steal1, mut local1) = queue::local();
+ let (_, mut local2) = queue::local();
+ let inject = queue::Inject::new();
+
+ for _ in 0..4 {
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ local1.push_back(task, &inject);
+ }
+
+ assert!(steal1.steal_into(&mut local2).is_some());
+
+ for _ in 0..1 {
+ assert!(local2.pop().is_some());
+ }
+
+ assert!(local2.pop().is_none());
+
+ for _ in 0..2 {
+ assert!(local1.pop().is_some());
+ }
+
+ assert!(local1.pop().is_none());
+}
+
+#[test]
+fn stress1() {
+ const NUM_ITER: usize = 1;
+ const NUM_STEAL: usize = 1_000;
+ const NUM_LOCAL: usize = 1_000;
+ const NUM_PUSH: usize = 500;
+ const NUM_POP: usize = 250;
+
+ for _ in 0..NUM_ITER {
+ let (steal, mut local) = queue::local();
+ let inject = queue::Inject::new();
+
+ let th = thread::spawn(move || {
+ let (_, mut local) = queue::local();
+ let mut n = 0;
+
+ for _ in 0..NUM_STEAL {
+ if steal.steal_into(&mut local).is_some() {
+ n += 1;
+ }
+
+ while local.pop().is_some() {
+ n += 1;
+ }
+
+ thread::yield_now();
+ }
+
+ n
+ });
+
+ let mut n = 0;
+
+ for _ in 0..NUM_LOCAL {
+ for _ in 0..NUM_PUSH {
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ local.push_back(task, &inject);
+ }
+
+ for _ in 0..NUM_POP {
+ if local.pop().is_some() {
+ n += 1;
+ } else {
+ break;
+ }
+ }
+ }
+
+ while inject.pop().is_some() {
+ n += 1;
+ }
+
+ n += th.join().unwrap();
+
+ assert_eq!(n, NUM_LOCAL * NUM_PUSH);
+ }
+}
+
+#[test]
+fn stress2() {
+ const NUM_ITER: usize = 1;
+ const NUM_TASKS: usize = 1_000_000;
+ const NUM_STEAL: usize = 1_000;
+
+ for _ in 0..NUM_ITER {
+ let (steal, mut local) = queue::local();
+ let inject = queue::Inject::new();
+
+ let th = thread::spawn(move || {
+ let (_, mut local) = queue::local();
+ let mut n = 0;
+
+ for _ in 0..NUM_STEAL {
+ if steal.steal_into(&mut local).is_some() {
+ n += 1;
+ }
+
+ while local.pop().is_some() {
+ n += 1;
+ }
+
+ thread::sleep(Duration::from_micros(10));
+ }
+
+ n
+ });
+
+ let mut num_pop = 0;
+
+ for i in 0..NUM_TASKS {
+ let (task, _) = task::joinable::<_, Runtime>(async {});
+ local.push_back(task, &inject);
+
+ if i % 128 == 0 && local.pop().is_some() {
+ num_pop += 1;
+ }
+
+ while inject.pop().is_some() {
+ num_pop += 1;
+ }
+ }
+
+ num_pop += th.join().unwrap();
+
+ while local.pop().is_some() {
+ num_pop += 1;
+ }
+
+ while inject.pop().is_some() {
+ num_pop += 1;
+ }
+
+ assert_eq!(num_pop, NUM_TASKS);
+ }
+}
+
+struct Runtime;
+
+impl Schedule for Runtime {
+ fn bind(task: Task<Self>) -> Runtime {
+ std::mem::forget(task);
+ Runtime
+ }
+
+ fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
+ None
+ }
+
+ fn schedule(&self, _task: task::Notified<Self>) {
+ unreachable!();
+ }
+}
diff --git a/third_party/rust/tokio/src/runtime/tests/task.rs b/third_party/rust/tokio/src/runtime/tests/task.rs
new file mode 100644
index 0000000000..82315a04ff
--- /dev/null
+++ b/third_party/rust/tokio/src/runtime/tests/task.rs
@@ -0,0 +1,159 @@
+use crate::runtime::task::{self, Schedule, Task};
+use crate::util::linked_list::LinkedList;
+use crate::util::TryLock;
+
+use std::collections::VecDeque;
+use std::sync::Arc;
+
+#[test]
+fn create_drop() {
+ let _ = task::joinable::<_, Runtime>(async { unreachable!() });
+}
+
+#[test]
+fn schedule() {
+ with(|rt| {
+ let (task, _) = task::joinable(async {
+ crate::task::yield_now().await;
+ });
+
+ rt.schedule(task);
+
+ assert_eq!(2, rt.tick());
+ })
+}
+
+#[test]
+fn shutdown() {
+ with(|rt| {
+ let (task, _) = task::joinable(async {
+ loop {
+ crate::task::yield_now().await;
+ }
+ });
+
+ rt.schedule(task);
+ rt.tick_max(1);
+
+ rt.shutdown();
+ })
+}
+
+fn with(f: impl FnOnce(Runtime)) {
+ struct Reset;
+
+ impl Drop for Reset {
+ fn drop(&mut self) {
+ let _rt = CURRENT.try_lock().unwrap().take();
+ }
+ }
+
+ let _reset = Reset;
+
+ let rt = Runtime(Arc::new(Inner {
+ released: task::TransferStack::new(),
+ core: TryLock::new(Core {
+ queue: VecDeque::new(),
+ tasks: LinkedList::new(),
+ }),
+ }));
+
+ *CURRENT.try_lock().unwrap() = Some(rt.clone());
+ f(rt)
+}
+
+#[derive(Clone)]
+struct Runtime(Arc<Inner>);
+
+struct Inner {
+ released: task::TransferStack<Runtime>,
+ core: TryLock<Core>,
+}
+
+struct Core {
+ queue: VecDeque<task::Notified<Runtime>>,
+ tasks: LinkedList<Task<Runtime>>,
+}
+
+static CURRENT: TryLock<Option<Runtime>> = TryLock::new(None);
+
+impl Runtime {
+ fn tick(&self) -> usize {
+ self.tick_max(usize::max_value())
+ }
+
+ fn tick_max(&self, max: usize) -> usize {
+ let mut n = 0;
+
+ while !self.is_empty() && n < max {
+ let task = self.next_task();
+ n += 1;
+ task.run();
+ }
+
+ self.0.maintenance();
+
+ n
+ }
+
+ fn is_empty(&self) -> bool {
+ self.0.core.try_lock().unwrap().queue.is_empty()
+ }
+
+ fn next_task(&self) -> task::Notified<Runtime> {
+ self.0.core.try_lock().unwrap().queue.pop_front().unwrap()
+ }
+
+ fn shutdown(&self) {
+ let mut core = self.0.core.try_lock().unwrap();
+
+ for task in core.tasks.iter() {
+ task.shutdown();
+ }
+
+ while let Some(task) = core.queue.pop_back() {
+ task.shutdown();
+ }
+
+ drop(core);
+
+ while !self.0.core.try_lock().unwrap().tasks.is_empty() {
+ self.0.maintenance();
+ }
+ }
+}
+
+impl Inner {
+ fn maintenance(&self) {
+ use std::mem::ManuallyDrop;
+
+ for task in self.released.drain() {
+ let task = ManuallyDrop::new(task);
+
+ // safety: see worker.rs
+ unsafe {
+ let ptr = task.header().into();
+ self.core.try_lock().unwrap().tasks.remove(ptr);
+ }
+ }
+ }
+}
+
+impl Schedule for Runtime {
+ fn bind(task: Task<Self>) -> Runtime {
+ let rt = CURRENT.try_lock().unwrap().as_ref().unwrap().clone();
+ rt.0.core.try_lock().unwrap().tasks.push_front(task);
+ rt
+ }
+
+ fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
+ // safety: copying worker.rs
+ let task = unsafe { Task::from_raw(task.header().into()) };
+ self.0.released.push(task);
+ None
+ }
+
+ fn schedule(&self, task: task::Notified<Self>) {
+ self.0.core.try_lock().unwrap().queue.push_back(task);
+ }
+}