diff options
Diffstat (limited to 'third_party/rust/crossbeam-channel/tests/select.rs')
-rw-r--r-- | third_party/rust/crossbeam-channel/tests/select.rs | 1328 |
1 files changed, 1328 insertions, 0 deletions
diff --git a/third_party/rust/crossbeam-channel/tests/select.rs b/third_party/rust/crossbeam-channel/tests/select.rs new file mode 100644 index 0000000000..bc5824daba --- /dev/null +++ b/third_party/rust/crossbeam-channel/tests/select.rs @@ -0,0 +1,1328 @@ +//! Tests for channel selection using the `Select` struct. + +#![allow(clippy::drop_copy)] + +use std::any::Any; +use std::cell::Cell; +use std::thread; +use std::time::{Duration, Instant}; + +use crossbeam_channel::{after, bounded, tick, unbounded, Receiver, Select, TryRecvError}; +use crossbeam_utils::thread::scope; + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn smoke1() { + let (s1, r1) = unbounded::<usize>(); + let (s2, r2) = unbounded::<usize>(); + + s1.send(1).unwrap(); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(1)), + i if i == oper2 => panic!(), + _ => unreachable!(), + } + + s2.send(2).unwrap(); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => panic!(), + i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(2)), + _ => unreachable!(), + } +} + +#[test] +fn smoke2() { + let (_s1, r1) = unbounded::<i32>(); + let (_s2, r2) = unbounded::<i32>(); + let (_s3, r3) = unbounded::<i32>(); + let (_s4, r4) = unbounded::<i32>(); + let (s5, r5) = unbounded::<i32>(); + + s5.send(5).unwrap(); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper3 = sel.recv(&r3); + let oper4 = sel.recv(&r4); + let oper5 = sel.recv(&r5); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => panic!(), + i if i == oper2 => panic!(), + i if i == oper3 => panic!(), + i if i == oper4 => panic!(), + i if i == oper5 => assert_eq!(oper.recv(&r5), Ok(5)), + _ => unreachable!(), + } +} + +#[test] +fn disconnected() { + let (s1, r1) = unbounded::<i32>(); + let (s2, r2) = unbounded::<i32>(); + + scope(|scope| { + scope.spawn(|_| { + drop(s1); + thread::sleep(ms(500)); + s2.send(5).unwrap(); + }); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper = sel.select_timeout(ms(1000)); + match oper { + Err(_) => panic!(), + Ok(oper) => match oper.index() { + i if i == oper1 => assert!(oper.recv(&r1).is_err()), + i if i == oper2 => panic!(), + _ => unreachable!(), + }, + } + + r2.recv().unwrap(); + }) + .unwrap(); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper = sel.select_timeout(ms(1000)); + match oper { + Err(_) => panic!(), + Ok(oper) => match oper.index() { + i if i == oper1 => assert!(oper.recv(&r1).is_err()), + i if i == oper2 => panic!(), + _ => unreachable!(), + }, + } + + scope(|scope| { + scope.spawn(|_| { + thread::sleep(ms(500)); + drop(s2); + }); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r2); + let oper = sel.select_timeout(ms(1000)); + match oper { + Err(_) => panic!(), + Ok(oper) => match oper.index() { + i if i == oper1 => assert!(oper.recv(&r2).is_err()), + _ => unreachable!(), + }, + } + }) + .unwrap(); +} + +#[test] +fn default() { + let (s1, r1) = unbounded::<i32>(); + let (s2, r2) = unbounded::<i32>(); + + let mut sel = Select::new(); + let _oper1 = sel.recv(&r1); + let _oper2 = sel.recv(&r2); + let oper = sel.try_select(); + match oper { + Err(_) => {} + Ok(_) => panic!(), + } + + drop(s1); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper = sel.try_select(); + match oper { + Err(_) => panic!(), + Ok(oper) => match oper.index() { + i if i == oper1 => assert!(oper.recv(&r1).is_err()), + i if i == oper2 => panic!(), + _ => unreachable!(), + }, + } + + s2.send(2).unwrap(); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r2); + let oper = sel.try_select(); + match oper { + Err(_) => panic!(), + Ok(oper) => match oper.index() { + i if i == oper1 => assert_eq!(oper.recv(&r2), Ok(2)), + _ => unreachable!(), + }, + } + + let mut sel = Select::new(); + let _oper1 = sel.recv(&r2); + let oper = sel.try_select(); + match oper { + Err(_) => {} + Ok(_) => panic!(), + } + + let mut sel = Select::new(); + let oper = sel.try_select(); + match oper { + Err(_) => {} + Ok(_) => panic!(), + } +} + +#[test] +fn timeout() { + let (_s1, r1) = unbounded::<i32>(); + let (s2, r2) = unbounded::<i32>(); + + scope(|scope| { + scope.spawn(|_| { + thread::sleep(ms(1500)); + s2.send(2).unwrap(); + }); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper = sel.select_timeout(ms(1000)); + match oper { + Err(_) => {} + Ok(oper) => match oper.index() { + i if i == oper1 => panic!(), + i if i == oper2 => panic!(), + _ => unreachable!(), + }, + } + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper = sel.select_timeout(ms(1000)); + match oper { + Err(_) => panic!(), + Ok(oper) => match oper.index() { + i if i == oper1 => panic!(), + i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(2)), + _ => unreachable!(), + }, + } + }) + .unwrap(); + + scope(|scope| { + let (s, r) = unbounded::<i32>(); + + scope.spawn(move |_| { + thread::sleep(ms(500)); + drop(s); + }); + + let mut sel = Select::new(); + let oper = sel.select_timeout(ms(1000)); + match oper { + Err(_) => { + let mut sel = Select::new(); + let oper1 = sel.recv(&r); + let oper = sel.try_select(); + match oper { + Err(_) => panic!(), + Ok(oper) => match oper.index() { + i if i == oper1 => assert!(oper.recv(&r).is_err()), + _ => unreachable!(), + }, + } + } + Ok(_) => unreachable!(), + } + }) + .unwrap(); +} + +#[test] +fn default_when_disconnected() { + let (_, r) = unbounded::<i32>(); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r); + let oper = sel.try_select(); + match oper { + Err(_) => panic!(), + Ok(oper) => match oper.index() { + i if i == oper1 => assert!(oper.recv(&r).is_err()), + _ => unreachable!(), + }, + } + + let (_, r) = unbounded::<i32>(); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r); + let oper = sel.select_timeout(ms(1000)); + match oper { + Err(_) => panic!(), + Ok(oper) => match oper.index() { + i if i == oper1 => assert!(oper.recv(&r).is_err()), + _ => unreachable!(), + }, + } + + let (s, _) = bounded::<i32>(0); + + let mut sel = Select::new(); + let oper1 = sel.send(&s); + let oper = sel.try_select(); + match oper { + Err(_) => panic!(), + Ok(oper) => match oper.index() { + i if i == oper1 => assert!(oper.send(&s, 0).is_err()), + _ => unreachable!(), + }, + } + + let (s, _) = bounded::<i32>(0); + + let mut sel = Select::new(); + let oper1 = sel.send(&s); + let oper = sel.select_timeout(ms(1000)); + match oper { + Err(_) => panic!(), + Ok(oper) => match oper.index() { + i if i == oper1 => assert!(oper.send(&s, 0).is_err()), + _ => unreachable!(), + }, + } +} + +#[test] +fn default_only() { + let start = Instant::now(); + + let mut sel = Select::new(); + let oper = sel.try_select(); + assert!(oper.is_err()); + let now = Instant::now(); + assert!(now - start <= ms(50)); + + let start = Instant::now(); + let mut sel = Select::new(); + let oper = sel.select_timeout(ms(500)); + assert!(oper.is_err()); + let now = Instant::now(); + assert!(now - start >= ms(450)); + assert!(now - start <= ms(550)); +} + +#[test] +fn unblocks() { + let (s1, r1) = bounded::<i32>(0); + let (s2, r2) = bounded::<i32>(0); + + scope(|scope| { + scope.spawn(|_| { + thread::sleep(ms(500)); + s2.send(2).unwrap(); + }); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper = sel.select_timeout(ms(1000)); + match oper { + Err(_) => panic!(), + Ok(oper) => match oper.index() { + i if i == oper1 => panic!(), + i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(2)), + _ => unreachable!(), + }, + } + }) + .unwrap(); + + scope(|scope| { + scope.spawn(|_| { + thread::sleep(ms(500)); + assert_eq!(r1.recv().unwrap(), 1); + }); + + let mut sel = Select::new(); + let oper1 = sel.send(&s1); + let oper2 = sel.send(&s2); + let oper = sel.select_timeout(ms(1000)); + match oper { + Err(_) => panic!(), + Ok(oper) => match oper.index() { + i if i == oper1 => oper.send(&s1, 1).unwrap(), + i if i == oper2 => panic!(), + _ => unreachable!(), + }, + } + }) + .unwrap(); +} + +#[test] +fn both_ready() { + let (s1, r1) = bounded(0); + let (s2, r2) = bounded(0); + + scope(|scope| { + scope.spawn(|_| { + thread::sleep(ms(500)); + s1.send(1).unwrap(); + assert_eq!(r2.recv().unwrap(), 2); + }); + + for _ in 0..2 { + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.send(&s2); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(1)), + i if i == oper2 => oper.send(&s2, 2).unwrap(), + _ => unreachable!(), + } + } + }) + .unwrap(); +} + +#[test] +fn loop_try() { + const RUNS: usize = 20; + + for _ in 0..RUNS { + let (s1, r1) = bounded::<i32>(0); + let (s2, r2) = bounded::<i32>(0); + let (s_end, r_end) = bounded::<()>(0); + + scope(|scope| { + scope.spawn(|_| loop { + let mut done = false; + + let mut sel = Select::new(); + let oper1 = sel.send(&s1); + let oper = sel.try_select(); + match oper { + Err(_) => {} + Ok(oper) => match oper.index() { + i if i == oper1 => { + let _ = oper.send(&s1, 1); + done = true; + } + _ => unreachable!(), + }, + } + if done { + break; + } + + let mut sel = Select::new(); + let oper1 = sel.recv(&r_end); + let oper = sel.try_select(); + match oper { + Err(_) => {} + Ok(oper) => match oper.index() { + i if i == oper1 => { + let _ = oper.recv(&r_end); + done = true; + } + _ => unreachable!(), + }, + } + if done { + break; + } + }); + + scope.spawn(|_| loop { + if let Ok(x) = r2.try_recv() { + assert_eq!(x, 2); + break; + } + + let mut done = false; + let mut sel = Select::new(); + let oper1 = sel.recv(&r_end); + let oper = sel.try_select(); + match oper { + Err(_) => {} + Ok(oper) => match oper.index() { + i if i == oper1 => { + let _ = oper.recv(&r_end); + done = true; + } + _ => unreachable!(), + }, + } + if done { + break; + } + }); + + scope.spawn(|_| { + thread::sleep(ms(500)); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.send(&s2); + let oper = sel.select_timeout(ms(1000)); + match oper { + Err(_) => {} + Ok(oper) => match oper.index() { + i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(1)), + i if i == oper2 => assert!(oper.send(&s2, 2).is_ok()), + _ => unreachable!(), + }, + } + + drop(s_end); + }); + }) + .unwrap(); + } +} + +#[test] +fn cloning1() { + scope(|scope| { + let (s1, r1) = unbounded::<i32>(); + let (_s2, r2) = unbounded::<i32>(); + let (s3, r3) = unbounded::<()>(); + + scope.spawn(move |_| { + r3.recv().unwrap(); + drop(s1.clone()); + assert!(r3.try_recv().is_err()); + s1.send(1).unwrap(); + r3.recv().unwrap(); + }); + + s3.send(()).unwrap(); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => drop(oper.recv(&r1)), + i if i == oper2 => drop(oper.recv(&r2)), + _ => unreachable!(), + } + + s3.send(()).unwrap(); + }) + .unwrap(); +} + +#[test] +fn cloning2() { + let (s1, r1) = unbounded::<()>(); + let (s2, r2) = unbounded::<()>(); + let (_s3, _r3) = unbounded::<()>(); + + scope(|scope| { + scope.spawn(move |_| { + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => panic!(), + i if i == oper2 => drop(oper.recv(&r2)), + _ => unreachable!(), + } + }); + + thread::sleep(ms(500)); + drop(s1.clone()); + s2.send(()).unwrap(); + }) + .unwrap(); +} + +#[test] +fn preflight1() { + let (s, r) = unbounded(); + s.send(()).unwrap(); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => drop(oper.recv(&r)), + _ => unreachable!(), + } +} + +#[test] +fn preflight2() { + let (s, r) = unbounded(); + drop(s.clone()); + s.send(()).unwrap(); + drop(s); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => assert_eq!(oper.recv(&r), Ok(())), + _ => unreachable!(), + } + + assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)); +} + +#[test] +fn preflight3() { + let (s, r) = unbounded(); + drop(s.clone()); + s.send(()).unwrap(); + drop(s); + r.recv().unwrap(); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => assert!(oper.recv(&r).is_err()), + _ => unreachable!(), + } +} + +#[test] +fn duplicate_operations() { + let (s, r) = unbounded::<i32>(); + let hit = vec![Cell::new(false); 4]; + + while hit.iter().map(|h| h.get()).any(|hit| !hit) { + let mut sel = Select::new(); + let oper0 = sel.recv(&r); + let oper1 = sel.recv(&r); + let oper2 = sel.send(&s); + let oper3 = sel.send(&s); + let oper = sel.select(); + match oper.index() { + i if i == oper0 => { + assert!(oper.recv(&r).is_ok()); + hit[0].set(true); + } + i if i == oper1 => { + assert!(oper.recv(&r).is_ok()); + hit[1].set(true); + } + i if i == oper2 => { + assert!(oper.send(&s, 0).is_ok()); + hit[2].set(true); + } + i if i == oper3 => { + assert!(oper.send(&s, 0).is_ok()); + hit[3].set(true); + } + _ => unreachable!(), + } + } +} + +#[test] +fn nesting() { + let (s, r) = unbounded::<i32>(); + + let mut sel = Select::new(); + let oper1 = sel.send(&s); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => { + assert!(oper.send(&s, 0).is_ok()); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => { + assert_eq!(oper.recv(&r), Ok(0)); + + let mut sel = Select::new(); + let oper1 = sel.send(&s); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => { + assert!(oper.send(&s, 1).is_ok()); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => { + assert_eq!(oper.recv(&r), Ok(1)); + } + _ => unreachable!(), + } + } + _ => unreachable!(), + } + } + _ => unreachable!(), + } + } + _ => unreachable!(), + } +} + +#[test] +fn stress_recv() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = unbounded(); + let (s2, r2) = bounded(5); + let (s3, r3) = bounded(100); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..COUNT { + s1.send(i).unwrap(); + r3.recv().unwrap(); + + s2.send(i).unwrap(); + r3.recv().unwrap(); + } + }); + + for i in 0..COUNT { + for _ in 0..2 { + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper = sel.select(); + match oper.index() { + ix if ix == oper1 => assert_eq!(oper.recv(&r1), Ok(i)), + ix if ix == oper2 => assert_eq!(oper.recv(&r2), Ok(i)), + _ => unreachable!(), + } + + s3.send(()).unwrap(); + } + } + }) + .unwrap(); +} + +#[test] +fn stress_send() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = bounded(0); + let (s2, r2) = bounded(0); + let (s3, r3) = bounded(100); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..COUNT { + assert_eq!(r1.recv().unwrap(), i); + assert_eq!(r2.recv().unwrap(), i); + r3.recv().unwrap(); + } + }); + + for i in 0..COUNT { + for _ in 0..2 { + let mut sel = Select::new(); + let oper1 = sel.send(&s1); + let oper2 = sel.send(&s2); + let oper = sel.select(); + match oper.index() { + ix if ix == oper1 => assert!(oper.send(&s1, i).is_ok()), + ix if ix == oper2 => assert!(oper.send(&s2, i).is_ok()), + _ => unreachable!(), + } + } + s3.send(()).unwrap(); + } + }) + .unwrap(); +} + +#[test] +fn stress_mixed() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = bounded(0); + let (s2, r2) = bounded(0); + let (s3, r3) = bounded(100); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..COUNT { + s1.send(i).unwrap(); + assert_eq!(r2.recv().unwrap(), i); + r3.recv().unwrap(); + } + }); + + for i in 0..COUNT { + for _ in 0..2 { + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.send(&s2); + let oper = sel.select(); + match oper.index() { + ix if ix == oper1 => assert_eq!(oper.recv(&r1), Ok(i)), + ix if ix == oper2 => assert!(oper.send(&s2, i).is_ok()), + _ => unreachable!(), + } + } + s3.send(()).unwrap(); + } + }) + .unwrap(); +} + +#[test] +fn stress_timeout_two_threads() { + const COUNT: usize = 20; + + let (s, r) = bounded(2); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..COUNT { + if i % 2 == 0 { + thread::sleep(ms(500)); + } + + loop { + let mut sel = Select::new(); + let oper1 = sel.send(&s); + let oper = sel.select_timeout(ms(100)); + match oper { + Err(_) => {} + Ok(oper) => match oper.index() { + ix if ix == oper1 => { + assert!(oper.send(&s, i).is_ok()); + break; + } + _ => unreachable!(), + }, + } + } + } + }); + + scope.spawn(|_| { + for i in 0..COUNT { + if i % 2 == 0 { + thread::sleep(ms(500)); + } + + loop { + let mut sel = Select::new(); + let oper1 = sel.recv(&r); + let oper = sel.select_timeout(ms(100)); + match oper { + Err(_) => {} + Ok(oper) => match oper.index() { + ix if ix == oper1 => { + assert_eq!(oper.recv(&r), Ok(i)); + break; + } + _ => unreachable!(), + }, + } + } + } + }); + }) + .unwrap(); +} + +#[test] +fn send_recv_same_channel() { + let (s, r) = bounded::<i32>(0); + let mut sel = Select::new(); + let oper1 = sel.send(&s); + let oper2 = sel.recv(&r); + let oper = sel.select_timeout(ms(100)); + match oper { + Err(_) => {} + Ok(oper) => match oper.index() { + ix if ix == oper1 => panic!(), + ix if ix == oper2 => panic!(), + _ => unreachable!(), + }, + } + + let (s, r) = unbounded::<i32>(); + let mut sel = Select::new(); + let oper1 = sel.send(&s); + let oper2 = sel.recv(&r); + let oper = sel.select_timeout(ms(100)); + match oper { + Err(_) => panic!(), + Ok(oper) => match oper.index() { + ix if ix == oper1 => assert!(oper.send(&s, 0).is_ok()), + ix if ix == oper2 => panic!(), + _ => unreachable!(), + }, + } +} + +#[test] +fn matching() { + const THREADS: usize = 44; + + let (s, r) = &bounded::<usize>(0); + + scope(|scope| { + for i in 0..THREADS { + scope.spawn(move |_| { + let mut sel = Select::new(); + let oper1 = sel.recv(r); + let oper2 = sel.send(s); + let oper = sel.select(); + match oper.index() { + ix if ix == oper1 => assert_ne!(oper.recv(r), Ok(i)), + ix if ix == oper2 => assert!(oper.send(s, i).is_ok()), + _ => unreachable!(), + } + }); + } + }) + .unwrap(); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +} + +#[test] +fn matching_with_leftover() { + const THREADS: usize = 55; + + let (s, r) = &bounded::<usize>(0); + + scope(|scope| { + for i in 0..THREADS { + scope.spawn(move |_| { + let mut sel = Select::new(); + let oper1 = sel.recv(r); + let oper2 = sel.send(s); + let oper = sel.select(); + match oper.index() { + ix if ix == oper1 => assert_ne!(oper.recv(r), Ok(i)), + ix if ix == oper2 => assert!(oper.send(s, i).is_ok()), + _ => unreachable!(), + } + }); + } + s.send(!0).unwrap(); + }) + .unwrap(); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +} + +#[test] +fn channel_through_channel() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 1000; + + type T = Box<dyn Any + Send>; + + for cap in 0..3 { + let (s, r) = bounded::<T>(cap); + + scope(|scope| { + scope.spawn(move |_| { + let mut s = s; + + for _ in 0..COUNT { + let (new_s, new_r) = bounded(cap); + let new_r: T = Box::new(Some(new_r)); + + { + let mut sel = Select::new(); + let oper1 = sel.send(&s); + let oper = sel.select(); + match oper.index() { + ix if ix == oper1 => assert!(oper.send(&s, new_r).is_ok()), + _ => unreachable!(), + } + } + + s = new_s; + } + }); + + scope.spawn(move |_| { + let mut r = r; + + for _ in 0..COUNT { + let new = { + let mut sel = Select::new(); + let oper1 = sel.recv(&r); + let oper = sel.select(); + match oper.index() { + ix if ix == oper1 => oper + .recv(&r) + .unwrap() + .downcast_mut::<Option<Receiver<T>>>() + .unwrap() + .take() + .unwrap(), + _ => unreachable!(), + } + }; + r = new; + } + }); + }) + .unwrap(); + } +} + +#[test] +fn linearizable_try() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 100_000; + + for step in 0..2 { + let (start_s, start_r) = bounded::<()>(0); + let (end_s, end_r) = bounded::<()>(0); + + let ((s1, r1), (s2, r2)) = if step == 0 { + (bounded::<i32>(1), bounded::<i32>(1)) + } else { + (unbounded::<i32>(), unbounded::<i32>()) + }; + + scope(|scope| { + scope.spawn(|_| { + for _ in 0..COUNT { + start_s.send(()).unwrap(); + + s1.send(1).unwrap(); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper = sel.try_select(); + match oper { + Err(_) => unreachable!(), + Ok(oper) => match oper.index() { + ix if ix == oper1 => assert!(oper.recv(&r1).is_ok()), + ix if ix == oper2 => assert!(oper.recv(&r2).is_ok()), + _ => unreachable!(), + }, + } + + end_s.send(()).unwrap(); + let _ = r2.try_recv(); + } + }); + + for _ in 0..COUNT { + start_r.recv().unwrap(); + + s2.send(1).unwrap(); + let _ = r1.try_recv(); + + end_r.recv().unwrap(); + } + }) + .unwrap(); + } +} + +#[test] +fn linearizable_timeout() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 100_000; + + for step in 0..2 { + let (start_s, start_r) = bounded::<()>(0); + let (end_s, end_r) = bounded::<()>(0); + + let ((s1, r1), (s2, r2)) = if step == 0 { + (bounded::<i32>(1), bounded::<i32>(1)) + } else { + (unbounded::<i32>(), unbounded::<i32>()) + }; + + scope(|scope| { + scope.spawn(|_| { + for _ in 0..COUNT { + start_s.send(()).unwrap(); + + s1.send(1).unwrap(); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper = sel.select_timeout(ms(0)); + match oper { + Err(_) => unreachable!(), + Ok(oper) => match oper.index() { + ix if ix == oper1 => assert!(oper.recv(&r1).is_ok()), + ix if ix == oper2 => assert!(oper.recv(&r2).is_ok()), + _ => unreachable!(), + }, + } + + end_s.send(()).unwrap(); + let _ = r2.try_recv(); + } + }); + + for _ in 0..COUNT { + start_r.recv().unwrap(); + + s2.send(1).unwrap(); + let _ = r1.try_recv(); + + end_r.recv().unwrap(); + } + }) + .unwrap(); + } +} + +#[test] +fn fairness1() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = bounded::<()>(COUNT); + let (s2, r2) = unbounded::<()>(); + + for _ in 0..COUNT { + s1.send(()).unwrap(); + s2.send(()).unwrap(); + } + + let hits = vec![Cell::new(0usize); 4]; + for _ in 0..COUNT { + let after = after(ms(0)); + let tick = tick(ms(0)); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper3 = sel.recv(&after); + let oper4 = sel.recv(&tick); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => { + oper.recv(&r1).unwrap(); + hits[0].set(hits[0].get() + 1); + } + i if i == oper2 => { + oper.recv(&r2).unwrap(); + hits[1].set(hits[1].get() + 1); + } + i if i == oper3 => { + oper.recv(&after).unwrap(); + hits[2].set(hits[2].get() + 1); + } + i if i == oper4 => { + oper.recv(&tick).unwrap(); + hits[3].set(hits[3].get() + 1); + } + _ => unreachable!(), + } + } + assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 2)); +} + +#[test] +fn fairness2() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = unbounded::<()>(); + let (s2, r2) = bounded::<()>(1); + let (s3, r3) = bounded::<()>(0); + + scope(|scope| { + scope.spawn(|_| { + for _ in 0..COUNT { + let mut sel = Select::new(); + let mut oper1 = None; + let mut oper2 = None; + if s1.is_empty() { + oper1 = Some(sel.send(&s1)); + } + if s2.is_empty() { + oper2 = Some(sel.send(&s2)); + } + let oper3 = sel.send(&s3); + let oper = sel.select(); + match oper.index() { + i if Some(i) == oper1 => assert!(oper.send(&s1, ()).is_ok()), + i if Some(i) == oper2 => assert!(oper.send(&s2, ()).is_ok()), + i if i == oper3 => assert!(oper.send(&s3, ()).is_ok()), + _ => unreachable!(), + } + } + }); + + let hits = vec![Cell::new(0usize); 3]; + for _ in 0..COUNT { + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper3 = sel.recv(&r3); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => { + oper.recv(&r1).unwrap(); + hits[0].set(hits[0].get() + 1); + } + i if i == oper2 => { + oper.recv(&r2).unwrap(); + hits[1].set(hits[1].get() + 1); + } + i if i == oper3 => { + oper.recv(&r3).unwrap(); + hits[2].set(hits[2].get() + 1); + } + _ => unreachable!(), + } + } + assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 50)); + }) + .unwrap(); +} + +#[test] +fn sync_and_clone() { + const THREADS: usize = 20; + + let (s, r) = &bounded::<usize>(0); + + let mut sel = Select::new(); + let oper1 = sel.recv(r); + let oper2 = sel.send(s); + let sel = &sel; + + scope(|scope| { + for i in 0..THREADS { + scope.spawn(move |_| { + let mut sel = sel.clone(); + let oper = sel.select(); + match oper.index() { + ix if ix == oper1 => assert_ne!(oper.recv(r), Ok(i)), + ix if ix == oper2 => assert!(oper.send(s, i).is_ok()), + _ => unreachable!(), + } + }); + } + }) + .unwrap(); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +} + +#[test] +fn send_and_clone() { + const THREADS: usize = 20; + + let (s, r) = &bounded::<usize>(0); + + let mut sel = Select::new(); + let oper1 = sel.recv(r); + let oper2 = sel.send(s); + + scope(|scope| { + for i in 0..THREADS { + let mut sel = sel.clone(); + scope.spawn(move |_| { + let oper = sel.select(); + match oper.index() { + ix if ix == oper1 => assert_ne!(oper.recv(r), Ok(i)), + ix if ix == oper2 => assert!(oper.send(s, i).is_ok()), + _ => unreachable!(), + } + }); + } + }) + .unwrap(); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +} + +#[test] +fn reuse() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = bounded(0); + let (s2, r2) = bounded(0); + let (s3, r3) = bounded(100); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..COUNT { + s1.send(i).unwrap(); + assert_eq!(r2.recv().unwrap(), i); + r3.recv().unwrap(); + } + }); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.send(&s2); + + for i in 0..COUNT { + for _ in 0..2 { + let oper = sel.select(); + match oper.index() { + ix if ix == oper1 => assert_eq!(oper.recv(&r1), Ok(i)), + ix if ix == oper2 => assert!(oper.send(&s2, i).is_ok()), + _ => unreachable!(), + } + } + s3.send(()).unwrap(); + } + }) + .unwrap(); +} |