//! Tests for the after channel flavor. use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::thread; use std::time::{Duration, Instant}; use crossbeam_channel::{after, select, Select, TryRecvError}; use crossbeam_utils::thread::scope; fn ms(ms: u64) -> Duration { Duration::from_millis(ms) } #[test] fn fire() { let start = Instant::now(); let r = after(ms(50)); assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); thread::sleep(ms(100)); let fired = r.try_recv().unwrap(); assert!(start < fired); assert!(fired - start >= ms(50)); let now = Instant::now(); assert!(fired < now); assert!(now - fired >= ms(50)); assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); select! { recv(r) -> _ => panic!(), default => {} } select! { recv(r) -> _ => panic!(), recv(after(ms(200))) -> _ => {} } } #[test] fn capacity() { const COUNT: usize = 10; for i in 0..COUNT { let r = after(ms(i as u64)); assert_eq!(r.capacity(), Some(1)); } } #[test] fn len_empty_full() { let r = after(ms(50)); assert_eq!(r.len(), 0); assert_eq!(r.is_empty(), true); assert_eq!(r.is_full(), false); thread::sleep(ms(100)); assert_eq!(r.len(), 1); assert_eq!(r.is_empty(), false); assert_eq!(r.is_full(), true); r.try_recv().unwrap(); assert_eq!(r.len(), 0); assert_eq!(r.is_empty(), true); assert_eq!(r.is_full(), false); } #[test] fn try_recv() { let r = after(ms(200)); assert!(r.try_recv().is_err()); thread::sleep(ms(100)); assert!(r.try_recv().is_err()); thread::sleep(ms(200)); assert!(r.try_recv().is_ok()); assert!(r.try_recv().is_err()); thread::sleep(ms(200)); assert!(r.try_recv().is_err()); } #[test] fn recv() { let start = Instant::now(); let r = after(ms(50)); let fired = r.recv().unwrap(); assert!(start < fired); assert!(fired - start >= ms(50)); let now = Instant::now(); assert!(fired < now); assert!(now - fired < fired - start); assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); } #[test] fn recv_timeout() { let start = Instant::now(); let r = after(ms(200)); assert!(r.recv_timeout(ms(100)).is_err()); let now = Instant::now(); assert!(now - start >= ms(100)); assert!(now - start <= ms(150)); let fired = r.recv_timeout(ms(200)).unwrap(); assert!(fired - start >= ms(200)); assert!(fired - start <= ms(250)); assert!(r.recv_timeout(ms(200)).is_err()); let now = Instant::now(); assert!(now - start >= ms(400)); assert!(now - start <= ms(450)); assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); } #[test] fn recv_two() { let r1 = after(ms(50)); let r2 = after(ms(50)); scope(|scope| { scope.spawn(|_| { select! { recv(r1) -> _ => {} recv(r2) -> _ => {} } }); scope.spawn(|_| { select! { recv(r1) -> _ => {} recv(r2) -> _ => {} } }); }) .unwrap(); } #[test] fn recv_race() { select! { recv(after(ms(50))) -> _ => {} recv(after(ms(100))) -> _ => panic!(), } select! { recv(after(ms(100))) -> _ => panic!(), recv(after(ms(50))) -> _ => {} } } #[test] fn stress_default() { const COUNT: usize = 10; for _ in 0..COUNT { select! { recv(after(ms(0))) -> _ => {} default => panic!(), } } for _ in 0..COUNT { select! { recv(after(ms(100))) -> _ => panic!(), default => {} } } } #[test] fn select() { const THREADS: usize = 4; const COUNT: usize = 1000; const TIMEOUT_MS: u64 = 100; let v = (0..COUNT) .map(|i| after(ms(i as u64 / TIMEOUT_MS / 2))) .collect::>(); let hits = AtomicUsize::new(0); scope(|scope| { for _ in 0..THREADS { scope.spawn(|_| { let v: Vec<&_> = v.iter().collect(); loop { let timeout = after(ms(TIMEOUT_MS)); let mut sel = Select::new(); for r in &v { sel.recv(r); } let oper_timeout = sel.recv(&timeout); let oper = sel.select(); match oper.index() { i if i == oper_timeout => { oper.recv(&timeout).unwrap(); break; } i => { oper.recv(&v[i]).unwrap(); hits.fetch_add(1, Ordering::SeqCst); } } } }); } }) .unwrap(); assert_eq!(hits.load(Ordering::SeqCst), COUNT); } #[test] fn ready() { const THREADS: usize = 4; const COUNT: usize = 1000; const TIMEOUT_MS: u64 = 100; let v = (0..COUNT) .map(|i| after(ms(i as u64 / TIMEOUT_MS / 2))) .collect::>(); let hits = AtomicUsize::new(0); scope(|scope| { for _ in 0..THREADS { scope.spawn(|_| { let v: Vec<&_> = v.iter().collect(); loop { let timeout = after(ms(TIMEOUT_MS)); let mut sel = Select::new(); for r in &v { sel.recv(r); } let oper_timeout = sel.recv(&timeout); loop { let i = sel.ready(); if i == oper_timeout { timeout.try_recv().unwrap(); return; } else if v[i].try_recv().is_ok() { hits.fetch_add(1, Ordering::SeqCst); break; } } } }); } }) .unwrap(); assert_eq!(hits.load(Ordering::SeqCst), COUNT); } #[test] fn stress_clone() { const RUNS: usize = 1000; const THREADS: usize = 10; const COUNT: usize = 50; for i in 0..RUNS { let r = after(ms(i as u64)); scope(|scope| { for _ in 0..THREADS { scope.spawn(|_| { let r = r.clone(); let _ = r.try_recv(); for _ in 0..COUNT { drop(r.clone()); thread::yield_now(); } }); } }) .unwrap(); } } #[test] fn fairness() { const COUNT: usize = 1000; for &dur in &[0, 1] { let mut hits = [0usize; 2]; for _ in 0..COUNT { select! { recv(after(ms(dur))) -> _ => hits[0] += 1, recv(after(ms(dur))) -> _ => hits[1] += 1, } } assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); } } #[test] fn fairness_duplicates() { const COUNT: usize = 1000; for &dur in &[0, 1] { let mut hits = [0usize; 5]; for _ in 0..COUNT { let r = after(ms(dur)); select! { recv(r) -> _ => hits[0] += 1, recv(r) -> _ => hits[1] += 1, recv(r) -> _ => hits[2] += 1, recv(r) -> _ => hits[3] += 1, recv(r) -> _ => hits[4] += 1, } } assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); } }