summaryrefslogtreecommitdiffstats
path: root/vendor/crossbeam-channel/tests/after.rs
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 12:02:58 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 12:02:58 +0000
commit698f8c2f01ea549d77d7dc3338a12e04c11057b9 (patch)
tree173a775858bd501c378080a10dca74132f05bc50 /vendor/crossbeam-channel/tests/after.rs
parentInitial commit. (diff)
downloadrustc-698f8c2f01ea549d77d7dc3338a12e04c11057b9.tar.xz
rustc-698f8c2f01ea549d77d7dc3338a12e04c11057b9.zip
Adding upstream version 1.64.0+dfsg1.upstream/1.64.0+dfsg1
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/crossbeam-channel/tests/after.rs')
-rw-r--r--vendor/crossbeam-channel/tests/after.rs336
1 files changed, 336 insertions, 0 deletions
diff --git a/vendor/crossbeam-channel/tests/after.rs b/vendor/crossbeam-channel/tests/after.rs
new file mode 100644
index 000000000..678a8c679
--- /dev/null
+++ b/vendor/crossbeam-channel/tests/after.rs
@@ -0,0 +1,336 @@
+//! Tests for the after channel flavor.
+
+#![cfg(not(miri))] // TODO: many assertions failed due to Miri is slow
+
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering;
+use std::thread;
+use std::time::{Duration, Instant};
+
+use crossbeam_channel::{after, select, Select, TryRecvError};
+use crossbeam_utils::thread::scope;
+
+fn ms(ms: u64) -> Duration {
+ Duration::from_millis(ms)
+}
+
+#[test]
+fn fire() {
+ let start = Instant::now();
+ let r = after(ms(50));
+
+ assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
+ thread::sleep(ms(100));
+
+ let fired = r.try_recv().unwrap();
+ assert!(start < fired);
+ assert!(fired - start >= ms(50));
+
+ let now = Instant::now();
+ assert!(fired < now);
+ assert!(now - fired >= ms(50));
+
+ assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
+
+ select! {
+ recv(r) -> _ => panic!(),
+ default => {}
+ }
+
+ select! {
+ recv(r) -> _ => panic!(),
+ recv(after(ms(200))) -> _ => {}
+ }
+}
+
+#[test]
+fn capacity() {
+ const COUNT: usize = 10;
+
+ for i in 0..COUNT {
+ let r = after(ms(i as u64));
+ assert_eq!(r.capacity(), Some(1));
+ }
+}
+
+#[test]
+fn len_empty_full() {
+ let r = after(ms(50));
+
+ assert_eq!(r.len(), 0);
+ assert!(r.is_empty());
+ assert!(!r.is_full());
+
+ thread::sleep(ms(100));
+
+ assert_eq!(r.len(), 1);
+ assert!(!r.is_empty());
+ assert!(r.is_full());
+
+ r.try_recv().unwrap();
+
+ assert_eq!(r.len(), 0);
+ assert!(r.is_empty());
+ assert!(!r.is_full());
+}
+
+#[test]
+fn try_recv() {
+ let r = after(ms(200));
+ assert!(r.try_recv().is_err());
+
+ thread::sleep(ms(100));
+ assert!(r.try_recv().is_err());
+
+ thread::sleep(ms(200));
+ assert!(r.try_recv().is_ok());
+ assert!(r.try_recv().is_err());
+
+ thread::sleep(ms(200));
+ assert!(r.try_recv().is_err());
+}
+
+#[test]
+fn recv() {
+ let start = Instant::now();
+ let r = after(ms(50));
+
+ let fired = r.recv().unwrap();
+ assert!(start < fired);
+ assert!(fired - start >= ms(50));
+
+ let now = Instant::now();
+ assert!(fired < now);
+ assert!(now - fired < fired - start);
+
+ assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
+}
+
+#[test]
+fn recv_timeout() {
+ let start = Instant::now();
+ let r = after(ms(200));
+
+ assert!(r.recv_timeout(ms(100)).is_err());
+ let now = Instant::now();
+ assert!(now - start >= ms(100));
+ assert!(now - start <= ms(150));
+
+ let fired = r.recv_timeout(ms(200)).unwrap();
+ assert!(fired - start >= ms(200));
+ assert!(fired - start <= ms(250));
+
+ assert!(r.recv_timeout(ms(200)).is_err());
+ let now = Instant::now();
+ assert!(now - start >= ms(400));
+ assert!(now - start <= ms(450));
+
+ assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
+}
+
+#[test]
+fn recv_two() {
+ let r1 = after(ms(50));
+ let r2 = after(ms(50));
+
+ scope(|scope| {
+ scope.spawn(|_| {
+ select! {
+ recv(r1) -> _ => {}
+ recv(r2) -> _ => {}
+ }
+ });
+ scope.spawn(|_| {
+ select! {
+ recv(r1) -> _ => {}
+ recv(r2) -> _ => {}
+ }
+ });
+ })
+ .unwrap();
+}
+
+#[test]
+fn recv_race() {
+ select! {
+ recv(after(ms(50))) -> _ => {}
+ recv(after(ms(100))) -> _ => panic!(),
+ }
+
+ select! {
+ recv(after(ms(100))) -> _ => panic!(),
+ recv(after(ms(50))) -> _ => {}
+ }
+}
+
+#[test]
+fn stress_default() {
+ const COUNT: usize = 10;
+
+ for _ in 0..COUNT {
+ select! {
+ recv(after(ms(0))) -> _ => {}
+ default => panic!(),
+ }
+ }
+
+ for _ in 0..COUNT {
+ select! {
+ recv(after(ms(100))) -> _ => panic!(),
+ default => {}
+ }
+ }
+}
+
+#[test]
+fn select() {
+ const THREADS: usize = 4;
+ const COUNT: usize = 1000;
+ const TIMEOUT_MS: u64 = 100;
+
+ let v = (0..COUNT)
+ .map(|i| after(ms(i as u64 / TIMEOUT_MS / 2)))
+ .collect::<Vec<_>>();
+ let hits = AtomicUsize::new(0);
+
+ scope(|scope| {
+ for _ in 0..THREADS {
+ scope.spawn(|_| {
+ let v: Vec<&_> = v.iter().collect();
+
+ loop {
+ let timeout = after(ms(TIMEOUT_MS));
+ let mut sel = Select::new();
+ for r in &v {
+ sel.recv(r);
+ }
+ let oper_timeout = sel.recv(&timeout);
+
+ let oper = sel.select();
+ match oper.index() {
+ i if i == oper_timeout => {
+ oper.recv(&timeout).unwrap();
+ break;
+ }
+ i => {
+ oper.recv(v[i]).unwrap();
+ hits.fetch_add(1, Ordering::SeqCst);
+ }
+ }
+ }
+ });
+ }
+ })
+ .unwrap();
+
+ assert_eq!(hits.load(Ordering::SeqCst), COUNT);
+}
+
+#[test]
+fn ready() {
+ const THREADS: usize = 4;
+ const COUNT: usize = 1000;
+ const TIMEOUT_MS: u64 = 100;
+
+ let v = (0..COUNT)
+ .map(|i| after(ms(i as u64 / TIMEOUT_MS / 2)))
+ .collect::<Vec<_>>();
+ let hits = AtomicUsize::new(0);
+
+ scope(|scope| {
+ for _ in 0..THREADS {
+ scope.spawn(|_| {
+ let v: Vec<&_> = v.iter().collect();
+
+ loop {
+ let timeout = after(ms(TIMEOUT_MS));
+ let mut sel = Select::new();
+ for r in &v {
+ sel.recv(r);
+ }
+ let oper_timeout = sel.recv(&timeout);
+
+ loop {
+ let i = sel.ready();
+ if i == oper_timeout {
+ timeout.try_recv().unwrap();
+ return;
+ } else if v[i].try_recv().is_ok() {
+ hits.fetch_add(1, Ordering::SeqCst);
+ break;
+ }
+ }
+ }
+ });
+ }
+ })
+ .unwrap();
+
+ assert_eq!(hits.load(Ordering::SeqCst), COUNT);
+}
+
+#[test]
+fn stress_clone() {
+ const RUNS: usize = 1000;
+ const THREADS: usize = 10;
+ const COUNT: usize = 50;
+
+ for i in 0..RUNS {
+ let r = after(ms(i as u64));
+
+ scope(|scope| {
+ for _ in 0..THREADS {
+ scope.spawn(|_| {
+ let r = r.clone();
+ let _ = r.try_recv();
+
+ for _ in 0..COUNT {
+ drop(r.clone());
+ thread::yield_now();
+ }
+ });
+ }
+ })
+ .unwrap();
+ }
+}
+
+#[test]
+fn fairness() {
+ const COUNT: usize = 1000;
+
+ for &dur in &[0, 1] {
+ let mut hits = [0usize; 2];
+
+ for _ in 0..COUNT {
+ select! {
+ recv(after(ms(dur))) -> _ => hits[0] += 1,
+ recv(after(ms(dur))) -> _ => hits[1] += 1,
+ }
+ }
+
+ assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
+ }
+}
+
+#[test]
+fn fairness_duplicates() {
+ const COUNT: usize = 1000;
+
+ for &dur in &[0, 1] {
+ let mut hits = [0usize; 5];
+
+ for _ in 0..COUNT {
+ let r = after(ms(dur));
+ select! {
+ recv(r) -> _ => hits[0] += 1,
+ recv(r) -> _ => hits[1] += 1,
+ recv(r) -> _ => hits[2] += 1,
+ recv(r) -> _ => hits[3] += 1,
+ recv(r) -> _ => hits[4] += 1,
+ }
+ }
+
+ assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
+ }
+}