//! Tests for channel readiness using the `Select` struct. #![allow(clippy::drop_copy)] use std::any::Any; use std::cell::Cell; use std::thread; use std::time::{Duration, Instant}; use crossbeam_channel::{after, bounded, tick, unbounded}; use crossbeam_channel::{Receiver, Select, TryRecvError, TrySendError}; use crossbeam_utils::thread::scope; fn ms(ms: u64) -> Duration { Duration::from_millis(ms) } #[test] fn smoke1() { let (s1, r1) = unbounded::<usize>(); let (s2, r2) = unbounded::<usize>(); s1.send(1).unwrap(); let mut sel = Select::new(); sel.recv(&r1); sel.recv(&r2); assert_eq!(sel.ready(), 0); assert_eq!(r1.try_recv(), Ok(1)); s2.send(2).unwrap(); let mut sel = Select::new(); sel.recv(&r1); sel.recv(&r2); assert_eq!(sel.ready(), 1); assert_eq!(r2.try_recv(), Ok(2)); } #[test] fn smoke2() { let (_s1, r1) = unbounded::<i32>(); let (_s2, r2) = unbounded::<i32>(); let (_s3, r3) = unbounded::<i32>(); let (_s4, r4) = unbounded::<i32>(); let (s5, r5) = unbounded::<i32>(); s5.send(5).unwrap(); let mut sel = Select::new(); sel.recv(&r1); sel.recv(&r2); sel.recv(&r3); sel.recv(&r4); sel.recv(&r5); assert_eq!(sel.ready(), 4); assert_eq!(r5.try_recv(), Ok(5)); } #[test] fn disconnected() { let (s1, r1) = unbounded::<i32>(); let (s2, r2) = unbounded::<i32>(); scope(|scope| { scope.spawn(|_| { drop(s1); thread::sleep(ms(500)); s2.send(5).unwrap(); }); let mut sel = Select::new(); sel.recv(&r1); sel.recv(&r2); match sel.ready_timeout(ms(1000)) { Ok(0) => assert_eq!(r1.try_recv(), Err(TryRecvError::Disconnected)), _ => panic!(), } r2.recv().unwrap(); }) .unwrap(); let mut sel = Select::new(); sel.recv(&r1); sel.recv(&r2); match sel.ready_timeout(ms(1000)) { Ok(0) => assert_eq!(r1.try_recv(), Err(TryRecvError::Disconnected)), _ => panic!(), } scope(|scope| { scope.spawn(|_| { thread::sleep(ms(500)); drop(s2); }); let mut sel = Select::new(); sel.recv(&r2); match sel.ready_timeout(ms(1000)) { Ok(0) => assert_eq!(r2.try_recv(), Err(TryRecvError::Disconnected)), _ => panic!(), } }) .unwrap(); } #[test] fn default() { let (s1, r1) = unbounded::<i32>(); let (s2, r2) = unbounded::<i32>(); let mut sel = Select::new(); sel.recv(&r1); sel.recv(&r2); assert!(sel.try_ready().is_err()); drop(s1); let mut sel = Select::new(); sel.recv(&r1); sel.recv(&r2); match sel.try_ready() { Ok(0) => assert!(r1.try_recv().is_err()), _ => panic!(), } s2.send(2).unwrap(); let mut sel = Select::new(); sel.recv(&r2); match sel.try_ready() { Ok(0) => assert_eq!(r2.try_recv(), Ok(2)), _ => panic!(), } let mut sel = Select::new(); sel.recv(&r2); assert!(sel.try_ready().is_err()); let mut sel = Select::new(); assert!(sel.try_ready().is_err()); } #[test] fn timeout() { let (_s1, r1) = unbounded::<i32>(); let (s2, r2) = unbounded::<i32>(); scope(|scope| { scope.spawn(|_| { thread::sleep(ms(1500)); s2.send(2).unwrap(); }); let mut sel = Select::new(); sel.recv(&r1); sel.recv(&r2); assert!(sel.ready_timeout(ms(1000)).is_err()); let mut sel = Select::new(); sel.recv(&r1); sel.recv(&r2); match sel.ready_timeout(ms(1000)) { Ok(1) => assert_eq!(r2.try_recv(), Ok(2)), _ => panic!(), } }) .unwrap(); scope(|scope| { let (s, r) = unbounded::<i32>(); scope.spawn(move |_| { thread::sleep(ms(500)); drop(s); }); let mut sel = Select::new(); assert!(sel.ready_timeout(ms(1000)).is_err()); let mut sel = Select::new(); sel.recv(&r); match sel.try_ready() { Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)), _ => panic!(), } }) .unwrap(); } #[test] fn default_when_disconnected() { let (_, r) = unbounded::<i32>(); let mut sel = Select::new(); sel.recv(&r); match sel.try_ready() { Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)), _ => panic!(), } let (_, r) = unbounded::<i32>(); let mut sel = Select::new(); sel.recv(&r); match sel.ready_timeout(ms(1000)) { Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)), _ => panic!(), } let (s, _) = bounded::<i32>(0); let mut sel = Select::new(); sel.send(&s); match sel.try_ready() { Ok(0) => assert_eq!(s.try_send(0), Err(TrySendError::Disconnected(0))), _ => panic!(), } let (s, _) = bounded::<i32>(0); let mut sel = Select::new(); sel.send(&s); match sel.ready_timeout(ms(1000)) { Ok(0) => assert_eq!(s.try_send(0), Err(TrySendError::Disconnected(0))), _ => panic!(), } } #[test] fn default_only() { let start = Instant::now(); let mut sel = Select::new(); assert!(sel.try_ready().is_err()); let now = Instant::now(); assert!(now - start <= ms(50)); let start = Instant::now(); let mut sel = Select::new(); assert!(sel.ready_timeout(ms(500)).is_err()); let now = Instant::now(); assert!(now - start >= ms(450)); assert!(now - start <= ms(550)); } #[test] fn unblocks() { let (s1, r1) = bounded::<i32>(0); let (s2, r2) = bounded::<i32>(0); scope(|scope| { scope.spawn(|_| { thread::sleep(ms(500)); s2.send(2).unwrap(); }); let mut sel = Select::new(); sel.recv(&r1); sel.recv(&r2); match sel.ready_timeout(ms(1000)) { Ok(1) => assert_eq!(r2.try_recv(), Ok(2)), _ => panic!(), } }) .unwrap(); scope(|scope| { scope.spawn(|_| { thread::sleep(ms(500)); assert_eq!(r1.recv().unwrap(), 1); }); let mut sel = Select::new(); let oper1 = sel.send(&s1); let oper2 = sel.send(&s2); let oper = sel.select_timeout(ms(1000)); match oper { Err(_) => panic!(), Ok(oper) => match oper.index() { i if i == oper1 => oper.send(&s1, 1).unwrap(), i if i == oper2 => panic!(), _ => unreachable!(), }, } }) .unwrap(); } #[test] fn both_ready() { let (s1, r1) = bounded(0); let (s2, r2) = bounded(0); scope(|scope| { scope.spawn(|_| { thread::sleep(ms(500)); s1.send(1).unwrap(); assert_eq!(r2.recv().unwrap(), 2); }); for _ in 0..2 { let mut sel = Select::new(); sel.recv(&r1); sel.send(&s2); match sel.ready() { 0 => assert_eq!(r1.try_recv(), Ok(1)), 1 => s2.try_send(2).unwrap(), _ => panic!(), } } }) .unwrap(); } #[test] fn cloning1() { scope(|scope| { let (s1, r1) = unbounded::<i32>(); let (_s2, r2) = unbounded::<i32>(); let (s3, r3) = unbounded::<()>(); scope.spawn(move |_| { r3.recv().unwrap(); drop(s1.clone()); assert!(r3.try_recv().is_err()); s1.send(1).unwrap(); r3.recv().unwrap(); }); s3.send(()).unwrap(); let mut sel = Select::new(); sel.recv(&r1); sel.recv(&r2); match sel.ready() { 0 => drop(r1.try_recv()), 1 => drop(r2.try_recv()), _ => panic!(), } s3.send(()).unwrap(); }) .unwrap(); } #[test] fn cloning2() { let (s1, r1) = unbounded::<()>(); let (s2, r2) = unbounded::<()>(); let (_s3, _r3) = unbounded::<()>(); scope(|scope| { scope.spawn(move |_| { let mut sel = Select::new(); sel.recv(&r1); sel.recv(&r2); match sel.ready() { 0 => panic!(), 1 => drop(r2.try_recv()), _ => panic!(), } }); thread::sleep(ms(500)); drop(s1.clone()); s2.send(()).unwrap(); }) .unwrap(); } #[test] fn preflight1() { let (s, r) = unbounded(); s.send(()).unwrap(); let mut sel = Select::new(); sel.recv(&r); match sel.ready() { 0 => drop(r.try_recv()), _ => panic!(), } } #[test] fn preflight2() { let (s, r) = unbounded(); drop(s.clone()); s.send(()).unwrap(); drop(s); let mut sel = Select::new(); sel.recv(&r); match sel.ready() { 0 => assert_eq!(r.try_recv(), Ok(())), _ => panic!(), } assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)); } #[test] fn preflight3() { let (s, r) = unbounded(); drop(s.clone()); s.send(()).unwrap(); drop(s); r.recv().unwrap(); let mut sel = Select::new(); sel.recv(&r); match sel.ready() { 0 => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)), _ => panic!(), } } #[test] fn duplicate_operations() { let (s, r) = unbounded::<i32>(); let hit = vec![Cell::new(false); 4]; while hit.iter().map(|h| h.get()).any(|hit| !hit) { let mut sel = Select::new(); sel.recv(&r); sel.recv(&r); sel.send(&s); sel.send(&s); match sel.ready() { 0 => { assert!(r.try_recv().is_ok()); hit[0].set(true); } 1 => { assert!(r.try_recv().is_ok()); hit[1].set(true); } 2 => { assert!(s.try_send(0).is_ok()); hit[2].set(true); } 3 => { assert!(s.try_send(0).is_ok()); hit[3].set(true); } _ => panic!(), } } } #[test] fn nesting() { let (s, r) = unbounded::<i32>(); let mut sel = Select::new(); sel.send(&s); match sel.ready() { 0 => { assert!(s.try_send(0).is_ok()); let mut sel = Select::new(); sel.recv(&r); match sel.ready() { 0 => { assert_eq!(r.try_recv(), Ok(0)); let mut sel = Select::new(); sel.send(&s); match sel.ready() { 0 => { assert!(s.try_send(1).is_ok()); let mut sel = Select::new(); sel.recv(&r); match sel.ready() { 0 => { assert_eq!(r.try_recv(), Ok(1)); } _ => panic!(), } } _ => panic!(), } } _ => panic!(), } } _ => panic!(), } } #[test] fn stress_recv() { #[cfg(miri)] const COUNT: usize = 100; #[cfg(not(miri))] const COUNT: usize = 10_000; let (s1, r1) = unbounded(); let (s2, r2) = bounded(5); let (s3, r3) = bounded(0); scope(|scope| { scope.spawn(|_| { for i in 0..COUNT { s1.send(i).unwrap(); r3.recv().unwrap(); s2.send(i).unwrap(); r3.recv().unwrap(); } }); for i in 0..COUNT { for _ in 0..2 { let mut sel = Select::new(); sel.recv(&r1); sel.recv(&r2); match sel.ready() { 0 => assert_eq!(r1.try_recv(), Ok(i)), 1 => assert_eq!(r2.try_recv(), Ok(i)), _ => panic!(), } s3.send(()).unwrap(); } } }) .unwrap(); } #[test] fn stress_send() { #[cfg(miri)] const COUNT: usize = 100; #[cfg(not(miri))] const COUNT: usize = 10_000; let (s1, r1) = bounded(0); let (s2, r2) = bounded(0); let (s3, r3) = bounded(100); scope(|scope| { scope.spawn(|_| { for i in 0..COUNT { assert_eq!(r1.recv().unwrap(), i); assert_eq!(r2.recv().unwrap(), i); r3.recv().unwrap(); } }); for i in 0..COUNT { for _ in 0..2 { let mut sel = Select::new(); sel.send(&s1); sel.send(&s2); match sel.ready() { 0 => assert!(s1.try_send(i).is_ok()), 1 => assert!(s2.try_send(i).is_ok()), _ => panic!(), } } s3.send(()).unwrap(); } }) .unwrap(); } #[test] fn stress_mixed() { #[cfg(miri)] const COUNT: usize = 100; #[cfg(not(miri))] const COUNT: usize = 10_000; let (s1, r1) = bounded(0); let (s2, r2) = bounded(0); let (s3, r3) = bounded(100); scope(|scope| { scope.spawn(|_| { for i in 0..COUNT { s1.send(i).unwrap(); assert_eq!(r2.recv().unwrap(), i); r3.recv().unwrap(); } }); for i in 0..COUNT { for _ in 0..2 { let mut sel = Select::new(); sel.recv(&r1); sel.send(&s2); match sel.ready() { 0 => assert_eq!(r1.try_recv(), Ok(i)), 1 => assert!(s2.try_send(i).is_ok()), _ => panic!(), } } s3.send(()).unwrap(); } }) .unwrap(); } #[test] fn stress_timeout_two_threads() { const COUNT: usize = 20; let (s, r) = bounded(2); scope(|scope| { scope.spawn(|_| { for i in 0..COUNT { if i % 2 == 0 { thread::sleep(ms(500)); } loop { let mut sel = Select::new(); sel.send(&s); match sel.ready_timeout(ms(100)) { Err(_) => {} Ok(0) => { assert!(s.try_send(i).is_ok()); break; } Ok(_) => panic!(), } } } }); scope.spawn(|_| { for i in 0..COUNT { if i % 2 == 0 { thread::sleep(ms(500)); } loop { let mut sel = Select::new(); sel.recv(&r); match sel.ready_timeout(ms(100)) { Err(_) => {} Ok(0) => { assert_eq!(r.try_recv(), Ok(i)); break; } Ok(_) => panic!(), } } } }); }) .unwrap(); } #[test] fn send_recv_same_channel() { let (s, r) = bounded::<i32>(0); let mut sel = Select::new(); sel.send(&s); sel.recv(&r); assert!(sel.ready_timeout(ms(100)).is_err()); let (s, r) = unbounded::<i32>(); let mut sel = Select::new(); sel.send(&s); sel.recv(&r); match sel.ready_timeout(ms(100)) { Err(_) => panic!(), Ok(0) => assert!(s.try_send(0).is_ok()), Ok(_) => panic!(), } } #[test] fn channel_through_channel() { #[cfg(miri)] const COUNT: usize = 100; #[cfg(not(miri))] const COUNT: usize = 1000; type T = Box<dyn Any + Send>; for cap in 1..4 { let (s, r) = bounded::<T>(cap); scope(|scope| { scope.spawn(move |_| { let mut s = s; for _ in 0..COUNT { let (new_s, new_r) = bounded(cap); let new_r: T = Box::new(Some(new_r)); { let mut sel = Select::new(); sel.send(&s); match sel.ready() { 0 => assert!(s.try_send(new_r).is_ok()), _ => panic!(), } } s = new_s; } }); scope.spawn(move |_| { let mut r = r; for _ in 0..COUNT { let new = { let mut sel = Select::new(); sel.recv(&r); match sel.ready() { 0 => r .try_recv() .unwrap() .downcast_mut::<Option<Receiver<T>>>() .unwrap() .take() .unwrap(), _ => panic!(), } }; r = new; } }); }) .unwrap(); } } #[test] fn fairness1() { #[cfg(miri)] const COUNT: usize = 100; #[cfg(not(miri))] const COUNT: usize = 10_000; let (s1, r1) = bounded::<()>(COUNT); let (s2, r2) = unbounded::<()>(); for _ in 0..COUNT { s1.send(()).unwrap(); s2.send(()).unwrap(); } let hits = vec![Cell::new(0usize); 4]; for _ in 0..COUNT { let after = after(ms(0)); let tick = tick(ms(0)); let mut sel = Select::new(); sel.recv(&r1); sel.recv(&r2); sel.recv(&after); sel.recv(&tick); match sel.ready() { 0 => { r1.try_recv().unwrap(); hits[0].set(hits[0].get() + 1); } 1 => { r2.try_recv().unwrap(); hits[1].set(hits[1].get() + 1); } 2 => { after.try_recv().unwrap(); hits[2].set(hits[2].get() + 1); } 3 => { tick.try_recv().unwrap(); hits[3].set(hits[3].get() + 1); } _ => panic!(), } } assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 2)); } #[test] fn fairness2() { #[cfg(miri)] const COUNT: usize = 100; #[cfg(not(miri))] const COUNT: usize = 100_000; let (s1, r1) = unbounded::<()>(); let (s2, r2) = bounded::<()>(1); let (s3, r3) = bounded::<()>(0); scope(|scope| { scope.spawn(|_| { for _ in 0..COUNT { let mut sel = Select::new(); let mut oper1 = None; let mut oper2 = None; if s1.is_empty() { oper1 = Some(sel.send(&s1)); } if s2.is_empty() { oper2 = Some(sel.send(&s2)); } let oper3 = sel.send(&s3); let oper = sel.select(); match oper.index() { i if Some(i) == oper1 => assert!(oper.send(&s1, ()).is_ok()), i if Some(i) == oper2 => assert!(oper.send(&s2, ()).is_ok()), i if i == oper3 => assert!(oper.send(&s3, ()).is_ok()), _ => unreachable!(), } } }); let hits = vec![Cell::new(0usize); 3]; for _ in 0..COUNT { let mut sel = Select::new(); sel.recv(&r1); sel.recv(&r2); sel.recv(&r3); loop { match sel.ready() { 0 => { if r1.try_recv().is_ok() { hits[0].set(hits[0].get() + 1); break; } } 1 => { if r2.try_recv().is_ok() { hits[1].set(hits[1].get() + 1); break; } } 2 => { if r3.try_recv().is_ok() { hits[2].set(hits[2].get() + 1); break; } } _ => unreachable!(), } } } assert!(hits.iter().all(|x| x.get() > 0)); }) .unwrap(); }