summaryrefslogtreecommitdiffstats
path: root/vendor/tokio/tests/sync_broadcast.rs
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 03:57:31 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-30 03:57:31 +0000
commitdc0db358abe19481e475e10c32149b53370f1a1c (patch)
treeab8ce99c4b255ce46f99ef402c27916055b899ee /vendor/tokio/tests/sync_broadcast.rs
parentReleasing progress-linux version 1.71.1+dfsg1-2~progress7.99u1. (diff)
downloadrustc-dc0db358abe19481e475e10c32149b53370f1a1c.tar.xz
rustc-dc0db358abe19481e475e10c32149b53370f1a1c.zip
Merging upstream version 1.72.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/tokio/tests/sync_broadcast.rs')
-rw-r--r--vendor/tokio/tests/sync_broadcast.rs189
1 files changed, 188 insertions, 1 deletions
diff --git a/vendor/tokio/tests/sync_broadcast.rs b/vendor/tokio/tests/sync_broadcast.rs
index 5f79800a7..feed03148 100644
--- a/vendor/tokio/tests/sync_broadcast.rs
+++ b/vendor/tokio/tests/sync_broadcast.rs
@@ -2,6 +2,9 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "sync")]
+#[cfg(tokio_wasm_not_wasi)]
+use wasm_bindgen_test::wasm_bindgen_test as test;
+
use tokio::sync::broadcast;
use tokio_test::task;
use tokio_test::{
@@ -44,7 +47,7 @@ macro_rules! assert_closed {
($e:expr) => {
match assert_err!($e) {
broadcast::error::TryRecvError::Closed => {}
- _ => panic!("did not lag"),
+ _ => panic!("is not closed"),
}
};
}
@@ -273,12 +276,14 @@ fn send_no_rx() {
#[test]
#[should_panic]
+#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding
fn zero_capacity() {
broadcast::channel::<()>(0);
}
#[test]
#[should_panic]
+#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding
fn capacity_too_big() {
use std::usize;
@@ -286,6 +291,8 @@ fn capacity_too_big() {
}
#[test]
+#[cfg(panic = "unwind")]
+#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding
fn panic_in_clone() {
use std::panic::{self, AssertUnwindSafe};
@@ -451,6 +458,186 @@ fn lagging_receiver_recovers_after_wrap_open() {
assert_empty!(rx);
}
+#[test]
+fn receiver_len_with_lagged() {
+ let (tx, mut rx) = broadcast::channel(3);
+
+ tx.send(10).unwrap();
+ tx.send(20).unwrap();
+ tx.send(30).unwrap();
+ tx.send(40).unwrap();
+
+ assert_eq!(rx.len(), 4);
+ assert_eq!(assert_recv!(rx), 10);
+
+ tx.send(50).unwrap();
+ tx.send(60).unwrap();
+
+ assert_eq!(rx.len(), 5);
+ assert_lagged!(rx.try_recv(), 1);
+}
+
fn is_closed(err: broadcast::error::RecvError) -> bool {
matches!(err, broadcast::error::RecvError::Closed)
}
+
+#[test]
+fn resubscribe_points_to_tail() {
+ let (tx, mut rx) = broadcast::channel(3);
+ tx.send(1).unwrap();
+
+ let mut rx_resub = rx.resubscribe();
+
+ // verify we're one behind at the start
+ assert_empty!(rx_resub);
+ assert_eq!(assert_recv!(rx), 1);
+
+ // verify we do not affect rx
+ tx.send(2).unwrap();
+ assert_eq!(assert_recv!(rx_resub), 2);
+ tx.send(3).unwrap();
+ assert_eq!(assert_recv!(rx), 2);
+ assert_eq!(assert_recv!(rx), 3);
+ assert_empty!(rx);
+
+ assert_eq!(assert_recv!(rx_resub), 3);
+ assert_empty!(rx_resub);
+}
+
+#[test]
+fn resubscribe_lagged() {
+ let (tx, mut rx) = broadcast::channel(1);
+ tx.send(1).unwrap();
+ tx.send(2).unwrap();
+
+ let mut rx_resub = rx.resubscribe();
+ assert_lagged!(rx.try_recv(), 1);
+ assert_empty!(rx_resub);
+
+ assert_eq!(assert_recv!(rx), 2);
+ assert_empty!(rx);
+ assert_empty!(rx_resub);
+}
+
+#[test]
+fn resubscribe_to_closed_channel() {
+ let (tx, rx) = tokio::sync::broadcast::channel::<u32>(2);
+ drop(tx);
+
+ let mut rx_resub = rx.resubscribe();
+ assert_closed!(rx_resub.try_recv());
+}
+
+#[test]
+fn sender_len() {
+ let (tx, mut rx1) = broadcast::channel(4);
+ let mut rx2 = tx.subscribe();
+
+ assert_eq!(tx.len(), 0);
+ assert!(tx.is_empty());
+
+ tx.send(1).unwrap();
+ tx.send(2).unwrap();
+ tx.send(3).unwrap();
+
+ assert_eq!(tx.len(), 3);
+ assert!(!tx.is_empty());
+
+ assert_recv!(rx1);
+ assert_recv!(rx1);
+
+ assert_eq!(tx.len(), 3);
+ assert!(!tx.is_empty());
+
+ assert_recv!(rx2);
+
+ assert_eq!(tx.len(), 2);
+ assert!(!tx.is_empty());
+
+ tx.send(4).unwrap();
+ tx.send(5).unwrap();
+ tx.send(6).unwrap();
+
+ assert_eq!(tx.len(), 4);
+ assert!(!tx.is_empty());
+}
+
+#[test]
+#[cfg(not(tokio_wasm_not_wasi))]
+fn sender_len_random() {
+ use rand::Rng;
+
+ let (tx, mut rx1) = broadcast::channel(16);
+ let mut rx2 = tx.subscribe();
+
+ for _ in 0..1000 {
+ match rand::thread_rng().gen_range(0..4) {
+ 0 => {
+ let _ = rx1.try_recv();
+ }
+ 1 => {
+ let _ = rx2.try_recv();
+ }
+ _ => {
+ tx.send(0).unwrap();
+ }
+ }
+
+ let expected_len = usize::min(usize::max(rx1.len(), rx2.len()), 16);
+ assert_eq!(tx.len(), expected_len);
+ }
+}
+
+#[test]
+fn send_in_waker_drop() {
+ use futures::task::ArcWake;
+ use std::future::Future;
+ use std::task::Context;
+
+ struct SendOnDrop(broadcast::Sender<()>);
+
+ impl Drop for SendOnDrop {
+ fn drop(&mut self) {
+ let _ = self.0.send(());
+ }
+ }
+
+ impl ArcWake for SendOnDrop {
+ fn wake_by_ref(_arc_self: &Arc<Self>) {}
+ }
+
+ // Test if there is no deadlock when replacing the old waker.
+
+ let (tx, mut rx) = broadcast::channel(16);
+
+ let mut fut = Box::pin(async {
+ let _ = rx.recv().await;
+ });
+
+ // Store our special waker in the receiving future.
+ let waker = futures::task::waker(Arc::new(SendOnDrop(tx)));
+ let mut cx = Context::from_waker(&waker);
+ assert!(fut.as_mut().poll(&mut cx).is_pending());
+ drop(waker);
+
+ // Second poll shouldn't deadlock.
+ let mut cx = Context::from_waker(futures::task::noop_waker_ref());
+ let _ = fut.as_mut().poll(&mut cx);
+
+ // Test if there is no deadlock when calling waker.wake().
+
+ let (tx, mut rx) = broadcast::channel(16);
+
+ let mut fut = Box::pin(async {
+ let _ = rx.recv().await;
+ });
+
+ // Store our special waker in the receiving future.
+ let waker = futures::task::waker(Arc::new(SendOnDrop(tx.clone())));
+ let mut cx = Context::from_waker(&waker);
+ assert!(fut.as_mut().poll(&mut cx).is_pending());
+ drop(waker);
+
+ // Shouldn't deadlock.
+ let _ = tx.send(());
+}