summaryrefslogtreecommitdiffstats
path: root/vendor/tokio/src/sync/tests
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 12:02:58 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 12:02:58 +0000
commit698f8c2f01ea549d77d7dc3338a12e04c11057b9 (patch)
tree173a775858bd501c378080a10dca74132f05bc50 /vendor/tokio/src/sync/tests
parentInitial commit. (diff)
downloadrustc-698f8c2f01ea549d77d7dc3338a12e04c11057b9.tar.xz
rustc-698f8c2f01ea549d77d7dc3338a12e04c11057b9.zip
Adding upstream version 1.64.0+dfsg1.upstream/1.64.0+dfsg1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/tokio/src/sync/tests')
-rw-r--r--vendor/tokio/src/sync/tests/atomic_waker.rs34
-rw-r--r--vendor/tokio/src/sync/tests/loom_atomic_waker.rs45
-rw-r--r--vendor/tokio/src/sync/tests/loom_broadcast.rs207
-rw-r--r--vendor/tokio/src/sync/tests/loom_list.rs48
-rw-r--r--vendor/tokio/src/sync/tests/loom_mpsc.rs134
-rw-r--r--vendor/tokio/src/sync/tests/loom_notify.rs140
-rw-r--r--vendor/tokio/src/sync/tests/loom_oneshot.rs140
-rw-r--r--vendor/tokio/src/sync/tests/loom_rwlock.rs105
-rw-r--r--vendor/tokio/src/sync/tests/loom_semaphore_batch.rs215
-rw-r--r--vendor/tokio/src/sync/tests/loom_watch.rs36
-rw-r--r--vendor/tokio/src/sync/tests/mod.rs16
-rw-r--r--vendor/tokio/src/sync/tests/semaphore_batch.rs250
12 files changed, 1370 insertions, 0 deletions
diff --git a/vendor/tokio/src/sync/tests/atomic_waker.rs b/vendor/tokio/src/sync/tests/atomic_waker.rs
new file mode 100644
index 000000000..c832d62e9
--- /dev/null
+++ b/vendor/tokio/src/sync/tests/atomic_waker.rs
@@ -0,0 +1,34 @@
+use crate::sync::AtomicWaker;
+use tokio_test::task;
+
+use std::task::Waker;
+
+trait AssertSend: Send {}
+trait AssertSync: Send {}
+
+impl AssertSend for AtomicWaker {}
+impl AssertSync for AtomicWaker {}
+
+impl AssertSend for Waker {}
+impl AssertSync for Waker {}
+
+#[test]
+fn basic_usage() {
+ let mut waker = task::spawn(AtomicWaker::new());
+
+ waker.enter(|cx, waker| waker.register_by_ref(cx.waker()));
+ waker.wake();
+
+ assert!(waker.is_woken());
+}
+
+#[test]
+fn wake_without_register() {
+ let mut waker = task::spawn(AtomicWaker::new());
+ waker.wake();
+
+ // Registering should not result in a notification
+ waker.enter(|cx, waker| waker.register_by_ref(cx.waker()));
+
+ assert!(!waker.is_woken());
+}
diff --git a/vendor/tokio/src/sync/tests/loom_atomic_waker.rs b/vendor/tokio/src/sync/tests/loom_atomic_waker.rs
new file mode 100644
index 000000000..c148bcbe1
--- /dev/null
+++ b/vendor/tokio/src/sync/tests/loom_atomic_waker.rs
@@ -0,0 +1,45 @@
+use crate::sync::task::AtomicWaker;
+
+use futures::future::poll_fn;
+use loom::future::block_on;
+use loom::sync::atomic::AtomicUsize;
+use loom::thread;
+use std::sync::atomic::Ordering::Relaxed;
+use std::sync::Arc;
+use std::task::Poll::{Pending, Ready};
+
+struct Chan {
+ num: AtomicUsize,
+ task: AtomicWaker,
+}
+
+#[test]
+fn basic_notification() {
+ const NUM_NOTIFY: usize = 2;
+
+ loom::model(|| {
+ let chan = Arc::new(Chan {
+ num: AtomicUsize::new(0),
+ task: AtomicWaker::new(),
+ });
+
+ for _ in 0..NUM_NOTIFY {
+ let chan = chan.clone();
+
+ thread::spawn(move || {
+ chan.num.fetch_add(1, Relaxed);
+ chan.task.wake();
+ });
+ }
+
+ block_on(poll_fn(move |cx| {
+ chan.task.register_by_ref(cx.waker());
+
+ if NUM_NOTIFY == chan.num.load(Relaxed) {
+ return Ready(());
+ }
+
+ Pending
+ }));
+ });
+}
diff --git a/vendor/tokio/src/sync/tests/loom_broadcast.rs b/vendor/tokio/src/sync/tests/loom_broadcast.rs
new file mode 100644
index 000000000..039b01bf4
--- /dev/null
+++ b/vendor/tokio/src/sync/tests/loom_broadcast.rs
@@ -0,0 +1,207 @@
+use crate::sync::broadcast;
+use crate::sync::broadcast::error::RecvError::{Closed, Lagged};
+
+use loom::future::block_on;
+use loom::sync::Arc;
+use loom::thread;
+use tokio_test::{assert_err, assert_ok};
+
+#[test]
+fn broadcast_send() {
+ loom::model(|| {
+ let (tx1, mut rx) = broadcast::channel(2);
+ let tx1 = Arc::new(tx1);
+ let tx2 = tx1.clone();
+
+ let th1 = thread::spawn(move || {
+ block_on(async {
+ assert_ok!(tx1.send("one"));
+ assert_ok!(tx1.send("two"));
+ assert_ok!(tx1.send("three"));
+ });
+ });
+
+ let th2 = thread::spawn(move || {
+ block_on(async {
+ assert_ok!(tx2.send("eins"));
+ assert_ok!(tx2.send("zwei"));
+ assert_ok!(tx2.send("drei"));
+ });
+ });
+
+ block_on(async {
+ let mut num = 0;
+ loop {
+ match rx.recv().await {
+ Ok(_) => num += 1,
+ Err(Closed) => break,
+ Err(Lagged(n)) => num += n as usize,
+ }
+ }
+ assert_eq!(num, 6);
+ });
+
+ assert_ok!(th1.join());
+ assert_ok!(th2.join());
+ });
+}
+
+// An `Arc` is used as the value in order to detect memory leaks.
+#[test]
+fn broadcast_two() {
+ loom::model(|| {
+ let (tx, mut rx1) = broadcast::channel::<Arc<&'static str>>(16);
+ let mut rx2 = tx.subscribe();
+
+ let th1 = thread::spawn(move || {
+ block_on(async {
+ let v = assert_ok!(rx1.recv().await);
+ assert_eq!(*v, "hello");
+
+ let v = assert_ok!(rx1.recv().await);
+ assert_eq!(*v, "world");
+
+ match assert_err!(rx1.recv().await) {
+ Closed => {}
+ _ => panic!(),
+ }
+ });
+ });
+
+ let th2 = thread::spawn(move || {
+ block_on(async {
+ let v = assert_ok!(rx2.recv().await);
+ assert_eq!(*v, "hello");
+
+ let v = assert_ok!(rx2.recv().await);
+ assert_eq!(*v, "world");
+
+ match assert_err!(rx2.recv().await) {
+ Closed => {}
+ _ => panic!(),
+ }
+ });
+ });
+
+ assert_ok!(tx.send(Arc::new("hello")));
+ assert_ok!(tx.send(Arc::new("world")));
+ drop(tx);
+
+ assert_ok!(th1.join());
+ assert_ok!(th2.join());
+ });
+}
+
+#[test]
+fn broadcast_wrap() {
+ loom::model(|| {
+ let (tx, mut rx1) = broadcast::channel(2);
+ let mut rx2 = tx.subscribe();
+
+ let th1 = thread::spawn(move || {
+ block_on(async {
+ let mut num = 0;
+
+ loop {
+ match rx1.recv().await {
+ Ok(_) => num += 1,
+ Err(Closed) => break,
+ Err(Lagged(n)) => num += n as usize,
+ }
+ }
+
+ assert_eq!(num, 3);
+ });
+ });
+
+ let th2 = thread::spawn(move || {
+ block_on(async {
+ let mut num = 0;
+
+ loop {
+ match rx2.recv().await {
+ Ok(_) => num += 1,
+ Err(Closed) => break,
+ Err(Lagged(n)) => num += n as usize,
+ }
+ }
+
+ assert_eq!(num, 3);
+ });
+ });
+
+ assert_ok!(tx.send("one"));
+ assert_ok!(tx.send("two"));
+ assert_ok!(tx.send("three"));
+
+ drop(tx);
+
+ assert_ok!(th1.join());
+ assert_ok!(th2.join());
+ });
+}
+
+#[test]
+fn drop_rx() {
+ loom::model(|| {
+ let (tx, mut rx1) = broadcast::channel(16);
+ let rx2 = tx.subscribe();
+
+ let th1 = thread::spawn(move || {
+ block_on(async {
+ let v = assert_ok!(rx1.recv().await);
+ assert_eq!(v, "one");
+
+ let v = assert_ok!(rx1.recv().await);
+ assert_eq!(v, "two");
+
+ let v = assert_ok!(rx1.recv().await);
+ assert_eq!(v, "three");
+
+ match assert_err!(rx1.recv().await) {
+ Closed => {}
+ _ => panic!(),
+ }
+ });
+ });
+
+ let th2 = thread::spawn(move || {
+ drop(rx2);
+ });
+
+ assert_ok!(tx.send("one"));
+ assert_ok!(tx.send("two"));
+ assert_ok!(tx.send("three"));
+ drop(tx);
+
+ assert_ok!(th1.join());
+ assert_ok!(th2.join());
+ });
+}
+
+#[test]
+fn drop_multiple_rx_with_overflow() {
+ loom::model(move || {
+ // It is essential to have multiple senders and receivers in this test case.
+ let (tx, mut rx) = broadcast::channel(1);
+ let _rx2 = tx.subscribe();
+
+ let _ = tx.send(());
+ let tx2 = tx.clone();
+ let th1 = thread::spawn(move || {
+ block_on(async {
+ for _ in 0..100 {
+ let _ = tx2.send(());
+ }
+ });
+ });
+ let _ = tx.send(());
+
+ let th2 = thread::spawn(move || {
+ block_on(async { while let Ok(_) = rx.recv().await {} });
+ });
+
+ assert_ok!(th1.join());
+ assert_ok!(th2.join());
+ });
+}
diff --git a/vendor/tokio/src/sync/tests/loom_list.rs b/vendor/tokio/src/sync/tests/loom_list.rs
new file mode 100644
index 000000000..4067f865c
--- /dev/null
+++ b/vendor/tokio/src/sync/tests/loom_list.rs
@@ -0,0 +1,48 @@
+use crate::sync::mpsc::list;
+
+use loom::thread;
+use std::sync::Arc;
+
+#[test]
+fn smoke() {
+ use crate::sync::mpsc::block::Read::*;
+
+ const NUM_TX: usize = 2;
+ const NUM_MSG: usize = 2;
+
+ loom::model(|| {
+ let (tx, mut rx) = list::channel();
+ let tx = Arc::new(tx);
+
+ for th in 0..NUM_TX {
+ let tx = tx.clone();
+
+ thread::spawn(move || {
+ for i in 0..NUM_MSG {
+ tx.push((th, i));
+ }
+ });
+ }
+
+ let mut next = vec![0; NUM_TX];
+
+ loop {
+ match rx.pop(&tx) {
+ Some(Value((th, v))) => {
+ assert_eq!(v, next[th]);
+ next[th] += 1;
+
+ if next.iter().all(|&i| i == NUM_MSG) {
+ break;
+ }
+ }
+ Some(Closed) => {
+ panic!();
+ }
+ None => {
+ thread::yield_now();
+ }
+ }
+ }
+ });
+}
diff --git a/vendor/tokio/src/sync/tests/loom_mpsc.rs b/vendor/tokio/src/sync/tests/loom_mpsc.rs
new file mode 100644
index 000000000..c12313bd3
--- /dev/null
+++ b/vendor/tokio/src/sync/tests/loom_mpsc.rs
@@ -0,0 +1,134 @@
+use crate::sync::mpsc;
+
+use futures::future::poll_fn;
+use loom::future::block_on;
+use loom::sync::Arc;
+use loom::thread;
+use tokio_test::assert_ok;
+
+#[test]
+fn closing_tx() {
+ loom::model(|| {
+ let (tx, mut rx) = mpsc::channel(16);
+
+ thread::spawn(move || {
+ tx.try_send(()).unwrap();
+ drop(tx);
+ });
+
+ let v = block_on(rx.recv());
+ assert!(v.is_some());
+
+ let v = block_on(rx.recv());
+ assert!(v.is_none());
+ });
+}
+
+#[test]
+fn closing_unbounded_tx() {
+ loom::model(|| {
+ let (tx, mut rx) = mpsc::unbounded_channel();
+
+ thread::spawn(move || {
+ tx.send(()).unwrap();
+ drop(tx);
+ });
+
+ let v = block_on(rx.recv());
+ assert!(v.is_some());
+
+ let v = block_on(rx.recv());
+ assert!(v.is_none());
+ });
+}
+
+#[test]
+fn closing_bounded_rx() {
+ loom::model(|| {
+ let (tx1, rx) = mpsc::channel::<()>(16);
+ let tx2 = tx1.clone();
+ thread::spawn(move || {
+ drop(rx);
+ });
+
+ block_on(tx1.closed());
+ block_on(tx2.closed());
+ });
+}
+
+#[test]
+fn closing_and_sending() {
+ loom::model(|| {
+ let (tx1, mut rx) = mpsc::channel::<()>(16);
+ let tx1 = Arc::new(tx1);
+ let tx2 = tx1.clone();
+
+ let th1 = thread::spawn(move || {
+ tx1.try_send(()).unwrap();
+ });
+
+ let th2 = thread::spawn(move || {
+ block_on(tx2.closed());
+ });
+
+ let th3 = thread::spawn(move || {
+ let v = block_on(rx.recv());
+ assert!(v.is_some());
+ drop(rx);
+ });
+
+ assert_ok!(th1.join());
+ assert_ok!(th2.join());
+ assert_ok!(th3.join());
+ });
+}
+
+#[test]
+fn closing_unbounded_rx() {
+ loom::model(|| {
+ let (tx1, rx) = mpsc::unbounded_channel::<()>();
+ let tx2 = tx1.clone();
+ thread::spawn(move || {
+ drop(rx);
+ });
+
+ block_on(tx1.closed());
+ block_on(tx2.closed());
+ });
+}
+
+#[test]
+fn dropping_tx() {
+ loom::model(|| {
+ let (tx, mut rx) = mpsc::channel::<()>(16);
+
+ for _ in 0..2 {
+ let tx = tx.clone();
+ thread::spawn(move || {
+ drop(tx);
+ });
+ }
+ drop(tx);
+
+ let v = block_on(rx.recv());
+ assert!(v.is_none());
+ });
+}
+
+#[test]
+fn dropping_unbounded_tx() {
+ loom::model(|| {
+ let (tx, mut rx) = mpsc::unbounded_channel::<()>();
+
+ for _ in 0..2 {
+ let tx = tx.clone();
+ thread::spawn(move || {
+ drop(tx);
+ });
+ }
+ drop(tx);
+
+ let v = block_on(rx.recv());
+ assert!(v.is_none());
+ });
+}
diff --git a/vendor/tokio/src/sync/tests/loom_notify.rs b/vendor/tokio/src/sync/tests/loom_notify.rs
new file mode 100644
index 000000000..d484a7581
--- /dev/null
+++ b/vendor/tokio/src/sync/tests/loom_notify.rs
@@ -0,0 +1,140 @@
+use crate::sync::Notify;
+
+use loom::future::block_on;
+use loom::sync::Arc;
+use loom::thread;
+
+#[test]
+fn notify_one() {
+ loom::model(|| {
+ let tx = Arc::new(Notify::new());
+ let rx = tx.clone();
+
+ let th = thread::spawn(move || {
+ block_on(async {
+ rx.notified().await;
+ });
+ });
+
+ tx.notify_one();
+ th.join().unwrap();
+ });
+}
+
+#[test]
+fn notify_waiters() {
+ loom::model(|| {
+ let notify = Arc::new(Notify::new());
+ let tx = notify.clone();
+ let notified1 = notify.notified();
+ let notified2 = notify.notified();
+
+ let th = thread::spawn(move || {
+ tx.notify_waiters();
+ });
+
+ block_on(async {
+ notified1.await;
+ notified2.await;
+ });
+
+ th.join().unwrap();
+ });
+}
+
+#[test]
+fn notify_waiters_and_one() {
+ loom::model(|| {
+ let notify = Arc::new(Notify::new());
+ let tx1 = notify.clone();
+ let tx2 = notify.clone();
+
+ let th1 = thread::spawn(move || {
+ tx1.notify_waiters();
+ });
+
+ let th2 = thread::spawn(move || {
+ tx2.notify_one();
+ });
+
+ let th3 = thread::spawn(move || {
+ let notified = notify.notified();
+
+ block_on(async {
+ notified.await;
+ });
+ });
+
+ th1.join().unwrap();
+ th2.join().unwrap();
+ th3.join().unwrap();
+ });
+}
+
+#[test]
+fn notify_multi() {
+ loom::model(|| {
+ let notify = Arc::new(Notify::new());
+
+ let mut ths = vec![];
+
+ for _ in 0..2 {
+ let notify = notify.clone();
+
+ ths.push(thread::spawn(move || {
+ block_on(async {
+ notify.notified().await;
+ notify.notify_one();
+ })
+ }));
+ }
+
+ notify.notify_one();
+
+ for th in ths.drain(..) {
+ th.join().unwrap();
+ }
+
+ block_on(async {
+ notify.notified().await;
+ });
+ });
+}
+
+#[test]
+fn notify_drop() {
+ use crate::future::poll_fn;
+ use std::future::Future;
+ use std::task::Poll;
+
+ loom::model(|| {
+ let notify = Arc::new(Notify::new());
+ let rx1 = notify.clone();
+ let rx2 = notify.clone();
+
+ let th1 = thread::spawn(move || {
+ let mut recv = Box::pin(rx1.notified());
+
+ block_on(poll_fn(|cx| {
+ if recv.as_mut().poll(cx).is_ready() {
+ rx1.notify_one();
+ }
+ Poll::Ready(())
+ }));
+ });
+
+ let th2 = thread::spawn(move || {
+ block_on(async {
+ rx2.notified().await;
+ // Trigger second notification
+ rx2.notify_one();
+ rx2.notified().await;
+ });
+ });
+
+ notify.notify_one();
+
+ th1.join().unwrap();
+ th2.join().unwrap();
+ });
+}
diff --git a/vendor/tokio/src/sync/tests/loom_oneshot.rs b/vendor/tokio/src/sync/tests/loom_oneshot.rs
new file mode 100644
index 000000000..c5f797207
--- /dev/null
+++ b/vendor/tokio/src/sync/tests/loom_oneshot.rs
@@ -0,0 +1,140 @@
+use crate::sync::oneshot;
+
+use futures::future::poll_fn;
+use loom::future::block_on;
+use loom::thread;
+use std::task::Poll::{Pending, Ready};
+
+#[test]
+fn smoke() {
+ loom::model(|| {
+ let (tx, rx) = oneshot::channel();
+
+ thread::spawn(move || {
+ tx.send(1).unwrap();
+ });
+
+ let value = block_on(rx).unwrap();
+ assert_eq!(1, value);
+ });
+}
+
+#[test]
+fn changing_rx_task() {
+ loom::model(|| {
+ let (tx, mut rx) = oneshot::channel();
+
+ thread::spawn(move || {
+ tx.send(1).unwrap();
+ });
+
+ let rx = thread::spawn(move || {
+ let ready = block_on(poll_fn(|cx| match Pin::new(&mut rx).poll(cx) {
+ Ready(Ok(value)) => {
+ assert_eq!(1, value);
+ Ready(true)
+ }
+ Ready(Err(_)) => unimplemented!(),
+ Pending => Ready(false),
+ }));
+
+ if ready {
+ None
+ } else {
+ Some(rx)
+ }
+ })
+ .join()
+ .unwrap();
+
+ if let Some(rx) = rx {
+ // Previous task parked, use a new task...
+ let value = block_on(rx).unwrap();
+ assert_eq!(1, value);
+ }
+ });
+}
+
+#[test]
+fn try_recv_close() {
+ // reproduces https://github.com/tokio-rs/tokio/issues/4225
+ loom::model(|| {
+ let (tx, mut rx) = oneshot::channel();
+ thread::spawn(move || {
+ let _ = tx.send(());
+ });
+
+ rx.close();
+ let _ = rx.try_recv();
+ })
+}
+
+#[test]
+fn recv_closed() {
+ // reproduces https://github.com/tokio-rs/tokio/issues/4225
+ loom::model(|| {
+ let (tx, mut rx) = oneshot::channel();
+
+ thread::spawn(move || {
+ let _ = tx.send(1);
+ });
+
+ rx.close();
+ let _ = block_on(rx);
+ });
+}
+
+// TODO: Move this into `oneshot` proper.
+
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+struct OnClose<'a> {
+ tx: &'a mut oneshot::Sender<i32>,
+}
+
+impl<'a> OnClose<'a> {
+ fn new(tx: &'a mut oneshot::Sender<i32>) -> Self {
+ OnClose { tx }
+ }
+}
+
+impl Future for OnClose<'_> {
+ type Output = bool;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> {
+ let fut = self.get_mut().tx.closed();
+ crate::pin!(fut);
+
+ Ready(fut.poll(cx).is_ready())
+ }
+}
+
+#[test]
+fn changing_tx_task() {
+ loom::model(|| {
+ let (mut tx, rx) = oneshot::channel::<i32>();
+
+ thread::spawn(move || {
+ drop(rx);
+ });
+
+ let tx = thread::spawn(move || {
+ let t1 = block_on(OnClose::new(&mut tx));
+
+ if t1 {
+ None
+ } else {
+ Some(tx)
+ }
+ })
+ .join()
+ .unwrap();
+
+ if let Some(mut tx) = tx {
+ // Previous task parked, use a new task...
+ block_on(OnClose::new(&mut tx));
+ }
+ });
+}
diff --git a/vendor/tokio/src/sync/tests/loom_rwlock.rs b/vendor/tokio/src/sync/tests/loom_rwlock.rs
new file mode 100644
index 000000000..4b5cc7edc
--- /dev/null
+++ b/vendor/tokio/src/sync/tests/loom_rwlock.rs
@@ -0,0 +1,105 @@
+use crate::sync::rwlock::*;
+
+use loom::future::block_on;
+use loom::thread;
+use std::sync::Arc;
+
+#[test]
+fn concurrent_write() {
+ let b = loom::model::Builder::new();
+
+ b.check(|| {
+ let rwlock = Arc::new(RwLock::<u32>::new(0));
+
+ let rwclone = rwlock.clone();
+ let t1 = thread::spawn(move || {
+ block_on(async {
+ let mut guard = rwclone.write().await;
+ *guard += 5;
+ });
+ });
+
+ let rwclone = rwlock.clone();
+ let t2 = thread::spawn(move || {
+ block_on(async {
+ let mut guard = rwclone.write_owned().await;
+ *guard += 5;
+ });
+ });
+
+ t1.join().expect("thread 1 write should not panic");
+ t2.join().expect("thread 2 write should not panic");
+ //when all threads have finished the value on the lock should be 10
+ let guard = block_on(rwlock.read());
+ assert_eq!(10, *guard);
+ });
+}
+
+#[test]
+fn concurrent_read_write() {
+ let b = loom::model::Builder::new();
+
+ b.check(|| {
+ let rwlock = Arc::new(RwLock::<u32>::new(0));
+
+ let rwclone = rwlock.clone();
+ let t1 = thread::spawn(move || {
+ block_on(async {
+ let mut guard = rwclone.write().await;
+ *guard += 5;
+ });
+ });
+
+ let rwclone = rwlock.clone();
+ let t2 = thread::spawn(move || {
+ block_on(async {
+ let mut guard = rwclone.write_owned().await;
+ *guard += 5;
+ });
+ });
+
+ let rwclone = rwlock.clone();
+ let t3 = thread::spawn(move || {
+ block_on(async {
+ let guard = rwclone.read().await;
+ //at this state the value on the lock may either be 0, 5, or 10
+ assert!(*guard == 0 || *guard == 5 || *guard == 10);
+ });
+ });
+
+ {
+ let guard = block_on(rwlock.clone().read_owned());
+ //at this state the value on the lock may either be 0, 5, or 10
+ assert!(*guard == 0 || *guard == 5 || *guard == 10);
+ }
+
+ t1.join().expect("thread 1 write should not panic");
+ t2.join().expect("thread 2 write should not panic");
+ t3.join().expect("thread 3 read should not panic");
+
+ let guard = block_on(rwlock.read());
+ //when all threads have finished the value on the lock should be 10
+ assert_eq!(10, *guard);
+ });
+}
+#[test]
+fn downgrade() {
+ loom::model(|| {
+ let lock = Arc::new(RwLock::new(1));
+
+ let n = block_on(lock.write());
+
+ let cloned_lock = lock.clone();
+ let handle = thread::spawn(move || {
+ let mut guard = block_on(cloned_lock.write());
+ *guard = 2;
+ });
+
+ let n = n.downgrade();
+ assert_eq!(*n, 1);
+
+ drop(n);
+ handle.join().unwrap();
+ assert_eq!(*block_on(lock.read()), 2);
+ });
+}
diff --git a/vendor/tokio/src/sync/tests/loom_semaphore_batch.rs b/vendor/tokio/src/sync/tests/loom_semaphore_batch.rs
new file mode 100644
index 000000000..76a1bc006
--- /dev/null
+++ b/vendor/tokio/src/sync/tests/loom_semaphore_batch.rs
@@ -0,0 +1,215 @@
+use crate::sync::batch_semaphore::*;
+
+use futures::future::poll_fn;
+use loom::future::block_on;
+use loom::sync::atomic::AtomicUsize;
+use loom::thread;
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::atomic::Ordering::SeqCst;
+use std::sync::Arc;
+use std::task::Poll::Ready;
+use std::task::{Context, Poll};
+
+#[test]
+fn basic_usage() {
+ const NUM: usize = 2;
+
+ struct Shared {
+ semaphore: Semaphore,
+ active: AtomicUsize,
+ }
+
+ async fn actor(shared: Arc<Shared>) {
+ shared.semaphore.acquire(1).await.unwrap();
+ let actual = shared.active.fetch_add(1, SeqCst);
+ assert!(actual <= NUM - 1);
+
+ let actual = shared.active.fetch_sub(1, SeqCst);
+ assert!(actual <= NUM);
+ shared.semaphore.release(1);
+ }
+
+ loom::model(|| {
+ let shared = Arc::new(Shared {
+ semaphore: Semaphore::new(NUM),
+ active: AtomicUsize::new(0),
+ });
+
+ for _ in 0..NUM {
+ let shared = shared.clone();
+
+ thread::spawn(move || {
+ block_on(actor(shared));
+ });
+ }
+
+ block_on(actor(shared));
+ });
+}
+
+#[test]
+fn release() {
+ loom::model(|| {
+ let semaphore = Arc::new(Semaphore::new(1));
+
+ {
+ let semaphore = semaphore.clone();
+ thread::spawn(move || {
+ block_on(semaphore.acquire(1)).unwrap();
+ semaphore.release(1);
+ });
+ }
+
+ block_on(semaphore.acquire(1)).unwrap();
+
+ semaphore.release(1);
+ });
+}
+
+#[test]
+fn basic_closing() {
+ const NUM: usize = 2;
+
+ loom::model(|| {
+ let semaphore = Arc::new(Semaphore::new(1));
+
+ for _ in 0..NUM {
+ let semaphore = semaphore.clone();
+
+ thread::spawn(move || {
+ for _ in 0..2 {
+ block_on(semaphore.acquire(1)).map_err(|_| ())?;
+
+ semaphore.release(1);
+ }
+
+ Ok::<(), ()>(())
+ });
+ }
+
+ semaphore.close();
+ });
+}
+
+#[test]
+fn concurrent_close() {
+ const NUM: usize = 3;
+
+ loom::model(|| {
+ let semaphore = Arc::new(Semaphore::new(1));
+
+ for _ in 0..NUM {
+ let semaphore = semaphore.clone();
+
+ thread::spawn(move || {
+ block_on(semaphore.acquire(1)).map_err(|_| ())?;
+ semaphore.release(1);
+ semaphore.close();
+
+ Ok::<(), ()>(())
+ });
+ }
+ });
+}
+
+#[test]
+fn concurrent_cancel() {
+ async fn poll_and_cancel(semaphore: Arc<Semaphore>) {
+ let mut acquire1 = Some(semaphore.acquire(1));
+ let mut acquire2 = Some(semaphore.acquire(1));
+ poll_fn(|cx| {
+ // poll the acquire future once, and then immediately throw
+ // it away. this simulates a situation where a future is
+ // polled and then cancelled, such as by a timeout.
+ if let Some(acquire) = acquire1.take() {
+ pin!(acquire);
+ let _ = acquire.poll(cx);
+ }
+ if let Some(acquire) = acquire2.take() {
+ pin!(acquire);
+ let _ = acquire.poll(cx);
+ }
+ Poll::Ready(())
+ })
+ .await
+ }
+
+ loom::model(|| {
+ let semaphore = Arc::new(Semaphore::new(0));
+ let t1 = {
+ let semaphore = semaphore.clone();
+ thread::spawn(move || block_on(poll_and_cancel(semaphore)))
+ };
+ let t2 = {
+ let semaphore = semaphore.clone();
+ thread::spawn(move || block_on(poll_and_cancel(semaphore)))
+ };
+ let t3 = {
+ let semaphore = semaphore.clone();
+ thread::spawn(move || block_on(poll_and_cancel(semaphore)))
+ };
+
+ t1.join().unwrap();
+ semaphore.release(10);
+ t2.join().unwrap();
+ t3.join().unwrap();
+ });
+}
+
+#[test]
+fn batch() {
+ let mut b = loom::model::Builder::new();
+ b.preemption_bound = Some(1);
+
+ b.check(|| {
+ let semaphore = Arc::new(Semaphore::new(10));
+ let active = Arc::new(AtomicUsize::new(0));
+ let mut ths = vec![];
+
+ for _ in 0..2 {
+ let semaphore = semaphore.clone();
+ let active = active.clone();
+
+ ths.push(thread::spawn(move || {
+ for n in &[4, 10, 8] {
+ block_on(semaphore.acquire(*n)).unwrap();
+
+ active.fetch_add(*n as usize, SeqCst);
+
+ let num_active = active.load(SeqCst);
+ assert!(num_active <= 10);
+
+ thread::yield_now();
+
+ active.fetch_sub(*n as usize, SeqCst);
+
+ semaphore.release(*n as usize);
+ }
+ }));
+ }
+
+ for th in ths.into_iter() {
+ th.join().unwrap();
+ }
+
+ assert_eq!(10, semaphore.available_permits());
+ });
+}
+
+#[test]
+fn release_during_acquire() {
+ loom::model(|| {
+ let semaphore = Arc::new(Semaphore::new(10));
+ semaphore
+ .try_acquire(8)
+ .expect("try_acquire should succeed; semaphore uncontended");
+ let semaphore2 = semaphore.clone();
+ let thread = thread::spawn(move || block_on(semaphore2.acquire(4)).unwrap());
+
+ semaphore.release(8);
+ thread.join().unwrap();
+ semaphore.release(4);
+ assert_eq!(10, semaphore.available_permits());
+ })
+}
diff --git a/vendor/tokio/src/sync/tests/loom_watch.rs b/vendor/tokio/src/sync/tests/loom_watch.rs
new file mode 100644
index 000000000..c575b5b66
--- /dev/null
+++ b/vendor/tokio/src/sync/tests/loom_watch.rs
@@ -0,0 +1,36 @@
+use crate::sync::watch;
+
+use loom::future::block_on;
+use loom::thread;
+
+#[test]
+fn smoke() {
+ loom::model(|| {
+ let (tx, mut rx1) = watch::channel(1);
+ let mut rx2 = rx1.clone();
+ let mut rx3 = rx1.clone();
+ let mut rx4 = rx1.clone();
+ let mut rx5 = rx1.clone();
+
+ let th = thread::spawn(move || {
+ tx.send(2).unwrap();
+ });
+
+ block_on(rx1.changed()).unwrap();
+ assert_eq!(*rx1.borrow(), 2);
+
+ block_on(rx2.changed()).unwrap();
+ assert_eq!(*rx2.borrow(), 2);
+
+ block_on(rx3.changed()).unwrap();
+ assert_eq!(*rx3.borrow(), 2);
+
+ block_on(rx4.changed()).unwrap();
+ assert_eq!(*rx4.borrow(), 2);
+
+ block_on(rx5.changed()).unwrap();
+ assert_eq!(*rx5.borrow(), 2);
+
+ th.join().unwrap();
+ })
+}
diff --git a/vendor/tokio/src/sync/tests/mod.rs b/vendor/tokio/src/sync/tests/mod.rs
new file mode 100644
index 000000000..c5d560196
--- /dev/null
+++ b/vendor/tokio/src/sync/tests/mod.rs
@@ -0,0 +1,16 @@
+cfg_not_loom! {
+ mod atomic_waker;
+ mod semaphore_batch;
+}
+
+cfg_loom! {
+ mod loom_atomic_waker;
+ mod loom_broadcast;
+ mod loom_list;
+ mod loom_mpsc;
+ mod loom_notify;
+ mod loom_oneshot;
+ mod loom_semaphore_batch;
+ mod loom_watch;
+ mod loom_rwlock;
+}
diff --git a/vendor/tokio/src/sync/tests/semaphore_batch.rs b/vendor/tokio/src/sync/tests/semaphore_batch.rs
new file mode 100644
index 000000000..9342cd1cb
--- /dev/null
+++ b/vendor/tokio/src/sync/tests/semaphore_batch.rs
@@ -0,0 +1,250 @@
+use crate::sync::batch_semaphore::Semaphore;
+use tokio_test::*;
+
+#[test]
+fn poll_acquire_one_available() {
+ let s = Semaphore::new(100);
+ assert_eq!(s.available_permits(), 100);
+
+ // Polling for a permit succeeds immediately
+ assert_ready_ok!(task::spawn(s.acquire(1)).poll());
+ assert_eq!(s.available_permits(), 99);
+}
+
+#[test]
+fn poll_acquire_many_available() {
+ let s = Semaphore::new(100);
+ assert_eq!(s.available_permits(), 100);
+
+ // Polling for a permit succeeds immediately
+ assert_ready_ok!(task::spawn(s.acquire(5)).poll());
+ assert_eq!(s.available_permits(), 95);
+
+ assert_ready_ok!(task::spawn(s.acquire(5)).poll());
+ assert_eq!(s.available_permits(), 90);
+}
+
+#[test]
+fn try_acquire_one_available() {
+ let s = Semaphore::new(100);
+ assert_eq!(s.available_permits(), 100);
+
+ assert_ok!(s.try_acquire(1));
+ assert_eq!(s.available_permits(), 99);
+
+ assert_ok!(s.try_acquire(1));
+ assert_eq!(s.available_permits(), 98);
+}
+
+#[test]
+fn try_acquire_many_available() {
+ let s = Semaphore::new(100);
+ assert_eq!(s.available_permits(), 100);
+
+ assert_ok!(s.try_acquire(5));
+ assert_eq!(s.available_permits(), 95);
+
+ assert_ok!(s.try_acquire(5));
+ assert_eq!(s.available_permits(), 90);
+}
+
+#[test]
+fn poll_acquire_one_unavailable() {
+ let s = Semaphore::new(1);
+
+ // Acquire the first permit
+ assert_ready_ok!(task::spawn(s.acquire(1)).poll());
+ assert_eq!(s.available_permits(), 0);
+
+ let mut acquire_2 = task::spawn(s.acquire(1));
+ // Try to acquire the second permit
+ assert_pending!(acquire_2.poll());
+ assert_eq!(s.available_permits(), 0);
+
+ s.release(1);
+
+ assert_eq!(s.available_permits(), 0);
+ assert!(acquire_2.is_woken());
+ assert_ready_ok!(acquire_2.poll());
+ assert_eq!(s.available_permits(), 0);
+
+ s.release(1);
+ assert_eq!(s.available_permits(), 1);
+}
+
+#[test]
+fn poll_acquire_many_unavailable() {
+ let s = Semaphore::new(5);
+
+ // Acquire the first permit
+ assert_ready_ok!(task::spawn(s.acquire(1)).poll());
+ assert_eq!(s.available_permits(), 4);
+
+ // Try to acquire the second permit
+ let mut acquire_2 = task::spawn(s.acquire(5));
+ assert_pending!(acquire_2.poll());
+ assert_eq!(s.available_permits(), 0);
+
+ // Try to acquire the third permit
+ let mut acquire_3 = task::spawn(s.acquire(3));
+ assert_pending!(acquire_3.poll());
+ assert_eq!(s.available_permits(), 0);
+
+ s.release(1);
+
+ assert_eq!(s.available_permits(), 0);
+ assert!(acquire_2.is_woken());
+ assert_ready_ok!(acquire_2.poll());
+
+ assert!(!acquire_3.is_woken());
+ assert_eq!(s.available_permits(), 0);
+
+ s.release(1);
+ assert!(!acquire_3.is_woken());
+ assert_eq!(s.available_permits(), 0);
+
+ s.release(2);
+ assert!(acquire_3.is_woken());
+
+ assert_ready_ok!(acquire_3.poll());
+}
+
+#[test]
+fn try_acquire_one_unavailable() {
+ let s = Semaphore::new(1);
+
+ // Acquire the first permit
+ assert_ok!(s.try_acquire(1));
+ assert_eq!(s.available_permits(), 0);
+
+ assert_err!(s.try_acquire(1));
+
+ s.release(1);
+
+ assert_eq!(s.available_permits(), 1);
+ assert_ok!(s.try_acquire(1));
+
+ s.release(1);
+ assert_eq!(s.available_permits(), 1);
+}
+
+#[test]
+fn try_acquire_many_unavailable() {
+ let s = Semaphore::new(5);
+
+ // Acquire the first permit
+ assert_ok!(s.try_acquire(1));
+ assert_eq!(s.available_permits(), 4);
+
+ assert_err!(s.try_acquire(5));
+
+ s.release(1);
+ assert_eq!(s.available_permits(), 5);
+
+ assert_ok!(s.try_acquire(5));
+
+ s.release(1);
+ assert_eq!(s.available_permits(), 1);
+
+ s.release(1);
+ assert_eq!(s.available_permits(), 2);
+}
+
+#[test]
+fn poll_acquire_one_zero_permits() {
+ let s = Semaphore::new(0);
+ assert_eq!(s.available_permits(), 0);
+
+ // Try to acquire the permit
+ let mut acquire = task::spawn(s.acquire(1));
+ assert_pending!(acquire.poll());
+
+ s.release(1);
+
+ assert!(acquire.is_woken());
+ assert_ready_ok!(acquire.poll());
+}
+
+#[test]
+#[should_panic]
+fn validates_max_permits() {
+ use std::usize;
+ Semaphore::new((usize::MAX >> 2) + 1);
+}
+
+#[test]
+fn close_semaphore_prevents_acquire() {
+ let s = Semaphore::new(5);
+ s.close();
+
+ assert_eq!(5, s.available_permits());
+
+ assert_ready_err!(task::spawn(s.acquire(1)).poll());
+ assert_eq!(5, s.available_permits());
+
+ assert_ready_err!(task::spawn(s.acquire(1)).poll());
+ assert_eq!(5, s.available_permits());
+}
+
+#[test]
+fn close_semaphore_notifies_permit1() {
+ let s = Semaphore::new(0);
+ let mut acquire = task::spawn(s.acquire(1));
+
+ assert_pending!(acquire.poll());
+
+ s.close();
+
+ assert!(acquire.is_woken());
+ assert_ready_err!(acquire.poll());
+}
+
+#[test]
+fn close_semaphore_notifies_permit2() {
+ let s = Semaphore::new(2);
+
+ // Acquire a couple of permits
+ assert_ready_ok!(task::spawn(s.acquire(1)).poll());
+ assert_ready_ok!(task::spawn(s.acquire(1)).poll());
+
+ let mut acquire3 = task::spawn(s.acquire(1));
+ let mut acquire4 = task::spawn(s.acquire(1));
+ assert_pending!(acquire3.poll());
+ assert_pending!(acquire4.poll());
+
+ s.close();
+
+ assert!(acquire3.is_woken());
+ assert!(acquire4.is_woken());
+
+ assert_ready_err!(acquire3.poll());
+ assert_ready_err!(acquire4.poll());
+
+ assert_eq!(0, s.available_permits());
+
+ s.release(1);
+
+ assert_eq!(1, s.available_permits());
+
+ assert_ready_err!(task::spawn(s.acquire(1)).poll());
+
+ s.release(1);
+
+ assert_eq!(2, s.available_permits());
+}
+
+#[test]
+fn cancel_acquire_releases_permits() {
+ let s = Semaphore::new(10);
+ s.try_acquire(4).expect("uncontended try_acquire succeeds");
+ assert_eq!(6, s.available_permits());
+
+ let mut acquire = task::spawn(s.acquire(8));
+ assert_pending!(acquire.poll());
+
+ assert_eq!(0, s.available_permits());
+ drop(acquire);
+
+ assert_eq!(6, s.available_permits());
+ assert_ok!(s.try_acquire(6));
+}