//! Tests for the tick channel flavor. #![cfg(not(miri))] // TODO: many assertions failed due to Miri is slow use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::thread; use std::time::{Duration, Instant}; use crossbeam_channel::{after, select, tick, Select, TryRecvError}; use crossbeam_utils::thread::scope; fn ms(ms: u64) -> Duration { Duration::from_millis(ms) } #[test] fn fire() { let start = Instant::now(); let r = tick(ms(50)); assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); thread::sleep(ms(100)); let fired = r.try_recv().unwrap(); assert!(start < fired); assert!(fired - start >= ms(50)); let now = Instant::now(); assert!(fired < now); assert!(now - fired >= ms(50)); assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); select! { recv(r) -> _ => panic!(), default => {} } select! { recv(r) -> _ => {} recv(tick(ms(200))) -> _ => panic!(), } } #[test] fn intervals() { let start = Instant::now(); let r = tick(ms(50)); let t1 = r.recv().unwrap(); assert!(start + ms(50) <= t1); assert!(start + ms(100) > t1); thread::sleep(ms(300)); let t2 = r.try_recv().unwrap(); assert!(start + ms(100) <= t2); assert!(start + ms(150) > t2); assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); let t3 = r.recv().unwrap(); assert!(start + ms(400) <= t3); assert!(start + ms(450) > t3); assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); } #[test] fn capacity() { const COUNT: usize = 10; for i in 0..COUNT { let r = tick(ms(i as u64)); assert_eq!(r.capacity(), Some(1)); } } #[test] fn len_empty_full() { let r = tick(ms(50)); assert_eq!(r.len(), 0); assert!(r.is_empty()); assert!(!r.is_full()); thread::sleep(ms(100)); assert_eq!(r.len(), 1); assert!(!r.is_empty()); assert!(r.is_full()); r.try_recv().unwrap(); assert_eq!(r.len(), 0); assert!(r.is_empty()); assert!(!r.is_full()); } #[test] fn try_recv() { let r = tick(ms(200)); assert!(r.try_recv().is_err()); thread::sleep(ms(100)); assert!(r.try_recv().is_err()); thread::sleep(ms(200)); assert!(r.try_recv().is_ok()); assert!(r.try_recv().is_err()); thread::sleep(ms(200)); assert!(r.try_recv().is_ok()); assert!(r.try_recv().is_err()); } #[test] fn recv() { let start = Instant::now(); let r = tick(ms(50)); let fired = r.recv().unwrap(); assert!(start < fired); assert!(fired - start >= ms(50)); let now = Instant::now(); assert!(fired < now); assert!(now - fired < fired - start); assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); } #[cfg(not(crossbeam_sanitize))] // TODO: assertions failed due to tsan is slow #[test] fn recv_timeout() { let start = Instant::now(); let r = tick(ms(200)); assert!(r.recv_timeout(ms(100)).is_err()); let now = Instant::now(); assert!(now - start >= ms(100)); assert!(now - start <= ms(150)); let fired = r.recv_timeout(ms(200)).unwrap(); assert!(fired - start >= ms(200)); assert!(fired - start <= ms(250)); assert!(r.recv_timeout(ms(100)).is_err()); let now = Instant::now(); assert!(now - start >= ms(300)); assert!(now - start <= ms(350)); let fired = r.recv_timeout(ms(200)).unwrap(); assert!(fired - start >= ms(400)); assert!(fired - start <= ms(450)); } #[test] fn recv_two() { let r1 = tick(ms(50)); let r2 = tick(ms(50)); scope(|scope| { scope.spawn(|_| { for _ in 0..10 { select! { recv(r1) -> _ => {} recv(r2) -> _ => {} } } }); scope.spawn(|_| { for _ in 0..10 { select! { recv(r1) -> _ => {} recv(r2) -> _ => {} } } }); }) .unwrap(); } #[test] fn recv_race() { select! { recv(tick(ms(50))) -> _ => {} recv(tick(ms(100))) -> _ => panic!(), } select! { recv(tick(ms(100))) -> _ => panic!(), recv(tick(ms(50))) -> _ => {} } } #[test] fn stress_default() { const COUNT: usize = 10; for _ in 0..COUNT { select! { recv(tick(ms(0))) -> _ => {} default => panic!(), } } for _ in 0..COUNT { select! { recv(tick(ms(100))) -> _ => panic!(), default => {} } } } #[test] fn select() { const THREADS: usize = 4; let hits = AtomicUsize::new(0); let r1 = tick(ms(200)); let r2 = tick(ms(300)); scope(|scope| { for _ in 0..THREADS { scope.spawn(|_| { let timeout = after(ms(1100)); loop { let mut sel = Select::new(); let oper1 = sel.recv(&r1); let oper2 = sel.recv(&r2); let oper3 = sel.recv(&timeout); let oper = sel.select(); match oper.index() { i if i == oper1 => { oper.recv(&r1).unwrap(); hits.fetch_add(1, Ordering::SeqCst); } i if i == oper2 => { oper.recv(&r2).unwrap(); hits.fetch_add(1, Ordering::SeqCst); } i if i == oper3 => { oper.recv(&timeout).unwrap(); break; } _ => unreachable!(), } } }); } }) .unwrap(); assert_eq!(hits.load(Ordering::SeqCst), 8); } #[cfg(not(crossbeam_sanitize))] // TODO: assertions failed due to tsan is slow #[test] fn ready() { const THREADS: usize = 4; let hits = AtomicUsize::new(0); let r1 = tick(ms(200)); let r2 = tick(ms(300)); scope(|scope| { for _ in 0..THREADS { scope.spawn(|_| { let timeout = after(ms(1100)); 'outer: loop { let mut sel = Select::new(); sel.recv(&r1); sel.recv(&r2); sel.recv(&timeout); loop { match sel.ready() { 0 => { if r1.try_recv().is_ok() { hits.fetch_add(1, Ordering::SeqCst); break; } } 1 => { if r2.try_recv().is_ok() { hits.fetch_add(1, Ordering::SeqCst); break; } } 2 => { if timeout.try_recv().is_ok() { break 'outer; } } _ => unreachable!(), } } } }); } }) .unwrap(); assert_eq!(hits.load(Ordering::SeqCst), 8); } #[test] fn fairness() { const COUNT: usize = 30; for &dur in &[0, 1] { let mut hits = [0usize; 2]; for _ in 0..COUNT { let r1 = tick(ms(dur)); let r2 = tick(ms(dur)); for _ in 0..COUNT { select! { recv(r1) -> _ => hits[0] += 1, recv(r2) -> _ => hits[1] += 1, } } } assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); } } #[test] fn fairness_duplicates() { const COUNT: usize = 30; for &dur in &[0, 1] { let mut hits = [0usize; 5]; for _ in 0..COUNT { let r = tick(ms(dur)); for _ in 0..COUNT { select! { recv(r) -> _ => hits[0] += 1, recv(r) -> _ => hits[1] += 1, recv(r) -> _ => hits[2] += 1, recv(r) -> _ => hits[3] += 1, recv(r) -> _ => hits[4] += 1, } } } assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); } }