summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio/src/sync/tests
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 17:32:43 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 17:32:43 +0000
commit6bf0a5cb5034a7e684dcc3500e841785237ce2dd (patch)
treea68f146d7fa01f0134297619fbe7e33db084e0aa /third_party/rust/tokio/src/sync/tests
parentInitial commit. (diff)
downloadthunderbird-upstream.tar.xz
thunderbird-upstream.zip
Adding upstream version 1:115.7.0.upstream/1%115.7.0upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/tokio/src/sync/tests')
-rw-r--r--third_party/rust/tokio/src/sync/tests/atomic_waker.rs77
-rw-r--r--third_party/rust/tokio/src/sync/tests/loom_atomic_waker.rs100
-rw-r--r--third_party/rust/tokio/src/sync/tests/loom_broadcast.rs207
-rw-r--r--third_party/rust/tokio/src/sync/tests/loom_list.rs48
-rw-r--r--third_party/rust/tokio/src/sync/tests/loom_mpsc.rs190
-rw-r--r--third_party/rust/tokio/src/sync/tests/loom_notify.rs140
-rw-r--r--third_party/rust/tokio/src/sync/tests/loom_oneshot.rs140
-rw-r--r--third_party/rust/tokio/src/sync/tests/loom_rwlock.rs105
-rw-r--r--third_party/rust/tokio/src/sync/tests/loom_semaphore_batch.rs215
-rw-r--r--third_party/rust/tokio/src/sync/tests/loom_watch.rs36
-rw-r--r--third_party/rust/tokio/src/sync/tests/mod.rs17
-rw-r--r--third_party/rust/tokio/src/sync/tests/notify.rs81
-rw-r--r--third_party/rust/tokio/src/sync/tests/semaphore_batch.rs254
13 files changed, 1610 insertions, 0 deletions
diff --git a/third_party/rust/tokio/src/sync/tests/atomic_waker.rs b/third_party/rust/tokio/src/sync/tests/atomic_waker.rs
new file mode 100644
index 0000000000..ec13cbd658
--- /dev/null
+++ b/third_party/rust/tokio/src/sync/tests/atomic_waker.rs
@@ -0,0 +1,77 @@
+use crate::sync::AtomicWaker;
+use tokio_test::task;
+
+use std::task::Waker;
+
+trait AssertSend: Send {}
+trait AssertSync: Send {}
+
+impl AssertSend for AtomicWaker {}
+impl AssertSync for AtomicWaker {}
+
+impl AssertSend for Waker {}
+impl AssertSync for Waker {}
+
+#[cfg(target_arch = "wasm32")]
+use wasm_bindgen_test::wasm_bindgen_test as test;
+
+#[test]
+fn basic_usage() {
+ let mut waker = task::spawn(AtomicWaker::new());
+
+ waker.enter(|cx, waker| waker.register_by_ref(cx.waker()));
+ waker.wake();
+
+ assert!(waker.is_woken());
+}
+
+#[test]
+fn wake_without_register() {
+ let mut waker = task::spawn(AtomicWaker::new());
+ waker.wake();
+
+ // Registering should not result in a notification
+ waker.enter(|cx, waker| waker.register_by_ref(cx.waker()));
+
+ assert!(!waker.is_woken());
+}
+
+#[test]
+#[cfg(not(target_arch = "wasm32"))] // wasm currently doesn't support unwinding
+fn atomic_waker_panic_safe() {
+ use std::panic;
+ use std::ptr;
+ use std::task::{RawWaker, RawWakerVTable, Waker};
+
+ static PANICKING_VTABLE: RawWakerVTable = RawWakerVTable::new(
+ |_| panic!("clone"),
+ |_| unimplemented!("wake"),
+ |_| unimplemented!("wake_by_ref"),
+ |_| (),
+ );
+
+ static NONPANICKING_VTABLE: RawWakerVTable = RawWakerVTable::new(
+ |_| RawWaker::new(ptr::null(), &NONPANICKING_VTABLE),
+ |_| unimplemented!("wake"),
+ |_| unimplemented!("wake_by_ref"),
+ |_| (),
+ );
+
+ let panicking = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &PANICKING_VTABLE)) };
+ let nonpanicking = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &NONPANICKING_VTABLE)) };
+
+ let atomic_waker = AtomicWaker::new();
+
+ let panicking = panic::AssertUnwindSafe(&panicking);
+
+ let result = panic::catch_unwind(|| {
+ let panic::AssertUnwindSafe(panicking) = panicking;
+ atomic_waker.register_by_ref(panicking);
+ });
+
+ assert!(result.is_err());
+ assert!(atomic_waker.take_waker().is_none());
+
+ atomic_waker.register_by_ref(&nonpanicking);
+ assert!(atomic_waker.take_waker().is_some());
+}
diff --git a/third_party/rust/tokio/src/sync/tests/loom_atomic_waker.rs b/third_party/rust/tokio/src/sync/tests/loom_atomic_waker.rs
new file mode 100644
index 0000000000..f8bae65d13
--- /dev/null
+++ b/third_party/rust/tokio/src/sync/tests/loom_atomic_waker.rs
@@ -0,0 +1,100 @@
+use crate::sync::task::AtomicWaker;
+
+use futures::future::poll_fn;
+use loom::future::block_on;
+use loom::sync::atomic::AtomicUsize;
+use loom::thread;
+use std::sync::atomic::Ordering::Relaxed;
+use std::sync::Arc;
+use std::task::Poll::{Pending, Ready};
+
+struct Chan {
+ num: AtomicUsize,
+ task: AtomicWaker,
+}
+
+#[test]
+fn basic_notification() {
+ const NUM_NOTIFY: usize = 2;
+
+ loom::model(|| {
+ let chan = Arc::new(Chan {
+ num: AtomicUsize::new(0),
+ task: AtomicWaker::new(),
+ });
+
+ for _ in 0..NUM_NOTIFY {
+ let chan = chan.clone();
+
+ thread::spawn(move || {
+ chan.num.fetch_add(1, Relaxed);
+ chan.task.wake();
+ });
+ }
+
+ block_on(poll_fn(move |cx| {
+ chan.task.register_by_ref(cx.waker());
+
+ if NUM_NOTIFY == chan.num.load(Relaxed) {
+ return Ready(());
+ }
+
+ Pending
+ }));
+ });
+}
+
+#[test]
+fn test_panicky_waker() {
+ use std::panic;
+ use std::ptr;
+ use std::task::{RawWaker, RawWakerVTable, Waker};
+
+ static PANICKING_VTABLE: RawWakerVTable =
+ RawWakerVTable::new(|_| panic!("clone"), |_| (), |_| (), |_| ());
+
+ let panicking = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &PANICKING_VTABLE)) };
+
+ // If you're working with this test (and I sure hope you never have to!),
+ // uncomment the following section because there will be a lot of panics
+ // which would otherwise log.
+ //
+ // We can't however leaved it uncommented, because it's global.
+ // panic::set_hook(Box::new(|_| ()));
+
+ const NUM_NOTIFY: usize = 2;
+
+ loom::model(move || {
+ let chan = Arc::new(Chan {
+ num: AtomicUsize::new(0),
+ task: AtomicWaker::new(),
+ });
+
+ for _ in 0..NUM_NOTIFY {
+ let chan = chan.clone();
+
+ thread::spawn(move || {
+ chan.num.fetch_add(1, Relaxed);
+ chan.task.wake();
+ });
+ }
+
+ // Note: this panic should have no effect on the overall state of the
+ // waker and it should proceed as normal.
+ //
+ // A thread above might race to flag a wakeup, and a WAKING state will
+ // be preserved if this expected panic races with that so the below
+ // procedure should be allowed to continue uninterrupted.
+ let _ = panic::catch_unwind(|| chan.task.register_by_ref(&panicking));
+
+ block_on(poll_fn(move |cx| {
+ chan.task.register_by_ref(cx.waker());
+
+ if NUM_NOTIFY == chan.num.load(Relaxed) {
+ return Ready(());
+ }
+
+ Pending
+ }));
+ });
+}
diff --git a/third_party/rust/tokio/src/sync/tests/loom_broadcast.rs b/third_party/rust/tokio/src/sync/tests/loom_broadcast.rs
new file mode 100644
index 0000000000..039b01bf43
--- /dev/null
+++ b/third_party/rust/tokio/src/sync/tests/loom_broadcast.rs
@@ -0,0 +1,207 @@
+use crate::sync::broadcast;
+use crate::sync::broadcast::error::RecvError::{Closed, Lagged};
+
+use loom::future::block_on;
+use loom::sync::Arc;
+use loom::thread;
+use tokio_test::{assert_err, assert_ok};
+
+#[test]
+fn broadcast_send() {
+ loom::model(|| {
+ let (tx1, mut rx) = broadcast::channel(2);
+ let tx1 = Arc::new(tx1);
+ let tx2 = tx1.clone();
+
+ let th1 = thread::spawn(move || {
+ block_on(async {
+ assert_ok!(tx1.send("one"));
+ assert_ok!(tx1.send("two"));
+ assert_ok!(tx1.send("three"));
+ });
+ });
+
+ let th2 = thread::spawn(move || {
+ block_on(async {
+ assert_ok!(tx2.send("eins"));
+ assert_ok!(tx2.send("zwei"));
+ assert_ok!(tx2.send("drei"));
+ });
+ });
+
+ block_on(async {
+ let mut num = 0;
+ loop {
+ match rx.recv().await {
+ Ok(_) => num += 1,
+ Err(Closed) => break,
+ Err(Lagged(n)) => num += n as usize,
+ }
+ }
+ assert_eq!(num, 6);
+ });
+
+ assert_ok!(th1.join());
+ assert_ok!(th2.join());
+ });
+}
+
+// An `Arc` is used as the value in order to detect memory leaks.
+#[test]
+fn broadcast_two() {
+ loom::model(|| {
+ let (tx, mut rx1) = broadcast::channel::<Arc<&'static str>>(16);
+ let mut rx2 = tx.subscribe();
+
+ let th1 = thread::spawn(move || {
+ block_on(async {
+ let v = assert_ok!(rx1.recv().await);
+ assert_eq!(*v, "hello");
+
+ let v = assert_ok!(rx1.recv().await);
+ assert_eq!(*v, "world");
+
+ match assert_err!(rx1.recv().await) {
+ Closed => {}
+ _ => panic!(),
+ }
+ });
+ });
+
+ let th2 = thread::spawn(move || {
+ block_on(async {
+ let v = assert_ok!(rx2.recv().await);
+ assert_eq!(*v, "hello");
+
+ let v = assert_ok!(rx2.recv().await);
+ assert_eq!(*v, "world");
+
+ match assert_err!(rx2.recv().await) {
+ Closed => {}
+ _ => panic!(),
+ }
+ });
+ });
+
+ assert_ok!(tx.send(Arc::new("hello")));
+ assert_ok!(tx.send(Arc::new("world")));
+ drop(tx);
+
+ assert_ok!(th1.join());
+ assert_ok!(th2.join());
+ });
+}
+
+#[test]
+fn broadcast_wrap() {
+ loom::model(|| {
+ let (tx, mut rx1) = broadcast::channel(2);
+ let mut rx2 = tx.subscribe();
+
+ let th1 = thread::spawn(move || {
+ block_on(async {
+ let mut num = 0;
+
+ loop {
+ match rx1.recv().await {
+ Ok(_) => num += 1,
+ Err(Closed) => break,
+ Err(Lagged(n)) => num += n as usize,
+ }
+ }
+
+ assert_eq!(num, 3);
+ });
+ });
+
+ let th2 = thread::spawn(move || {
+ block_on(async {
+ let mut num = 0;
+
+ loop {
+ match rx2.recv().await {
+ Ok(_) => num += 1,
+ Err(Closed) => break,
+ Err(Lagged(n)) => num += n as usize,
+ }
+ }
+
+ assert_eq!(num, 3);
+ });
+ });
+
+ assert_ok!(tx.send("one"));
+ assert_ok!(tx.send("two"));
+ assert_ok!(tx.send("three"));
+
+ drop(tx);
+
+ assert_ok!(th1.join());
+ assert_ok!(th2.join());
+ });
+}
+
+#[test]
+fn drop_rx() {
+ loom::model(|| {
+ let (tx, mut rx1) = broadcast::channel(16);
+ let rx2 = tx.subscribe();
+
+ let th1 = thread::spawn(move || {
+ block_on(async {
+ let v = assert_ok!(rx1.recv().await);
+ assert_eq!(v, "one");
+
+ let v = assert_ok!(rx1.recv().await);
+ assert_eq!(v, "two");
+
+ let v = assert_ok!(rx1.recv().await);
+ assert_eq!(v, "three");
+
+ match assert_err!(rx1.recv().await) {
+ Closed => {}
+ _ => panic!(),
+ }
+ });
+ });
+
+ let th2 = thread::spawn(move || {
+ drop(rx2);
+ });
+
+ assert_ok!(tx.send("one"));
+ assert_ok!(tx.send("two"));
+ assert_ok!(tx.send("three"));
+ drop(tx);
+
+ assert_ok!(th1.join());
+ assert_ok!(th2.join());
+ });
+}
+
+#[test]
+fn drop_multiple_rx_with_overflow() {
+ loom::model(move || {
+ // It is essential to have multiple senders and receivers in this test case.
+ let (tx, mut rx) = broadcast::channel(1);
+ let _rx2 = tx.subscribe();
+
+ let _ = tx.send(());
+ let tx2 = tx.clone();
+ let th1 = thread::spawn(move || {
+ block_on(async {
+ for _ in 0..100 {
+ let _ = tx2.send(());
+ }
+ });
+ });
+ let _ = tx.send(());
+
+ let th2 = thread::spawn(move || {
+ block_on(async { while let Ok(_) = rx.recv().await {} });
+ });
+
+ assert_ok!(th1.join());
+ assert_ok!(th2.join());
+ });
+}
diff --git a/third_party/rust/tokio/src/sync/tests/loom_list.rs b/third_party/rust/tokio/src/sync/tests/loom_list.rs
new file mode 100644
index 0000000000..4067f865ce
--- /dev/null
+++ b/third_party/rust/tokio/src/sync/tests/loom_list.rs
@@ -0,0 +1,48 @@
+use crate::sync::mpsc::list;
+
+use loom::thread;
+use std::sync::Arc;
+
+#[test]
+fn smoke() {
+ use crate::sync::mpsc::block::Read::*;
+
+ const NUM_TX: usize = 2;
+ const NUM_MSG: usize = 2;
+
+ loom::model(|| {
+ let (tx, mut rx) = list::channel();
+ let tx = Arc::new(tx);
+
+ for th in 0..NUM_TX {
+ let tx = tx.clone();
+
+ thread::spawn(move || {
+ for i in 0..NUM_MSG {
+ tx.push((th, i));
+ }
+ });
+ }
+
+ let mut next = vec![0; NUM_TX];
+
+ loop {
+ match rx.pop(&tx) {
+ Some(Value((th, v))) => {
+ assert_eq!(v, next[th]);
+ next[th] += 1;
+
+ if next.iter().all(|&i| i == NUM_MSG) {
+ break;
+ }
+ }
+ Some(Closed) => {
+ panic!();
+ }
+ None => {
+ thread::yield_now();
+ }
+ }
+ }
+ });
+}
diff --git a/third_party/rust/tokio/src/sync/tests/loom_mpsc.rs b/third_party/rust/tokio/src/sync/tests/loom_mpsc.rs
new file mode 100644
index 0000000000..f165e7076e
--- /dev/null
+++ b/third_party/rust/tokio/src/sync/tests/loom_mpsc.rs
@@ -0,0 +1,190 @@
+use crate::sync::mpsc;
+
+use futures::future::poll_fn;
+use loom::future::block_on;
+use loom::sync::Arc;
+use loom::thread;
+use tokio_test::assert_ok;
+
+#[test]
+fn closing_tx() {
+ loom::model(|| {
+ let (tx, mut rx) = mpsc::channel(16);
+
+ thread::spawn(move || {
+ tx.try_send(()).unwrap();
+ drop(tx);
+ });
+
+ let v = block_on(rx.recv());
+ assert!(v.is_some());
+
+ let v = block_on(rx.recv());
+ assert!(v.is_none());
+ });
+}
+
+#[test]
+fn closing_unbounded_tx() {
+ loom::model(|| {
+ let (tx, mut rx) = mpsc::unbounded_channel();
+
+ thread::spawn(move || {
+ tx.send(()).unwrap();
+ drop(tx);
+ });
+
+ let v = block_on(rx.recv());
+ assert!(v.is_some());
+
+ let v = block_on(rx.recv());
+ assert!(v.is_none());
+ });
+}
+
+#[test]
+fn closing_bounded_rx() {
+ loom::model(|| {
+ let (tx1, rx) = mpsc::channel::<()>(16);
+ let tx2 = tx1.clone();
+ thread::spawn(move || {
+ drop(rx);
+ });
+
+ block_on(tx1.closed());
+ block_on(tx2.closed());
+ });
+}
+
+#[test]
+fn closing_and_sending() {
+ loom::model(|| {
+ let (tx1, mut rx) = mpsc::channel::<()>(16);
+ let tx1 = Arc::new(tx1);
+ let tx2 = tx1.clone();
+
+ let th1 = thread::spawn(move || {
+ tx1.try_send(()).unwrap();
+ });
+
+ let th2 = thread::spawn(move || {
+ block_on(tx2.closed());
+ });
+
+ let th3 = thread::spawn(move || {
+ let v = block_on(rx.recv());
+ assert!(v.is_some());
+ drop(rx);
+ });
+
+ assert_ok!(th1.join());
+ assert_ok!(th2.join());
+ assert_ok!(th3.join());
+ });
+}
+
+#[test]
+fn closing_unbounded_rx() {
+ loom::model(|| {
+ let (tx1, rx) = mpsc::unbounded_channel::<()>();
+ let tx2 = tx1.clone();
+ thread::spawn(move || {
+ drop(rx);
+ });
+
+ block_on(tx1.closed());
+ block_on(tx2.closed());
+ });
+}
+
+#[test]
+fn dropping_tx() {
+ loom::model(|| {
+ let (tx, mut rx) = mpsc::channel::<()>(16);
+
+ for _ in 0..2 {
+ let tx = tx.clone();
+ thread::spawn(move || {
+ drop(tx);
+ });
+ }
+ drop(tx);
+
+ let v = block_on(rx.recv());
+ assert!(v.is_none());
+ });
+}
+
+#[test]
+fn dropping_unbounded_tx() {
+ loom::model(|| {
+ let (tx, mut rx) = mpsc::unbounded_channel::<()>();
+
+ for _ in 0..2 {
+ let tx = tx.clone();
+ thread::spawn(move || {
+ drop(tx);
+ });
+ }
+ drop(tx);
+
+ let v = block_on(rx.recv());
+ assert!(v.is_none());
+ });
+}
+
+#[test]
+fn try_recv() {
+ loom::model(|| {
+ use crate::sync::{mpsc, Semaphore};
+ use loom::sync::{Arc, Mutex};
+
+ const PERMITS: usize = 2;
+ const TASKS: usize = 2;
+ const CYCLES: usize = 1;
+
+ struct Context {
+ sem: Arc<Semaphore>,
+ tx: mpsc::Sender<()>,
+ rx: Mutex<mpsc::Receiver<()>>,
+ }
+
+ fn run(ctx: &Context) {
+ block_on(async {
+ let permit = ctx.sem.acquire().await;
+ assert_ok!(ctx.rx.lock().unwrap().try_recv());
+ crate::task::yield_now().await;
+ assert_ok!(ctx.tx.clone().try_send(()));
+ drop(permit);
+ });
+ }
+
+ let (tx, rx) = mpsc::channel(PERMITS);
+ let sem = Arc::new(Semaphore::new(PERMITS));
+ let ctx = Arc::new(Context {
+ sem,
+ tx,
+ rx: Mutex::new(rx),
+ });
+
+ for _ in 0..PERMITS {
+ assert_ok!(ctx.tx.clone().try_send(()));
+ }
+
+ let mut ths = Vec::new();
+
+ for _ in 0..TASKS {
+ let ctx = ctx.clone();
+
+ ths.push(thread::spawn(move || {
+ run(&ctx);
+ }));
+ }
+
+ run(&ctx);
+
+ for th in ths {
+ th.join().unwrap();
+ }
+ });
+}
diff --git a/third_party/rust/tokio/src/sync/tests/loom_notify.rs b/third_party/rust/tokio/src/sync/tests/loom_notify.rs
new file mode 100644
index 0000000000..d484a75817
--- /dev/null
+++ b/third_party/rust/tokio/src/sync/tests/loom_notify.rs
@@ -0,0 +1,140 @@
+use crate::sync::Notify;
+
+use loom::future::block_on;
+use loom::sync::Arc;
+use loom::thread;
+
+#[test]
+fn notify_one() {
+ loom::model(|| {
+ let tx = Arc::new(Notify::new());
+ let rx = tx.clone();
+
+ let th = thread::spawn(move || {
+ block_on(async {
+ rx.notified().await;
+ });
+ });
+
+ tx.notify_one();
+ th.join().unwrap();
+ });
+}
+
+#[test]
+fn notify_waiters() {
+ loom::model(|| {
+ let notify = Arc::new(Notify::new());
+ let tx = notify.clone();
+ let notified1 = notify.notified();
+ let notified2 = notify.notified();
+
+ let th = thread::spawn(move || {
+ tx.notify_waiters();
+ });
+
+ block_on(async {
+ notified1.await;
+ notified2.await;
+ });
+
+ th.join().unwrap();
+ });
+}
+
+#[test]
+fn notify_waiters_and_one() {
+ loom::model(|| {
+ let notify = Arc::new(Notify::new());
+ let tx1 = notify.clone();
+ let tx2 = notify.clone();
+
+ let th1 = thread::spawn(move || {
+ tx1.notify_waiters();
+ });
+
+ let th2 = thread::spawn(move || {
+ tx2.notify_one();
+ });
+
+ let th3 = thread::spawn(move || {
+ let notified = notify.notified();
+
+ block_on(async {
+ notified.await;
+ });
+ });
+
+ th1.join().unwrap();
+ th2.join().unwrap();
+ th3.join().unwrap();
+ });
+}
+
+#[test]
+fn notify_multi() {
+ loom::model(|| {
+ let notify = Arc::new(Notify::new());
+
+ let mut ths = vec![];
+
+ for _ in 0..2 {
+ let notify = notify.clone();
+
+ ths.push(thread::spawn(move || {
+ block_on(async {
+ notify.notified().await;
+ notify.notify_one();
+ })
+ }));
+ }
+
+ notify.notify_one();
+
+ for th in ths.drain(..) {
+ th.join().unwrap();
+ }
+
+ block_on(async {
+ notify.notified().await;
+ });
+ });
+}
+
+#[test]
+fn notify_drop() {
+ use crate::future::poll_fn;
+ use std::future::Future;
+ use std::task::Poll;
+
+ loom::model(|| {
+ let notify = Arc::new(Notify::new());
+ let rx1 = notify.clone();
+ let rx2 = notify.clone();
+
+ let th1 = thread::spawn(move || {
+ let mut recv = Box::pin(rx1.notified());
+
+ block_on(poll_fn(|cx| {
+ if recv.as_mut().poll(cx).is_ready() {
+ rx1.notify_one();
+ }
+ Poll::Ready(())
+ }));
+ });
+
+ let th2 = thread::spawn(move || {
+ block_on(async {
+ rx2.notified().await;
+ // Trigger second notification
+ rx2.notify_one();
+ rx2.notified().await;
+ });
+ });
+
+ notify.notify_one();
+
+ th1.join().unwrap();
+ th2.join().unwrap();
+ });
+}
diff --git a/third_party/rust/tokio/src/sync/tests/loom_oneshot.rs b/third_party/rust/tokio/src/sync/tests/loom_oneshot.rs
new file mode 100644
index 0000000000..c5f7972079
--- /dev/null
+++ b/third_party/rust/tokio/src/sync/tests/loom_oneshot.rs
@@ -0,0 +1,140 @@
+use crate::sync::oneshot;
+
+use futures::future::poll_fn;
+use loom::future::block_on;
+use loom::thread;
+use std::task::Poll::{Pending, Ready};
+
+#[test]
+fn smoke() {
+ loom::model(|| {
+ let (tx, rx) = oneshot::channel();
+
+ thread::spawn(move || {
+ tx.send(1).unwrap();
+ });
+
+ let value = block_on(rx).unwrap();
+ assert_eq!(1, value);
+ });
+}
+
+#[test]
+fn changing_rx_task() {
+ loom::model(|| {
+ let (tx, mut rx) = oneshot::channel();
+
+ thread::spawn(move || {
+ tx.send(1).unwrap();
+ });
+
+ let rx = thread::spawn(move || {
+ let ready = block_on(poll_fn(|cx| match Pin::new(&mut rx).poll(cx) {
+ Ready(Ok(value)) => {
+ assert_eq!(1, value);
+ Ready(true)
+ }
+ Ready(Err(_)) => unimplemented!(),
+ Pending => Ready(false),
+ }));
+
+ if ready {
+ None
+ } else {
+ Some(rx)
+ }
+ })
+ .join()
+ .unwrap();
+
+ if let Some(rx) = rx {
+ // Previous task parked, use a new task...
+ let value = block_on(rx).unwrap();
+ assert_eq!(1, value);
+ }
+ });
+}
+
+#[test]
+fn try_recv_close() {
+ // reproduces https://github.com/tokio-rs/tokio/issues/4225
+ loom::model(|| {
+ let (tx, mut rx) = oneshot::channel();
+ thread::spawn(move || {
+ let _ = tx.send(());
+ });
+
+ rx.close();
+ let _ = rx.try_recv();
+ })
+}
+
+#[test]
+fn recv_closed() {
+ // reproduces https://github.com/tokio-rs/tokio/issues/4225
+ loom::model(|| {
+ let (tx, mut rx) = oneshot::channel();
+
+ thread::spawn(move || {
+ let _ = tx.send(1);
+ });
+
+ rx.close();
+ let _ = block_on(rx);
+ });
+}
+
+// TODO: Move this into `oneshot` proper.
+
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+struct OnClose<'a> {
+ tx: &'a mut oneshot::Sender<i32>,
+}
+
+impl<'a> OnClose<'a> {
+ fn new(tx: &'a mut oneshot::Sender<i32>) -> Self {
+ OnClose { tx }
+ }
+}
+
+impl Future for OnClose<'_> {
+ type Output = bool;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> {
+ let fut = self.get_mut().tx.closed();
+ crate::pin!(fut);
+
+ Ready(fut.poll(cx).is_ready())
+ }
+}
+
+#[test]
+fn changing_tx_task() {
+ loom::model(|| {
+ let (mut tx, rx) = oneshot::channel::<i32>();
+
+ thread::spawn(move || {
+ drop(rx);
+ });
+
+ let tx = thread::spawn(move || {
+ let t1 = block_on(OnClose::new(&mut tx));
+
+ if t1 {
+ None
+ } else {
+ Some(tx)
+ }
+ })
+ .join()
+ .unwrap();
+
+ if let Some(mut tx) = tx {
+ // Previous task parked, use a new task...
+ block_on(OnClose::new(&mut tx));
+ }
+ });
+}
diff --git a/third_party/rust/tokio/src/sync/tests/loom_rwlock.rs b/third_party/rust/tokio/src/sync/tests/loom_rwlock.rs
new file mode 100644
index 0000000000..4b5cc7edc6
--- /dev/null
+++ b/third_party/rust/tokio/src/sync/tests/loom_rwlock.rs
@@ -0,0 +1,105 @@
+use crate::sync::rwlock::*;
+
+use loom::future::block_on;
+use loom::thread;
+use std::sync::Arc;
+
+#[test]
+fn concurrent_write() {
+ let b = loom::model::Builder::new();
+
+ b.check(|| {
+ let rwlock = Arc::new(RwLock::<u32>::new(0));
+
+ let rwclone = rwlock.clone();
+ let t1 = thread::spawn(move || {
+ block_on(async {
+ let mut guard = rwclone.write().await;
+ *guard += 5;
+ });
+ });
+
+ let rwclone = rwlock.clone();
+ let t2 = thread::spawn(move || {
+ block_on(async {
+ let mut guard = rwclone.write_owned().await;
+ *guard += 5;
+ });
+ });
+
+ t1.join().expect("thread 1 write should not panic");
+ t2.join().expect("thread 2 write should not panic");
+ //when all threads have finished the value on the lock should be 10
+ let guard = block_on(rwlock.read());
+ assert_eq!(10, *guard);
+ });
+}
+
+#[test]
+fn concurrent_read_write() {
+ let b = loom::model::Builder::new();
+
+ b.check(|| {
+ let rwlock = Arc::new(RwLock::<u32>::new(0));
+
+ let rwclone = rwlock.clone();
+ let t1 = thread::spawn(move || {
+ block_on(async {
+ let mut guard = rwclone.write().await;
+ *guard += 5;
+ });
+ });
+
+ let rwclone = rwlock.clone();
+ let t2 = thread::spawn(move || {
+ block_on(async {
+ let mut guard = rwclone.write_owned().await;
+ *guard += 5;
+ });
+ });
+
+ let rwclone = rwlock.clone();
+ let t3 = thread::spawn(move || {
+ block_on(async {
+ let guard = rwclone.read().await;
+ //at this state the value on the lock may either be 0, 5, or 10
+ assert!(*guard == 0 || *guard == 5 || *guard == 10);
+ });
+ });
+
+ {
+ let guard = block_on(rwlock.clone().read_owned());
+ //at this state the value on the lock may either be 0, 5, or 10
+ assert!(*guard == 0 || *guard == 5 || *guard == 10);
+ }
+
+ t1.join().expect("thread 1 write should not panic");
+ t2.join().expect("thread 2 write should not panic");
+ t3.join().expect("thread 3 read should not panic");
+
+ let guard = block_on(rwlock.read());
+ //when all threads have finished the value on the lock should be 10
+ assert_eq!(10, *guard);
+ });
+}
+#[test]
+fn downgrade() {
+ loom::model(|| {
+ let lock = Arc::new(RwLock::new(1));
+
+ let n = block_on(lock.write());
+
+ let cloned_lock = lock.clone();
+ let handle = thread::spawn(move || {
+ let mut guard = block_on(cloned_lock.write());
+ *guard = 2;
+ });
+
+ let n = n.downgrade();
+ assert_eq!(*n, 1);
+
+ drop(n);
+ handle.join().unwrap();
+ assert_eq!(*block_on(lock.read()), 2);
+ });
+}
diff --git a/third_party/rust/tokio/src/sync/tests/loom_semaphore_batch.rs b/third_party/rust/tokio/src/sync/tests/loom_semaphore_batch.rs
new file mode 100644
index 0000000000..76a1bc0062
--- /dev/null
+++ b/third_party/rust/tokio/src/sync/tests/loom_semaphore_batch.rs
@@ -0,0 +1,215 @@
+use crate::sync::batch_semaphore::*;
+
+use futures::future::poll_fn;
+use loom::future::block_on;
+use loom::sync::atomic::AtomicUsize;
+use loom::thread;
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::atomic::Ordering::SeqCst;
+use std::sync::Arc;
+use std::task::Poll::Ready;
+use std::task::{Context, Poll};
+
+#[test]
+fn basic_usage() {
+ const NUM: usize = 2;
+
+ struct Shared {
+ semaphore: Semaphore,
+ active: AtomicUsize,
+ }
+
+ async fn actor(shared: Arc<Shared>) {
+ shared.semaphore.acquire(1).await.unwrap();
+ let actual = shared.active.fetch_add(1, SeqCst);
+ assert!(actual <= NUM - 1);
+
+ let actual = shared.active.fetch_sub(1, SeqCst);
+ assert!(actual <= NUM);
+ shared.semaphore.release(1);
+ }
+
+ loom::model(|| {
+ let shared = Arc::new(Shared {
+ semaphore: Semaphore::new(NUM),
+ active: AtomicUsize::new(0),
+ });
+
+ for _ in 0..NUM {
+ let shared = shared.clone();
+
+ thread::spawn(move || {
+ block_on(actor(shared));
+ });
+ }
+
+ block_on(actor(shared));
+ });
+}
+
+#[test]
+fn release() {
+ loom::model(|| {
+ let semaphore = Arc::new(Semaphore::new(1));
+
+ {
+ let semaphore = semaphore.clone();
+ thread::spawn(move || {
+ block_on(semaphore.acquire(1)).unwrap();
+ semaphore.release(1);
+ });
+ }
+
+ block_on(semaphore.acquire(1)).unwrap();
+
+ semaphore.release(1);
+ });
+}
+
+#[test]
+fn basic_closing() {
+ const NUM: usize = 2;
+
+ loom::model(|| {
+ let semaphore = Arc::new(Semaphore::new(1));
+
+ for _ in 0..NUM {
+ let semaphore = semaphore.clone();
+
+ thread::spawn(move || {
+ for _ in 0..2 {
+ block_on(semaphore.acquire(1)).map_err(|_| ())?;
+
+ semaphore.release(1);
+ }
+
+ Ok::<(), ()>(())
+ });
+ }
+
+ semaphore.close();
+ });
+}
+
+#[test]
+fn concurrent_close() {
+ const NUM: usize = 3;
+
+ loom::model(|| {
+ let semaphore = Arc::new(Semaphore::new(1));
+
+ for _ in 0..NUM {
+ let semaphore = semaphore.clone();
+
+ thread::spawn(move || {
+ block_on(semaphore.acquire(1)).map_err(|_| ())?;
+ semaphore.release(1);
+ semaphore.close();
+
+ Ok::<(), ()>(())
+ });
+ }
+ });
+}
+
+#[test]
+fn concurrent_cancel() {
+ async fn poll_and_cancel(semaphore: Arc<Semaphore>) {
+ let mut acquire1 = Some(semaphore.acquire(1));
+ let mut acquire2 = Some(semaphore.acquire(1));
+ poll_fn(|cx| {
+ // poll the acquire future once, and then immediately throw
+ // it away. this simulates a situation where a future is
+ // polled and then cancelled, such as by a timeout.
+ if let Some(acquire) = acquire1.take() {
+ pin!(acquire);
+ let _ = acquire.poll(cx);
+ }
+ if let Some(acquire) = acquire2.take() {
+ pin!(acquire);
+ let _ = acquire.poll(cx);
+ }
+ Poll::Ready(())
+ })
+ .await
+ }
+
+ loom::model(|| {
+ let semaphore = Arc::new(Semaphore::new(0));
+ let t1 = {
+ let semaphore = semaphore.clone();
+ thread::spawn(move || block_on(poll_and_cancel(semaphore)))
+ };
+ let t2 = {
+ let semaphore = semaphore.clone();
+ thread::spawn(move || block_on(poll_and_cancel(semaphore)))
+ };
+ let t3 = {
+ let semaphore = semaphore.clone();
+ thread::spawn(move || block_on(poll_and_cancel(semaphore)))
+ };
+
+ t1.join().unwrap();
+ semaphore.release(10);
+ t2.join().unwrap();
+ t3.join().unwrap();
+ });
+}
+
+#[test]
+fn batch() {
+ let mut b = loom::model::Builder::new();
+ b.preemption_bound = Some(1);
+
+ b.check(|| {
+ let semaphore = Arc::new(Semaphore::new(10));
+ let active = Arc::new(AtomicUsize::new(0));
+ let mut ths = vec![];
+
+ for _ in 0..2 {
+ let semaphore = semaphore.clone();
+ let active = active.clone();
+
+ ths.push(thread::spawn(move || {
+ for n in &[4, 10, 8] {
+ block_on(semaphore.acquire(*n)).unwrap();
+
+ active.fetch_add(*n as usize, SeqCst);
+
+ let num_active = active.load(SeqCst);
+ assert!(num_active <= 10);
+
+ thread::yield_now();
+
+ active.fetch_sub(*n as usize, SeqCst);
+
+ semaphore.release(*n as usize);
+ }
+ }));
+ }
+
+ for th in ths.into_iter() {
+ th.join().unwrap();
+ }
+
+ assert_eq!(10, semaphore.available_permits());
+ });
+}
+
+#[test]
+fn release_during_acquire() {
+ loom::model(|| {
+ let semaphore = Arc::new(Semaphore::new(10));
+ semaphore
+ .try_acquire(8)
+ .expect("try_acquire should succeed; semaphore uncontended");
+ let semaphore2 = semaphore.clone();
+ let thread = thread::spawn(move || block_on(semaphore2.acquire(4)).unwrap());
+
+ semaphore.release(8);
+ thread.join().unwrap();
+ semaphore.release(4);
+ assert_eq!(10, semaphore.available_permits());
+ })
+}
diff --git a/third_party/rust/tokio/src/sync/tests/loom_watch.rs b/third_party/rust/tokio/src/sync/tests/loom_watch.rs
new file mode 100644
index 0000000000..c575b5b66c
--- /dev/null
+++ b/third_party/rust/tokio/src/sync/tests/loom_watch.rs
@@ -0,0 +1,36 @@
+use crate::sync::watch;
+
+use loom::future::block_on;
+use loom::thread;
+
+#[test]
+fn smoke() {
+ loom::model(|| {
+ let (tx, mut rx1) = watch::channel(1);
+ let mut rx2 = rx1.clone();
+ let mut rx3 = rx1.clone();
+ let mut rx4 = rx1.clone();
+ let mut rx5 = rx1.clone();
+
+ let th = thread::spawn(move || {
+ tx.send(2).unwrap();
+ });
+
+ block_on(rx1.changed()).unwrap();
+ assert_eq!(*rx1.borrow(), 2);
+
+ block_on(rx2.changed()).unwrap();
+ assert_eq!(*rx2.borrow(), 2);
+
+ block_on(rx3.changed()).unwrap();
+ assert_eq!(*rx3.borrow(), 2);
+
+ block_on(rx4.changed()).unwrap();
+ assert_eq!(*rx4.borrow(), 2);
+
+ block_on(rx5.changed()).unwrap();
+ assert_eq!(*rx5.borrow(), 2);
+
+ th.join().unwrap();
+ })
+}
diff --git a/third_party/rust/tokio/src/sync/tests/mod.rs b/third_party/rust/tokio/src/sync/tests/mod.rs
new file mode 100644
index 0000000000..ee76418ac5
--- /dev/null
+++ b/third_party/rust/tokio/src/sync/tests/mod.rs
@@ -0,0 +1,17 @@
+cfg_not_loom! {
+ mod atomic_waker;
+ mod notify;
+ mod semaphore_batch;
+}
+
+cfg_loom! {
+ mod loom_atomic_waker;
+ mod loom_broadcast;
+ mod loom_list;
+ mod loom_mpsc;
+ mod loom_notify;
+ mod loom_oneshot;
+ mod loom_semaphore_batch;
+ mod loom_watch;
+ mod loom_rwlock;
+}
diff --git a/third_party/rust/tokio/src/sync/tests/notify.rs b/third_party/rust/tokio/src/sync/tests/notify.rs
new file mode 100644
index 0000000000..20153b7a5a
--- /dev/null
+++ b/third_party/rust/tokio/src/sync/tests/notify.rs
@@ -0,0 +1,81 @@
+use crate::sync::Notify;
+use std::future::Future;
+use std::mem::ManuallyDrop;
+use std::sync::Arc;
+use std::task::{Context, RawWaker, RawWakerVTable, Waker};
+
+#[cfg(target_arch = "wasm32")]
+use wasm_bindgen_test::wasm_bindgen_test as test;
+
+#[test]
+fn notify_clones_waker_before_lock() {
+ const VTABLE: &RawWakerVTable = &RawWakerVTable::new(clone_w, wake, wake_by_ref, drop_w);
+
+ unsafe fn clone_w(data: *const ()) -> RawWaker {
+ let arc = ManuallyDrop::new(Arc::<Notify>::from_raw(data as *const Notify));
+ // Or some other arbitrary code that shouldn't be executed while the
+ // Notify wait list is locked.
+ arc.notify_one();
+ let _arc_clone: ManuallyDrop<_> = arc.clone();
+ RawWaker::new(data, VTABLE)
+ }
+
+ unsafe fn drop_w(data: *const ()) {
+ let _ = Arc::<Notify>::from_raw(data as *const Notify);
+ }
+
+ unsafe fn wake(_data: *const ()) {
+ unreachable!()
+ }
+
+ unsafe fn wake_by_ref(_data: *const ()) {
+ unreachable!()
+ }
+
+ let notify = Arc::new(Notify::new());
+ let notify2 = notify.clone();
+
+ let waker =
+ unsafe { Waker::from_raw(RawWaker::new(Arc::into_raw(notify2) as *const _, VTABLE)) };
+ let mut cx = Context::from_waker(&waker);
+
+ let future = notify.notified();
+ pin!(future);
+
+ // The result doesn't matter, we're just testing that we don't deadlock.
+ let _ = future.poll(&mut cx);
+}
+
+#[test]
+fn notify_simple() {
+ let notify = Notify::new();
+
+ let mut fut1 = tokio_test::task::spawn(notify.notified());
+ assert!(fut1.poll().is_pending());
+
+ let mut fut2 = tokio_test::task::spawn(notify.notified());
+ assert!(fut2.poll().is_pending());
+
+ notify.notify_waiters();
+
+ assert!(fut1.poll().is_ready());
+ assert!(fut2.poll().is_ready());
+}
+
+#[test]
+#[cfg(not(target_arch = "wasm32"))]
+fn watch_test() {
+ let rt = crate::runtime::Builder::new_current_thread()
+ .build()
+ .unwrap();
+
+ rt.block_on(async {
+ let (tx, mut rx) = crate::sync::watch::channel(());
+
+ crate::spawn(async move {
+ let _ = tx.send(());
+ });
+
+ let _ = rx.changed().await;
+ });
+}
diff --git a/third_party/rust/tokio/src/sync/tests/semaphore_batch.rs b/third_party/rust/tokio/src/sync/tests/semaphore_batch.rs
new file mode 100644
index 0000000000..d529a9e886
--- /dev/null
+++ b/third_party/rust/tokio/src/sync/tests/semaphore_batch.rs
@@ -0,0 +1,254 @@
+use crate::sync::batch_semaphore::Semaphore;
+use tokio_test::*;
+
+#[cfg(target_arch = "wasm32")]
+use wasm_bindgen_test::wasm_bindgen_test as test;
+
+#[test]
+fn poll_acquire_one_available() {
+ let s = Semaphore::new(100);
+ assert_eq!(s.available_permits(), 100);
+
+ // Polling for a permit succeeds immediately
+ assert_ready_ok!(task::spawn(s.acquire(1)).poll());
+ assert_eq!(s.available_permits(), 99);
+}
+
+#[test]
+fn poll_acquire_many_available() {
+ let s = Semaphore::new(100);
+ assert_eq!(s.available_permits(), 100);
+
+ // Polling for a permit succeeds immediately
+ assert_ready_ok!(task::spawn(s.acquire(5)).poll());
+ assert_eq!(s.available_permits(), 95);
+
+ assert_ready_ok!(task::spawn(s.acquire(5)).poll());
+ assert_eq!(s.available_permits(), 90);
+}
+
+#[test]
+fn try_acquire_one_available() {
+ let s = Semaphore::new(100);
+ assert_eq!(s.available_permits(), 100);
+
+ assert_ok!(s.try_acquire(1));
+ assert_eq!(s.available_permits(), 99);
+
+ assert_ok!(s.try_acquire(1));
+ assert_eq!(s.available_permits(), 98);
+}
+
+#[test]
+fn try_acquire_many_available() {
+ let s = Semaphore::new(100);
+ assert_eq!(s.available_permits(), 100);
+
+ assert_ok!(s.try_acquire(5));
+ assert_eq!(s.available_permits(), 95);
+
+ assert_ok!(s.try_acquire(5));
+ assert_eq!(s.available_permits(), 90);
+}
+
+#[test]
+fn poll_acquire_one_unavailable() {
+ let s = Semaphore::new(1);
+
+ // Acquire the first permit
+ assert_ready_ok!(task::spawn(s.acquire(1)).poll());
+ assert_eq!(s.available_permits(), 0);
+
+ let mut acquire_2 = task::spawn(s.acquire(1));
+ // Try to acquire the second permit
+ assert_pending!(acquire_2.poll());
+ assert_eq!(s.available_permits(), 0);
+
+ s.release(1);
+
+ assert_eq!(s.available_permits(), 0);
+ assert!(acquire_2.is_woken());
+ assert_ready_ok!(acquire_2.poll());
+ assert_eq!(s.available_permits(), 0);
+
+ s.release(1);
+ assert_eq!(s.available_permits(), 1);
+}
+
+#[test]
+fn poll_acquire_many_unavailable() {
+ let s = Semaphore::new(5);
+
+ // Acquire the first permit
+ assert_ready_ok!(task::spawn(s.acquire(1)).poll());
+ assert_eq!(s.available_permits(), 4);
+
+ // Try to acquire the second permit
+ let mut acquire_2 = task::spawn(s.acquire(5));
+ assert_pending!(acquire_2.poll());
+ assert_eq!(s.available_permits(), 0);
+
+ // Try to acquire the third permit
+ let mut acquire_3 = task::spawn(s.acquire(3));
+ assert_pending!(acquire_3.poll());
+ assert_eq!(s.available_permits(), 0);
+
+ s.release(1);
+
+ assert_eq!(s.available_permits(), 0);
+ assert!(acquire_2.is_woken());
+ assert_ready_ok!(acquire_2.poll());
+
+ assert!(!acquire_3.is_woken());
+ assert_eq!(s.available_permits(), 0);
+
+ s.release(1);
+ assert!(!acquire_3.is_woken());
+ assert_eq!(s.available_permits(), 0);
+
+ s.release(2);
+ assert!(acquire_3.is_woken());
+
+ assert_ready_ok!(acquire_3.poll());
+}
+
+#[test]
+fn try_acquire_one_unavailable() {
+ let s = Semaphore::new(1);
+
+ // Acquire the first permit
+ assert_ok!(s.try_acquire(1));
+ assert_eq!(s.available_permits(), 0);
+
+ assert_err!(s.try_acquire(1));
+
+ s.release(1);
+
+ assert_eq!(s.available_permits(), 1);
+ assert_ok!(s.try_acquire(1));
+
+ s.release(1);
+ assert_eq!(s.available_permits(), 1);
+}
+
+#[test]
+fn try_acquire_many_unavailable() {
+ let s = Semaphore::new(5);
+
+ // Acquire the first permit
+ assert_ok!(s.try_acquire(1));
+ assert_eq!(s.available_permits(), 4);
+
+ assert_err!(s.try_acquire(5));
+
+ s.release(1);
+ assert_eq!(s.available_permits(), 5);
+
+ assert_ok!(s.try_acquire(5));
+
+ s.release(1);
+ assert_eq!(s.available_permits(), 1);
+
+ s.release(1);
+ assert_eq!(s.available_permits(), 2);
+}
+
+#[test]
+fn poll_acquire_one_zero_permits() {
+ let s = Semaphore::new(0);
+ assert_eq!(s.available_permits(), 0);
+
+ // Try to acquire the permit
+ let mut acquire = task::spawn(s.acquire(1));
+ assert_pending!(acquire.poll());
+
+ s.release(1);
+
+ assert!(acquire.is_woken());
+ assert_ready_ok!(acquire.poll());
+}
+
+#[test]
+#[should_panic]
+#[cfg(not(target_arch = "wasm32"))] // wasm currently doesn't support unwinding
+fn validates_max_permits() {
+ use std::usize;
+ Semaphore::new((usize::MAX >> 2) + 1);
+}
+
+#[test]
+fn close_semaphore_prevents_acquire() {
+ let s = Semaphore::new(5);
+ s.close();
+
+ assert_eq!(5, s.available_permits());
+
+ assert_ready_err!(task::spawn(s.acquire(1)).poll());
+ assert_eq!(5, s.available_permits());
+
+ assert_ready_err!(task::spawn(s.acquire(1)).poll());
+ assert_eq!(5, s.available_permits());
+}
+
+#[test]
+fn close_semaphore_notifies_permit1() {
+ let s = Semaphore::new(0);
+ let mut acquire = task::spawn(s.acquire(1));
+
+ assert_pending!(acquire.poll());
+
+ s.close();
+
+ assert!(acquire.is_woken());
+ assert_ready_err!(acquire.poll());
+}
+
+#[test]
+fn close_semaphore_notifies_permit2() {
+ let s = Semaphore::new(2);
+
+ // Acquire a couple of permits
+ assert_ready_ok!(task::spawn(s.acquire(1)).poll());
+ assert_ready_ok!(task::spawn(s.acquire(1)).poll());
+
+ let mut acquire3 = task::spawn(s.acquire(1));
+ let mut acquire4 = task::spawn(s.acquire(1));
+ assert_pending!(acquire3.poll());
+ assert_pending!(acquire4.poll());
+
+ s.close();
+
+ assert!(acquire3.is_woken());
+ assert!(acquire4.is_woken());
+
+ assert_ready_err!(acquire3.poll());
+ assert_ready_err!(acquire4.poll());
+
+ assert_eq!(0, s.available_permits());
+
+ s.release(1);
+
+ assert_eq!(1, s.available_permits());
+
+ assert_ready_err!(task::spawn(s.acquire(1)).poll());
+
+ s.release(1);
+
+ assert_eq!(2, s.available_permits());
+}
+
+#[test]
+fn cancel_acquire_releases_permits() {
+ let s = Semaphore::new(10);
+ s.try_acquire(4).expect("uncontended try_acquire succeeds");
+ assert_eq!(6, s.available_permits());
+
+ let mut acquire = task::spawn(s.acquire(8));
+ assert_pending!(acquire.poll());
+
+ assert_eq!(0, s.available_permits());
+ drop(acquire);
+
+ assert_eq!(6, s.available_permits());
+ assert_ok!(s.try_acquire(6));
+}