diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-30 03:57:31 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-30 03:57:31 +0000 |
commit | dc0db358abe19481e475e10c32149b53370f1a1c (patch) | |
tree | ab8ce99c4b255ce46f99ef402c27916055b899ee /vendor/tokio/tests/sync_broadcast.rs | |
parent | Releasing progress-linux version 1.71.1+dfsg1-2~progress7.99u1. (diff) | |
download | rustc-dc0db358abe19481e475e10c32149b53370f1a1c.tar.xz rustc-dc0db358abe19481e475e10c32149b53370f1a1c.zip |
Merging upstream version 1.72.1+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/tokio/tests/sync_broadcast.rs')
-rw-r--r-- | vendor/tokio/tests/sync_broadcast.rs | 189 |
1 files changed, 188 insertions, 1 deletions
diff --git a/vendor/tokio/tests/sync_broadcast.rs b/vendor/tokio/tests/sync_broadcast.rs index 5f79800a7..feed03148 100644 --- a/vendor/tokio/tests/sync_broadcast.rs +++ b/vendor/tokio/tests/sync_broadcast.rs @@ -2,6 +2,9 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "sync")] +#[cfg(tokio_wasm_not_wasi)] +use wasm_bindgen_test::wasm_bindgen_test as test; + use tokio::sync::broadcast; use tokio_test::task; use tokio_test::{ @@ -44,7 +47,7 @@ macro_rules! assert_closed { ($e:expr) => { match assert_err!($e) { broadcast::error::TryRecvError::Closed => {} - _ => panic!("did not lag"), + _ => panic!("is not closed"), } }; } @@ -273,12 +276,14 @@ fn send_no_rx() { #[test] #[should_panic] +#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding fn zero_capacity() { broadcast::channel::<()>(0); } #[test] #[should_panic] +#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding fn capacity_too_big() { use std::usize; @@ -286,6 +291,8 @@ fn capacity_too_big() { } #[test] +#[cfg(panic = "unwind")] +#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding fn panic_in_clone() { use std::panic::{self, AssertUnwindSafe}; @@ -451,6 +458,186 @@ fn lagging_receiver_recovers_after_wrap_open() { assert_empty!(rx); } +#[test] +fn receiver_len_with_lagged() { + let (tx, mut rx) = broadcast::channel(3); + + tx.send(10).unwrap(); + tx.send(20).unwrap(); + tx.send(30).unwrap(); + tx.send(40).unwrap(); + + assert_eq!(rx.len(), 4); + assert_eq!(assert_recv!(rx), 10); + + tx.send(50).unwrap(); + tx.send(60).unwrap(); + + assert_eq!(rx.len(), 5); + assert_lagged!(rx.try_recv(), 1); +} + fn is_closed(err: broadcast::error::RecvError) -> bool { matches!(err, broadcast::error::RecvError::Closed) } + +#[test] +fn resubscribe_points_to_tail() { + let (tx, mut rx) = broadcast::channel(3); + tx.send(1).unwrap(); + + let mut rx_resub = rx.resubscribe(); + + // verify we're one behind at the start + assert_empty!(rx_resub); + assert_eq!(assert_recv!(rx), 1); + + // verify we do not affect rx + tx.send(2).unwrap(); + assert_eq!(assert_recv!(rx_resub), 2); + tx.send(3).unwrap(); + assert_eq!(assert_recv!(rx), 2); + assert_eq!(assert_recv!(rx), 3); + assert_empty!(rx); + + assert_eq!(assert_recv!(rx_resub), 3); + assert_empty!(rx_resub); +} + +#[test] +fn resubscribe_lagged() { + let (tx, mut rx) = broadcast::channel(1); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + + let mut rx_resub = rx.resubscribe(); + assert_lagged!(rx.try_recv(), 1); + assert_empty!(rx_resub); + + assert_eq!(assert_recv!(rx), 2); + assert_empty!(rx); + assert_empty!(rx_resub); +} + +#[test] +fn resubscribe_to_closed_channel() { + let (tx, rx) = tokio::sync::broadcast::channel::<u32>(2); + drop(tx); + + let mut rx_resub = rx.resubscribe(); + assert_closed!(rx_resub.try_recv()); +} + +#[test] +fn sender_len() { + let (tx, mut rx1) = broadcast::channel(4); + let mut rx2 = tx.subscribe(); + + assert_eq!(tx.len(), 0); + assert!(tx.is_empty()); + + tx.send(1).unwrap(); + tx.send(2).unwrap(); + tx.send(3).unwrap(); + + assert_eq!(tx.len(), 3); + assert!(!tx.is_empty()); + + assert_recv!(rx1); + assert_recv!(rx1); + + assert_eq!(tx.len(), 3); + assert!(!tx.is_empty()); + + assert_recv!(rx2); + + assert_eq!(tx.len(), 2); + assert!(!tx.is_empty()); + + tx.send(4).unwrap(); + tx.send(5).unwrap(); + tx.send(6).unwrap(); + + assert_eq!(tx.len(), 4); + assert!(!tx.is_empty()); +} + +#[test] +#[cfg(not(tokio_wasm_not_wasi))] +fn sender_len_random() { + use rand::Rng; + + let (tx, mut rx1) = broadcast::channel(16); + let mut rx2 = tx.subscribe(); + + for _ in 0..1000 { + match rand::thread_rng().gen_range(0..4) { + 0 => { + let _ = rx1.try_recv(); + } + 1 => { + let _ = rx2.try_recv(); + } + _ => { + tx.send(0).unwrap(); + } + } + + let expected_len = usize::min(usize::max(rx1.len(), rx2.len()), 16); + assert_eq!(tx.len(), expected_len); + } +} + +#[test] +fn send_in_waker_drop() { + use futures::task::ArcWake; + use std::future::Future; + use std::task::Context; + + struct SendOnDrop(broadcast::Sender<()>); + + impl Drop for SendOnDrop { + fn drop(&mut self) { + let _ = self.0.send(()); + } + } + + impl ArcWake for SendOnDrop { + fn wake_by_ref(_arc_self: &Arc<Self>) {} + } + + // Test if there is no deadlock when replacing the old waker. + + let (tx, mut rx) = broadcast::channel(16); + + let mut fut = Box::pin(async { + let _ = rx.recv().await; + }); + + // Store our special waker in the receiving future. + let waker = futures::task::waker(Arc::new(SendOnDrop(tx))); + let mut cx = Context::from_waker(&waker); + assert!(fut.as_mut().poll(&mut cx).is_pending()); + drop(waker); + + // Second poll shouldn't deadlock. + let mut cx = Context::from_waker(futures::task::noop_waker_ref()); + let _ = fut.as_mut().poll(&mut cx); + + // Test if there is no deadlock when calling waker.wake(). + + let (tx, mut rx) = broadcast::channel(16); + + let mut fut = Box::pin(async { + let _ = rx.recv().await; + }); + + // Store our special waker in the receiving future. + let waker = futures::task::waker(Arc::new(SendOnDrop(tx.clone()))); + let mut cx = Context::from_waker(&waker); + assert!(fut.as_mut().poll(&mut cx).is_pending()); + drop(waker); + + // Shouldn't deadlock. + let _ = tx.send(()); +} |