diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 19:33:14 +0000 |
commit | 36d22d82aa202bb199967e9512281e9a53db42c9 (patch) | |
tree | 105e8c98ddea1c1e4784a60a5a6410fa416be2de /third_party/rust/crossbeam-channel/tests | |
parent | Initial commit. (diff) | |
download | firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.tar.xz firefox-esr-36d22d82aa202bb199967e9512281e9a53db42c9.zip |
Adding upstream version 115.7.0esr.upstream/115.7.0esr
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'third_party/rust/crossbeam-channel/tests')
-rw-r--r-- | third_party/rust/crossbeam-channel/tests/after.rs | 336 | ||||
-rw-r--r-- | third_party/rust/crossbeam-channel/tests/array.rs | 744 | ||||
-rw-r--r-- | third_party/rust/crossbeam-channel/tests/golang.rs | 2141 | ||||
-rw-r--r-- | third_party/rust/crossbeam-channel/tests/iter.rs | 110 | ||||
-rw-r--r-- | third_party/rust/crossbeam-channel/tests/list.rs | 582 | ||||
-rw-r--r-- | third_party/rust/crossbeam-channel/tests/mpsc.rs | 2129 | ||||
-rw-r--r-- | third_party/rust/crossbeam-channel/tests/never.rs | 95 | ||||
-rw-r--r-- | third_party/rust/crossbeam-channel/tests/ready.rs | 852 | ||||
-rw-r--r-- | third_party/rust/crossbeam-channel/tests/same_channel.rs | 112 | ||||
-rw-r--r-- | third_party/rust/crossbeam-channel/tests/select.rs | 1328 | ||||
-rw-r--r-- | third_party/rust/crossbeam-channel/tests/select_macro.rs | 1480 | ||||
-rw-r--r-- | third_party/rust/crossbeam-channel/tests/thread_locals.rs | 53 | ||||
-rw-r--r-- | third_party/rust/crossbeam-channel/tests/tick.rs | 352 | ||||
-rw-r--r-- | third_party/rust/crossbeam-channel/tests/zero.rs | 587 |
14 files changed, 10901 insertions, 0 deletions
diff --git a/third_party/rust/crossbeam-channel/tests/after.rs b/third_party/rust/crossbeam-channel/tests/after.rs new file mode 100644 index 0000000000..678a8c679c --- /dev/null +++ b/third_party/rust/crossbeam-channel/tests/after.rs @@ -0,0 +1,336 @@ +//! Tests for the after channel flavor. + +#![cfg(not(miri))] // TODO: many assertions failed due to Miri is slow + +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::thread; +use std::time::{Duration, Instant}; + +use crossbeam_channel::{after, select, Select, TryRecvError}; +use crossbeam_utils::thread::scope; + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn fire() { + let start = Instant::now(); + let r = after(ms(50)); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); + thread::sleep(ms(100)); + + let fired = r.try_recv().unwrap(); + assert!(start < fired); + assert!(fired - start >= ms(50)); + + let now = Instant::now(); + assert!(fired < now); + assert!(now - fired >= ms(50)); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); + + select! { + recv(r) -> _ => panic!(), + default => {} + } + + select! { + recv(r) -> _ => panic!(), + recv(after(ms(200))) -> _ => {} + } +} + +#[test] +fn capacity() { + const COUNT: usize = 10; + + for i in 0..COUNT { + let r = after(ms(i as u64)); + assert_eq!(r.capacity(), Some(1)); + } +} + +#[test] +fn len_empty_full() { + let r = after(ms(50)); + + assert_eq!(r.len(), 0); + assert!(r.is_empty()); + assert!(!r.is_full()); + + thread::sleep(ms(100)); + + assert_eq!(r.len(), 1); + assert!(!r.is_empty()); + assert!(r.is_full()); + + r.try_recv().unwrap(); + + assert_eq!(r.len(), 0); + assert!(r.is_empty()); + assert!(!r.is_full()); +} + +#[test] +fn try_recv() { + let r = after(ms(200)); + assert!(r.try_recv().is_err()); + + thread::sleep(ms(100)); + assert!(r.try_recv().is_err()); + + thread::sleep(ms(200)); + assert!(r.try_recv().is_ok()); + assert!(r.try_recv().is_err()); + + thread::sleep(ms(200)); + assert!(r.try_recv().is_err()); +} + +#[test] +fn recv() { + let start = Instant::now(); + let r = after(ms(50)); + + let fired = r.recv().unwrap(); + assert!(start < fired); + assert!(fired - start >= ms(50)); + + let now = Instant::now(); + assert!(fired < now); + assert!(now - fired < fired - start); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +} + +#[test] +fn recv_timeout() { + let start = Instant::now(); + let r = after(ms(200)); + + assert!(r.recv_timeout(ms(100)).is_err()); + let now = Instant::now(); + assert!(now - start >= ms(100)); + assert!(now - start <= ms(150)); + + let fired = r.recv_timeout(ms(200)).unwrap(); + assert!(fired - start >= ms(200)); + assert!(fired - start <= ms(250)); + + assert!(r.recv_timeout(ms(200)).is_err()); + let now = Instant::now(); + assert!(now - start >= ms(400)); + assert!(now - start <= ms(450)); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +} + +#[test] +fn recv_two() { + let r1 = after(ms(50)); + let r2 = after(ms(50)); + + scope(|scope| { + scope.spawn(|_| { + select! { + recv(r1) -> _ => {} + recv(r2) -> _ => {} + } + }); + scope.spawn(|_| { + select! { + recv(r1) -> _ => {} + recv(r2) -> _ => {} + } + }); + }) + .unwrap(); +} + +#[test] +fn recv_race() { + select! { + recv(after(ms(50))) -> _ => {} + recv(after(ms(100))) -> _ => panic!(), + } + + select! { + recv(after(ms(100))) -> _ => panic!(), + recv(after(ms(50))) -> _ => {} + } +} + +#[test] +fn stress_default() { + const COUNT: usize = 10; + + for _ in 0..COUNT { + select! { + recv(after(ms(0))) -> _ => {} + default => panic!(), + } + } + + for _ in 0..COUNT { + select! { + recv(after(ms(100))) -> _ => panic!(), + default => {} + } + } +} + +#[test] +fn select() { + const THREADS: usize = 4; + const COUNT: usize = 1000; + const TIMEOUT_MS: u64 = 100; + + let v = (0..COUNT) + .map(|i| after(ms(i as u64 / TIMEOUT_MS / 2))) + .collect::<Vec<_>>(); + let hits = AtomicUsize::new(0); + + scope(|scope| { + for _ in 0..THREADS { + scope.spawn(|_| { + let v: Vec<&_> = v.iter().collect(); + + loop { + let timeout = after(ms(TIMEOUT_MS)); + let mut sel = Select::new(); + for r in &v { + sel.recv(r); + } + let oper_timeout = sel.recv(&timeout); + + let oper = sel.select(); + match oper.index() { + i if i == oper_timeout => { + oper.recv(&timeout).unwrap(); + break; + } + i => { + oper.recv(v[i]).unwrap(); + hits.fetch_add(1, Ordering::SeqCst); + } + } + } + }); + } + }) + .unwrap(); + + assert_eq!(hits.load(Ordering::SeqCst), COUNT); +} + +#[test] +fn ready() { + const THREADS: usize = 4; + const COUNT: usize = 1000; + const TIMEOUT_MS: u64 = 100; + + let v = (0..COUNT) + .map(|i| after(ms(i as u64 / TIMEOUT_MS / 2))) + .collect::<Vec<_>>(); + let hits = AtomicUsize::new(0); + + scope(|scope| { + for _ in 0..THREADS { + scope.spawn(|_| { + let v: Vec<&_> = v.iter().collect(); + + loop { + let timeout = after(ms(TIMEOUT_MS)); + let mut sel = Select::new(); + for r in &v { + sel.recv(r); + } + let oper_timeout = sel.recv(&timeout); + + loop { + let i = sel.ready(); + if i == oper_timeout { + timeout.try_recv().unwrap(); + return; + } else if v[i].try_recv().is_ok() { + hits.fetch_add(1, Ordering::SeqCst); + break; + } + } + } + }); + } + }) + .unwrap(); + + assert_eq!(hits.load(Ordering::SeqCst), COUNT); +} + +#[test] +fn stress_clone() { + const RUNS: usize = 1000; + const THREADS: usize = 10; + const COUNT: usize = 50; + + for i in 0..RUNS { + let r = after(ms(i as u64)); + + scope(|scope| { + for _ in 0..THREADS { + scope.spawn(|_| { + let r = r.clone(); + let _ = r.try_recv(); + + for _ in 0..COUNT { + drop(r.clone()); + thread::yield_now(); + } + }); + } + }) + .unwrap(); + } +} + +#[test] +fn fairness() { + const COUNT: usize = 1000; + + for &dur in &[0, 1] { + let mut hits = [0usize; 2]; + + for _ in 0..COUNT { + select! { + recv(after(ms(dur))) -> _ => hits[0] += 1, + recv(after(ms(dur))) -> _ => hits[1] += 1, + } + } + + assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); + } +} + +#[test] +fn fairness_duplicates() { + const COUNT: usize = 1000; + + for &dur in &[0, 1] { + let mut hits = [0usize; 5]; + + for _ in 0..COUNT { + let r = after(ms(dur)); + select! { + recv(r) -> _ => hits[0] += 1, + recv(r) -> _ => hits[1] += 1, + recv(r) -> _ => hits[2] += 1, + recv(r) -> _ => hits[3] += 1, + recv(r) -> _ => hits[4] += 1, + } + } + + assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); + } +} diff --git a/third_party/rust/crossbeam-channel/tests/array.rs b/third_party/rust/crossbeam-channel/tests/array.rs new file mode 100644 index 0000000000..6fd8ffcc67 --- /dev/null +++ b/third_party/rust/crossbeam-channel/tests/array.rs @@ -0,0 +1,744 @@ +//! Tests for the array channel flavor. + +use std::any::Any; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::thread; +use std::time::Duration; + +use crossbeam_channel::{bounded, select, Receiver}; +use crossbeam_channel::{RecvError, RecvTimeoutError, TryRecvError}; +use crossbeam_channel::{SendError, SendTimeoutError, TrySendError}; +use crossbeam_utils::thread::scope; +use rand::{thread_rng, Rng}; + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn smoke() { + let (s, r) = bounded(1); + s.send(7).unwrap(); + assert_eq!(r.try_recv(), Ok(7)); + + s.send(8).unwrap(); + assert_eq!(r.recv(), Ok(8)); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); + assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout)); +} + +#[test] +fn capacity() { + for i in 1..10 { + let (s, r) = bounded::<()>(i); + assert_eq!(s.capacity(), Some(i)); + assert_eq!(r.capacity(), Some(i)); + } +} + +#[test] +fn len_empty_full() { + let (s, r) = bounded(2); + + assert_eq!(s.len(), 0); + assert!(s.is_empty()); + assert!(!s.is_full()); + assert_eq!(r.len(), 0); + assert!(r.is_empty()); + assert!(!r.is_full()); + + s.send(()).unwrap(); + + assert_eq!(s.len(), 1); + assert!(!s.is_empty()); + assert!(!s.is_full()); + assert_eq!(r.len(), 1); + assert!(!r.is_empty()); + assert!(!r.is_full()); + + s.send(()).unwrap(); + + assert_eq!(s.len(), 2); + assert!(!s.is_empty()); + assert!(s.is_full()); + assert_eq!(r.len(), 2); + assert!(!r.is_empty()); + assert!(r.is_full()); + + r.recv().unwrap(); + + assert_eq!(s.len(), 1); + assert!(!s.is_empty()); + assert!(!s.is_full()); + assert_eq!(r.len(), 1); + assert!(!r.is_empty()); + assert!(!r.is_full()); +} + +#[test] +fn try_recv() { + let (s, r) = bounded(100); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); + thread::sleep(ms(1500)); + assert_eq!(r.try_recv(), Ok(7)); + thread::sleep(ms(500)); + assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)); + }); + scope.spawn(move |_| { + thread::sleep(ms(1000)); + s.send(7).unwrap(); + }); + }) + .unwrap(); +} + +#[test] +fn recv() { + let (s, r) = bounded(100); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(r.recv(), Ok(7)); + thread::sleep(ms(1000)); + assert_eq!(r.recv(), Ok(8)); + thread::sleep(ms(1000)); + assert_eq!(r.recv(), Ok(9)); + assert_eq!(r.recv(), Err(RecvError)); + }); + scope.spawn(move |_| { + thread::sleep(ms(1500)); + s.send(7).unwrap(); + s.send(8).unwrap(); + s.send(9).unwrap(); + }); + }) + .unwrap(); +} + +#[test] +fn recv_timeout() { + let (s, r) = bounded::<i32>(100); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout)); + assert_eq!(r.recv_timeout(ms(1000)), Ok(7)); + assert_eq!( + r.recv_timeout(ms(1000)), + Err(RecvTimeoutError::Disconnected) + ); + }); + scope.spawn(move |_| { + thread::sleep(ms(1500)); + s.send(7).unwrap(); + }); + }) + .unwrap(); +} + +#[test] +fn try_send() { + let (s, r) = bounded(1); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(s.try_send(1), Ok(())); + assert_eq!(s.try_send(2), Err(TrySendError::Full(2))); + thread::sleep(ms(1500)); + assert_eq!(s.try_send(3), Ok(())); + thread::sleep(ms(500)); + assert_eq!(s.try_send(4), Err(TrySendError::Disconnected(4))); + }); + scope.spawn(move |_| { + thread::sleep(ms(1000)); + assert_eq!(r.try_recv(), Ok(1)); + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); + assert_eq!(r.recv(), Ok(3)); + }); + }) + .unwrap(); +} + +#[test] +fn send() { + let (s, r) = bounded(1); + + scope(|scope| { + scope.spawn(|_| { + s.send(7).unwrap(); + thread::sleep(ms(1000)); + s.send(8).unwrap(); + thread::sleep(ms(1000)); + s.send(9).unwrap(); + thread::sleep(ms(1000)); + s.send(10).unwrap(); + }); + scope.spawn(|_| { + thread::sleep(ms(1500)); + assert_eq!(r.recv(), Ok(7)); + assert_eq!(r.recv(), Ok(8)); + assert_eq!(r.recv(), Ok(9)); + }); + }) + .unwrap(); +} + +#[test] +fn send_timeout() { + let (s, r) = bounded(2); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(s.send_timeout(1, ms(1000)), Ok(())); + assert_eq!(s.send_timeout(2, ms(1000)), Ok(())); + assert_eq!( + s.send_timeout(3, ms(500)), + Err(SendTimeoutError::Timeout(3)) + ); + thread::sleep(ms(1000)); + assert_eq!(s.send_timeout(4, ms(1000)), Ok(())); + thread::sleep(ms(1000)); + assert_eq!(s.send(5), Err(SendError(5))); + }); + scope.spawn(move |_| { + thread::sleep(ms(1000)); + assert_eq!(r.recv(), Ok(1)); + thread::sleep(ms(1000)); + assert_eq!(r.recv(), Ok(2)); + assert_eq!(r.recv(), Ok(4)); + }); + }) + .unwrap(); +} + +#[test] +fn send_after_disconnect() { + let (s, r) = bounded(100); + + s.send(1).unwrap(); + s.send(2).unwrap(); + s.send(3).unwrap(); + + drop(r); + + assert_eq!(s.send(4), Err(SendError(4))); + assert_eq!(s.try_send(5), Err(TrySendError::Disconnected(5))); + assert_eq!( + s.send_timeout(6, ms(500)), + Err(SendTimeoutError::Disconnected(6)) + ); +} + +#[test] +fn recv_after_disconnect() { + let (s, r) = bounded(100); + + s.send(1).unwrap(); + s.send(2).unwrap(); + s.send(3).unwrap(); + + drop(s); + + assert_eq!(r.recv(), Ok(1)); + assert_eq!(r.recv(), Ok(2)); + assert_eq!(r.recv(), Ok(3)); + assert_eq!(r.recv(), Err(RecvError)); +} + +#[test] +fn len() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 25_000; + #[cfg(miri)] + const CAP: usize = 50; + #[cfg(not(miri))] + const CAP: usize = 1000; + + let (s, r) = bounded(CAP); + + assert_eq!(s.len(), 0); + assert_eq!(r.len(), 0); + + for _ in 0..CAP / 10 { + for i in 0..50 { + s.send(i).unwrap(); + assert_eq!(s.len(), i + 1); + } + + for i in 0..50 { + r.recv().unwrap(); + assert_eq!(r.len(), 50 - i - 1); + } + } + + assert_eq!(s.len(), 0); + assert_eq!(r.len(), 0); + + for i in 0..CAP { + s.send(i).unwrap(); + assert_eq!(s.len(), i + 1); + } + + for _ in 0..CAP { + r.recv().unwrap(); + } + + assert_eq!(s.len(), 0); + assert_eq!(r.len(), 0); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..COUNT { + assert_eq!(r.recv(), Ok(i)); + let len = r.len(); + assert!(len <= CAP); + } + }); + + scope.spawn(|_| { + for i in 0..COUNT { + s.send(i).unwrap(); + let len = s.len(); + assert!(len <= CAP); + } + }); + }) + .unwrap(); + + assert_eq!(s.len(), 0); + assert_eq!(r.len(), 0); +} + +#[test] +fn disconnect_wakes_sender() { + let (s, r) = bounded(1); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(s.send(()), Ok(())); + assert_eq!(s.send(()), Err(SendError(()))); + }); + scope.spawn(move |_| { + thread::sleep(ms(1000)); + drop(r); + }); + }) + .unwrap(); +} + +#[test] +fn disconnect_wakes_receiver() { + let (s, r) = bounded::<()>(1); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(r.recv(), Err(RecvError)); + }); + scope.spawn(move |_| { + thread::sleep(ms(1000)); + drop(s); + }); + }) + .unwrap(); +} + +#[test] +fn spsc() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 100_000; + + let (s, r) = bounded(3); + + scope(|scope| { + scope.spawn(move |_| { + for i in 0..COUNT { + assert_eq!(r.recv(), Ok(i)); + } + assert_eq!(r.recv(), Err(RecvError)); + }); + scope.spawn(move |_| { + for i in 0..COUNT { + s.send(i).unwrap(); + } + }); + }) + .unwrap(); +} + +#[test] +fn mpmc() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 25_000; + const THREADS: usize = 4; + + let (s, r) = bounded::<usize>(3); + let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>(); + + scope(|scope| { + for _ in 0..THREADS { + scope.spawn(|_| { + for _ in 0..COUNT { + let n = r.recv().unwrap(); + v[n].fetch_add(1, Ordering::SeqCst); + } + }); + } + for _ in 0..THREADS { + scope.spawn(|_| { + for i in 0..COUNT { + s.send(i).unwrap(); + } + }); + } + }) + .unwrap(); + + for c in v { + assert_eq!(c.load(Ordering::SeqCst), THREADS); + } +} + +#[test] +fn stress_oneshot() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + for _ in 0..COUNT { + let (s, r) = bounded(1); + + scope(|scope| { + scope.spawn(|_| r.recv().unwrap()); + scope.spawn(|_| s.send(0).unwrap()); + }) + .unwrap(); + } +} + +#[test] +fn stress_iter() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 100_000; + + let (request_s, request_r) = bounded(1); + let (response_s, response_r) = bounded(1); + + scope(|scope| { + scope.spawn(move |_| { + let mut count = 0; + loop { + for x in response_r.try_iter() { + count += x; + if count == COUNT { + return; + } + } + request_s.send(()).unwrap(); + } + }); + + for _ in request_r.iter() { + if response_s.send(1).is_err() { + break; + } + } + }) + .unwrap(); +} + +#[test] +fn stress_timeout_two_threads() { + const COUNT: usize = 100; + + let (s, r) = bounded(2); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..COUNT { + if i % 2 == 0 { + thread::sleep(ms(50)); + } + loop { + if let Ok(()) = s.send_timeout(i, ms(10)) { + break; + } + } + } + }); + + scope.spawn(|_| { + for i in 0..COUNT { + if i % 2 == 0 { + thread::sleep(ms(50)); + } + loop { + if let Ok(x) = r.recv_timeout(ms(10)) { + assert_eq!(x, i); + break; + } + } + } + }); + }) + .unwrap(); +} + +#[test] +fn drops() { + #[cfg(miri)] + const RUNS: usize = 10; + #[cfg(not(miri))] + const RUNS: usize = 100; + #[cfg(miri)] + const STEPS: usize = 100; + #[cfg(not(miri))] + const STEPS: usize = 10_000; + + static DROPS: AtomicUsize = AtomicUsize::new(0); + + #[derive(Debug, PartialEq)] + struct DropCounter; + + impl Drop for DropCounter { + fn drop(&mut self) { + DROPS.fetch_add(1, Ordering::SeqCst); + } + } + + let mut rng = thread_rng(); + + for _ in 0..RUNS { + let steps = rng.gen_range(0..STEPS); + let additional = rng.gen_range(0..50); + + DROPS.store(0, Ordering::SeqCst); + let (s, r) = bounded::<DropCounter>(50); + + scope(|scope| { + scope.spawn(|_| { + for _ in 0..steps { + r.recv().unwrap(); + } + }); + + scope.spawn(|_| { + for _ in 0..steps { + s.send(DropCounter).unwrap(); + } + }); + }) + .unwrap(); + + for _ in 0..additional { + s.send(DropCounter).unwrap(); + } + + assert_eq!(DROPS.load(Ordering::SeqCst), steps); + drop(s); + drop(r); + assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional); + } +} + +#[test] +fn linearizable() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 25_000; + const THREADS: usize = 4; + + let (s, r) = bounded(THREADS); + + scope(|scope| { + for _ in 0..THREADS { + scope.spawn(|_| { + for _ in 0..COUNT { + s.send(0).unwrap(); + r.try_recv().unwrap(); + } + }); + } + }) + .unwrap(); +} + +#[test] +fn fairness() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = bounded::<()>(COUNT); + let (s2, r2) = bounded::<()>(COUNT); + + for _ in 0..COUNT { + s1.send(()).unwrap(); + s2.send(()).unwrap(); + } + + let mut hits = [0usize; 2]; + for _ in 0..COUNT { + select! { + recv(r1) -> _ => hits[0] += 1, + recv(r2) -> _ => hits[1] += 1, + } + } + assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); +} + +#[test] +fn fairness_duplicates() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s, r) = bounded::<()>(COUNT); + + for _ in 0..COUNT { + s.send(()).unwrap(); + } + + let mut hits = [0usize; 5]; + for _ in 0..COUNT { + select! { + recv(r) -> _ => hits[0] += 1, + recv(r) -> _ => hits[1] += 1, + recv(r) -> _ => hits[2] += 1, + recv(r) -> _ => hits[3] += 1, + recv(r) -> _ => hits[4] += 1, + } + } + assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); +} + +#[test] +fn recv_in_send() { + let (s, _r) = bounded(1); + s.send(()).unwrap(); + + #[allow(unreachable_code)] + { + select! { + send(s, panic!()) -> _ => panic!(), + default => {} + } + } + + let (s, r) = bounded(2); + s.send(()).unwrap(); + + select! { + send(s, assert_eq!(r.recv(), Ok(()))) -> _ => {} + } +} + +#[test] +fn channel_through_channel() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 1000; + + type T = Box<dyn Any + Send>; + + let (s, r) = bounded::<T>(1); + + scope(|scope| { + scope.spawn(move |_| { + let mut s = s; + + for _ in 0..COUNT { + let (new_s, new_r) = bounded(1); + let new_r: T = Box::new(Some(new_r)); + + s.send(new_r).unwrap(); + s = new_s; + } + }); + + scope.spawn(move |_| { + let mut r = r; + + for _ in 0..COUNT { + r = r + .recv() + .unwrap() + .downcast_mut::<Option<Receiver<T>>>() + .unwrap() + .take() + .unwrap() + } + }); + }) + .unwrap(); +} + +#[test] +fn panic_on_drop() { + struct Msg1<'a>(&'a mut bool); + impl Drop for Msg1<'_> { + fn drop(&mut self) { + if *self.0 && !std::thread::panicking() { + panic!("double drop"); + } else { + *self.0 = true; + } + } + } + + struct Msg2<'a>(&'a mut bool); + impl Drop for Msg2<'_> { + fn drop(&mut self) { + if *self.0 { + panic!("double drop"); + } else { + *self.0 = true; + panic!("first drop"); + } + } + } + + // normal + let (s, r) = bounded(2); + let (mut a, mut b) = (false, false); + s.send(Msg1(&mut a)).unwrap(); + s.send(Msg1(&mut b)).unwrap(); + drop(s); + drop(r); + assert!(a); + assert!(b); + + // panic on drop + let (s, r) = bounded(2); + let (mut a, mut b) = (false, false); + s.send(Msg2(&mut a)).unwrap(); + s.send(Msg2(&mut b)).unwrap(); + drop(s); + let res = std::panic::catch_unwind(move || { + drop(r); + }); + assert_eq!( + *res.unwrap_err().downcast_ref::<&str>().unwrap(), + "first drop" + ); + assert!(a); + // Elements after the panicked element will leak. + assert!(!b); +} diff --git a/third_party/rust/crossbeam-channel/tests/golang.rs b/third_party/rust/crossbeam-channel/tests/golang.rs new file mode 100644 index 0000000000..8050716c67 --- /dev/null +++ b/third_party/rust/crossbeam-channel/tests/golang.rs @@ -0,0 +1,2141 @@ +//! Tests copied from Go and manually rewritten in Rust. +//! +//! Source: +//! - https://github.com/golang/go +//! +//! Copyright & License: +//! - Copyright (c) 2009 The Go Authors +//! - https://golang.org/AUTHORS +//! - https://golang.org/LICENSE +//! - https://golang.org/PATENTS + +#![allow(clippy::mutex_atomic, clippy::redundant_clone)] + +use std::alloc::{GlobalAlloc, Layout, System}; +use std::any::Any; +use std::cell::Cell; +use std::collections::HashMap; +use std::sync::atomic::{AtomicI32, AtomicUsize, Ordering::SeqCst}; +use std::sync::{Arc, Condvar, Mutex}; +use std::thread; +use std::time::Duration; + +use crossbeam_channel::{bounded, never, select, tick, unbounded, Receiver, Select, Sender}; + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +struct Chan<T> { + inner: Arc<Mutex<ChanInner<T>>>, +} + +struct ChanInner<T> { + s: Option<Sender<T>>, + r: Option<Receiver<T>>, + // Receiver to use when r is None (Go blocks on receiving from nil) + nil_r: Receiver<T>, + // Sender to use when s is None (Go blocks on sending to nil) + nil_s: Sender<T>, + // Hold this receiver to prevent nil sender channel from disconnection + _nil_sr: Receiver<T>, +} + +impl<T> Clone for Chan<T> { + fn clone(&self) -> Chan<T> { + Chan { + inner: self.inner.clone(), + } + } +} + +impl<T> Chan<T> { + fn send(&self, msg: T) { + let s = self + .inner + .lock() + .unwrap() + .s + .as_ref() + .expect("sending into closed channel") + .clone(); + let _ = s.send(msg); + } + + fn try_recv(&self) -> Option<T> { + let r = self.inner.lock().unwrap().r.as_ref().unwrap().clone(); + r.try_recv().ok() + } + + fn recv(&self) -> Option<T> { + let r = self.inner.lock().unwrap().r.as_ref().unwrap().clone(); + r.recv().ok() + } + + fn close_s(&self) { + self.inner + .lock() + .unwrap() + .s + .take() + .expect("channel sender already closed"); + } + + fn close_r(&self) { + self.inner + .lock() + .unwrap() + .r + .take() + .expect("channel receiver already closed"); + } + + fn has_rx(&self) -> bool { + self.inner.lock().unwrap().r.is_some() + } + + fn has_tx(&self) -> bool { + self.inner.lock().unwrap().s.is_some() + } + + fn rx(&self) -> Receiver<T> { + let inner = self.inner.lock().unwrap(); + match inner.r.as_ref() { + None => inner.nil_r.clone(), + Some(r) => r.clone(), + } + } + + fn tx(&self) -> Sender<T> { + let inner = self.inner.lock().unwrap(); + match inner.s.as_ref() { + None => inner.nil_s.clone(), + Some(s) => s.clone(), + } + } +} + +impl<T> Iterator for Chan<T> { + type Item = T; + + fn next(&mut self) -> Option<Self::Item> { + self.recv() + } +} + +impl<'a, T> IntoIterator for &'a Chan<T> { + type Item = T; + type IntoIter = Chan<T>; + + fn into_iter(self) -> Self::IntoIter { + self.clone() + } +} + +fn make<T>(cap: usize) -> Chan<T> { + let (s, r) = bounded(cap); + let (nil_s, _nil_sr) = bounded(0); + Chan { + inner: Arc::new(Mutex::new(ChanInner { + s: Some(s), + r: Some(r), + nil_r: never(), + nil_s, + _nil_sr, + })), + } +} + +fn make_unbounded<T>() -> Chan<T> { + let (s, r) = unbounded(); + let (nil_s, _nil_sr) = bounded(0); + Chan { + inner: Arc::new(Mutex::new(ChanInner { + s: Some(s), + r: Some(r), + nil_r: never(), + nil_s, + _nil_sr, + })), + } +} + +#[derive(Clone)] +struct WaitGroup(Arc<WaitGroupInner>); + +struct WaitGroupInner { + cond: Condvar, + count: Mutex<i32>, +} + +impl WaitGroup { + fn new() -> WaitGroup { + WaitGroup(Arc::new(WaitGroupInner { + cond: Condvar::new(), + count: Mutex::new(0), + })) + } + + fn add(&self, delta: i32) { + let mut count = self.0.count.lock().unwrap(); + *count += delta; + assert!(*count >= 0); + self.0.cond.notify_all(); + } + + fn done(&self) { + self.add(-1); + } + + fn wait(&self) { + let mut count = self.0.count.lock().unwrap(); + while *count > 0 { + count = self.0.cond.wait(count).unwrap(); + } + } +} + +struct Defer<F: FnOnce()> { + f: Option<Box<F>>, +} + +impl<F: FnOnce()> Drop for Defer<F> { + fn drop(&mut self) { + let f = self.f.take().unwrap(); + let mut f = Some(f); + let mut f = move || f.take().unwrap()(); + f(); + } +} + +struct Counter; + +static ALLOCATED: AtomicUsize = AtomicUsize::new(0); +unsafe impl GlobalAlloc for Counter { + unsafe fn alloc(&self, layout: Layout) -> *mut u8 { + let ret = System.alloc(layout); + if !ret.is_null() { + ALLOCATED.fetch_add(layout.size(), SeqCst); + } + ret + } + + unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) { + System.dealloc(ptr, layout); + ALLOCATED.fetch_sub(layout.size(), SeqCst); + } +} + +#[global_allocator] +static A: Counter = Counter; + +macro_rules! defer { + ($body:expr) => { + let _defer = Defer { + f: Some(Box::new(|| $body)), + }; + }; +} + +macro_rules! go { + (@parse $v:ident, $($tail:tt)*) => {{ + let $v = $v.clone(); + go!(@parse $($tail)*) + }}; + (@parse $body:expr) => { + ::std::thread::spawn(move || { + let res = ::std::panic::catch_unwind(::std::panic::AssertUnwindSafe(|| { + $body + })); + if res.is_err() { + eprintln!("goroutine panicked: {:?}", res); + ::std::process::abort(); + } + }) + }; + (@parse $($tail:tt)*) => { + compile_error!("invalid `go!` syntax") + }; + ($($tail:tt)*) => {{ + go!(@parse $($tail)*) + }}; +} + +// https://github.com/golang/go/blob/master/test/chan/doubleselect.go +mod doubleselect { + use super::*; + + #[cfg(miri)] + const ITERATIONS: i32 = 100; + #[cfg(not(miri))] + const ITERATIONS: i32 = 10_000; + + fn sender(n: i32, c1: Chan<i32>, c2: Chan<i32>, c3: Chan<i32>, c4: Chan<i32>) { + defer! { c1.close_s() } + defer! { c2.close_s() } + defer! { c3.close_s() } + defer! { c4.close_s() } + + for i in 0..n { + select! { + send(c1.tx(), i) -> _ => {} + send(c2.tx(), i) -> _ => {} + send(c3.tx(), i) -> _ => {} + send(c4.tx(), i) -> _ => {} + } + } + } + + fn mux(out: Chan<i32>, inp: Chan<i32>, done: Chan<bool>) { + for v in inp { + out.send(v); + } + done.send(true); + } + + fn recver(inp: Chan<i32>) { + let mut seen = HashMap::new(); + + for v in &inp { + if seen.contains_key(&v) { + panic!("got duplicate value for {}", v); + } + seen.insert(v, true); + } + } + + #[test] + fn main() { + let c1 = make::<i32>(0); + let c2 = make::<i32>(0); + let c3 = make::<i32>(0); + let c4 = make::<i32>(0); + let done = make::<bool>(0); + let cmux = make::<i32>(0); + + go!(c1, c2, c3, c4, sender(ITERATIONS, c1, c2, c3, c4)); + go!(cmux, c1, done, mux(cmux, c1, done)); + go!(cmux, c2, done, mux(cmux, c2, done)); + go!(cmux, c3, done, mux(cmux, c3, done)); + go!(cmux, c4, done, mux(cmux, c4, done)); + go!(done, cmux, { + done.recv(); + done.recv(); + done.recv(); + done.recv(); + cmux.close_s(); + }); + recver(cmux); + } +} + +// https://github.com/golang/go/blob/master/test/chan/fifo.go +mod fifo { + use super::*; + + const N: i32 = 10; + + #[test] + fn asynch_fifo() { + let ch = make::<i32>(N as usize); + for i in 0..N { + ch.send(i); + } + for i in 0..N { + if ch.recv() != Some(i) { + panic!("bad receive"); + } + } + } + + fn chain(ch: Chan<i32>, val: i32, inp: Chan<i32>, out: Chan<i32>) { + inp.recv(); + if ch.recv() != Some(val) { + panic!("{}", val); + } + out.send(1); + } + + #[test] + fn synch_fifo() { + let ch = make::<i32>(0); + let mut inp = make::<i32>(0); + let start = inp.clone(); + + for i in 0..N { + let out = make::<i32>(0); + go!(ch, i, inp, out, chain(ch, i, inp, out)); + inp = out; + } + + start.send(0); + for i in 0..N { + ch.send(i); + } + inp.recv(); + } +} + +// https://github.com/golang/go/blob/master/test/chan/goroutines.go +mod goroutines { + use super::*; + + fn f(left: Chan<i32>, right: Chan<i32>) { + left.send(right.recv().unwrap()); + } + + #[test] + fn main() { + let n = 100i32; + + let leftmost = make::<i32>(0); + let mut right = leftmost.clone(); + let mut left = leftmost.clone(); + + for _ in 0..n { + right = make::<i32>(0); + go!(left, right, f(left, right)); + left = right.clone(); + } + + go!(right, right.send(1)); + leftmost.recv().unwrap(); + } +} + +// https://github.com/golang/go/blob/master/test/chan/nonblock.go +mod nonblock { + use super::*; + + fn i32receiver(c: Chan<i32>, strobe: Chan<bool>) { + if c.recv().unwrap() != 123 { + panic!("i32 value"); + } + strobe.send(true); + } + + fn i32sender(c: Chan<i32>, strobe: Chan<bool>) { + c.send(234); + strobe.send(true); + } + + fn i64receiver(c: Chan<i64>, strobe: Chan<bool>) { + if c.recv().unwrap() != 123456 { + panic!("i64 value"); + } + strobe.send(true); + } + + fn i64sender(c: Chan<i64>, strobe: Chan<bool>) { + c.send(234567); + strobe.send(true); + } + + fn breceiver(c: Chan<bool>, strobe: Chan<bool>) { + if !c.recv().unwrap() { + panic!("b value"); + } + strobe.send(true); + } + + fn bsender(c: Chan<bool>, strobe: Chan<bool>) { + c.send(true); + strobe.send(true); + } + + fn sreceiver(c: Chan<String>, strobe: Chan<bool>) { + if c.recv().unwrap() != "hello" { + panic!("x value"); + } + strobe.send(true); + } + + fn ssender(c: Chan<String>, strobe: Chan<bool>) { + c.send("hello again".to_string()); + strobe.send(true); + } + + const MAX_TRIES: usize = 10000; // Up to 100ms per test. + + #[test] + fn main() { + let ticker = tick(Duration::new(0, 10_000)); // 10 us + let sleep = || { + ticker.recv().unwrap(); + ticker.recv().unwrap(); + thread::yield_now(); + thread::yield_now(); + thread::yield_now(); + }; + + let sync = make::<bool>(0); + + for buffer in 0..2 { + let c32 = make::<i32>(buffer); + let c64 = make::<i64>(buffer); + let cb = make::<bool>(buffer); + let cs = make::<String>(buffer); + + select! { + recv(c32.rx()) -> _ => panic!("blocked i32sender"), + default => {} + } + + select! { + recv(c64.rx()) -> _ => panic!("blocked i64sender"), + default => {} + } + + select! { + recv(cb.rx()) -> _ => panic!("blocked bsender"), + default => {} + } + + select! { + recv(cs.rx()) -> _ => panic!("blocked ssender"), + default => {} + } + + go!(c32, sync, i32receiver(c32, sync)); + let mut r#try = 0; + loop { + select! { + send(c32.tx(), 123) -> _ => break, + default => { + r#try += 1; + if r#try > MAX_TRIES { + println!("i32receiver buffer={}", buffer); + panic!("fail") + } + sleep(); + } + } + } + sync.recv(); + go!(c32, sync, i32sender(c32, sync)); + if buffer > 0 { + sync.recv(); + } + let mut r#try = 0; + loop { + select! { + recv(c32.rx()) -> v => { + if v != Ok(234) { + panic!("i32sender value"); + } + break; + } + default => { + r#try += 1; + if r#try > MAX_TRIES { + println!("i32sender buffer={}", buffer); + panic!("fail"); + } + sleep(); + } + } + } + if buffer == 0 { + sync.recv(); + } + + go!(c64, sync, i64receiver(c64, sync)); + let mut r#try = 0; + loop { + select! { + send(c64.tx(), 123456) -> _ => break, + default => { + r#try += 1; + if r#try > MAX_TRIES { + println!("i64receiver buffer={}", buffer); + panic!("fail") + } + sleep(); + } + } + } + sync.recv(); + go!(c64, sync, i64sender(c64, sync)); + if buffer > 0 { + sync.recv(); + } + let mut r#try = 0; + loop { + select! { + recv(c64.rx()) -> v => { + if v != Ok(234567) { + panic!("i64sender value"); + } + break; + } + default => { + r#try += 1; + if r#try > MAX_TRIES { + println!("i64sender buffer={}", buffer); + panic!("fail"); + } + sleep(); + } + } + } + if buffer == 0 { + sync.recv(); + } + + go!(cb, sync, breceiver(cb, sync)); + let mut r#try = 0; + loop { + select! { + send(cb.tx(), true) -> _ => break, + default => { + r#try += 1; + if r#try > MAX_TRIES { + println!("breceiver buffer={}", buffer); + panic!("fail") + } + sleep(); + } + } + } + sync.recv(); + go!(cb, sync, bsender(cb, sync)); + if buffer > 0 { + sync.recv(); + } + let mut r#try = 0; + loop { + select! { + recv(cb.rx()) -> v => { + if v != Ok(true) { + panic!("bsender value"); + } + break; + } + default => { + r#try += 1; + if r#try > MAX_TRIES { + println!("bsender buffer={}", buffer); + panic!("fail"); + } + sleep(); + } + } + } + if buffer == 0 { + sync.recv(); + } + + go!(cs, sync, sreceiver(cs, sync)); + let mut r#try = 0; + loop { + select! { + send(cs.tx(), "hello".to_string()) -> _ => break, + default => { + r#try += 1; + if r#try > MAX_TRIES { + println!("sreceiver buffer={}", buffer); + panic!("fail") + } + sleep(); + } + } + } + sync.recv(); + go!(cs, sync, ssender(cs, sync)); + if buffer > 0 { + sync.recv(); + } + let mut r#try = 0; + loop { + select! { + recv(cs.rx()) -> v => { + if v != Ok("hello again".to_string()) { + panic!("ssender value"); + } + break; + } + default => { + r#try += 1; + if r#try > MAX_TRIES { + println!("ssender buffer={}", buffer); + panic!("fail"); + } + sleep(); + } + } + } + if buffer == 0 { + sync.recv(); + } + } + } +} + +// https://github.com/golang/go/blob/master/test/chan/select.go +mod select { + use super::*; + + #[test] + fn main() { + let shift = Cell::new(0); + let counter = Cell::new(0); + + let get_value = || { + counter.set(counter.get() + 1); + 1 << shift.get() + }; + + let send = |mut a: Option<&Chan<u32>>, mut b: Option<&Chan<u32>>| { + let mut i = 0; + let never = make::<u32>(0); + loop { + let nil1 = never.tx(); + let nil2 = never.tx(); + let v1 = get_value(); + let v2 = get_value(); + select! { + send(a.map(|c| c.tx()).unwrap_or(nil1), v1) -> _ => { + i += 1; + a = None; + } + send(b.map(|c| c.tx()).unwrap_or(nil2), v2) -> _ => { + i += 1; + b = None; + } + default => break, + } + shift.set(shift.get() + 1); + } + i + }; + + let a = make::<u32>(1); + let b = make::<u32>(1); + + assert_eq!(send(Some(&a), Some(&b)), 2); + + let av = a.recv().unwrap(); + let bv = b.recv().unwrap(); + assert_eq!(av | bv, 3); + + assert_eq!(send(Some(&a), None), 1); + assert_eq!(counter.get(), 10); + } +} + +// https://github.com/golang/go/blob/master/test/chan/select2.go +mod select2 { + use super::*; + + #[cfg(miri)] + const N: i32 = 200; + #[cfg(not(miri))] + const N: i32 = 100000; + + #[test] + fn main() { + fn sender(c: &Chan<i32>, n: i32) { + for _ in 0..n { + c.send(1); + } + } + + fn receiver(c: &Chan<i32>, dummy: &Chan<i32>, n: i32) { + for _ in 0..n { + select! { + recv(c.rx()) -> _ => {} + recv(dummy.rx()) -> _ => { + panic!("dummy"); + } + } + } + } + + let c = make_unbounded::<i32>(); + let dummy = make_unbounded::<i32>(); + + ALLOCATED.store(0, SeqCst); + + go!(c, sender(&c, N)); + receiver(&c, &dummy, N); + + let alloc = ALLOCATED.load(SeqCst); + + go!(c, sender(&c, N)); + receiver(&c, &dummy, N); + + assert!( + !(ALLOCATED.load(SeqCst) > alloc + && (ALLOCATED.load(SeqCst) - alloc) > (N as usize + 10000)) + ) + } +} + +// https://github.com/golang/go/blob/master/test/chan/select3.go +mod select3 { + // TODO +} + +// https://github.com/golang/go/blob/master/test/chan/select4.go +mod select4 { + use super::*; + + #[test] + fn main() { + let c = make::<i32>(1); + let c1 = make::<i32>(0); + c.send(42); + select! { + recv(c1.rx()) -> _ => panic!("BUG"), + recv(c.rx()) -> v => assert_eq!(v, Ok(42)), + } + } +} + +// https://github.com/golang/go/blob/master/test/chan/select6.go +mod select6 { + use super::*; + + #[test] + fn main() { + let c1 = make::<bool>(0); + let c2 = make::<bool>(0); + let c3 = make::<bool>(0); + + go!(c1, c1.recv()); + go!(c1, c2, c3, { + select! { + recv(c1.rx()) -> _ => panic!("dummy"), + recv(c2.rx()) -> _ => c3.send(true), + } + c1.recv(); + }); + go!(c2, c2.send(true)); + + c3.recv(); + c1.send(true); + c1.send(true); + } +} + +// https://github.com/golang/go/blob/master/test/chan/select7.go +mod select7 { + use super::*; + + fn recv1(c: Chan<i32>) { + c.recv().unwrap(); + } + + fn recv2(c: Chan<i32>) { + select! { + recv(c.rx()) -> _ => () + } + } + + fn recv3(c: Chan<i32>) { + let c2 = make::<i32>(1); + select! { + recv(c.rx()) -> _ => (), + recv(c2.rx()) -> _ => () + } + } + + fn send1(recv: fn(Chan<i32>)) { + let c = make::<i32>(1); + go!(c, recv(c)); + thread::yield_now(); + c.send(1); + } + + fn send2(recv: fn(Chan<i32>)) { + let c = make::<i32>(1); + go!(c, recv(c)); + thread::yield_now(); + select! { + send(c.tx(), 1) -> _ => () + } + } + + fn send3(recv: fn(Chan<i32>)) { + let c = make::<i32>(1); + go!(c, recv(c)); + thread::yield_now(); + let c2 = make::<i32>(1); + select! { + send(c.tx(), 1) -> _ => (), + send(c2.tx(), 1) -> _ => () + } + } + + #[test] + fn main() { + send1(recv1); + send2(recv1); + send3(recv1); + send1(recv2); + send2(recv2); + send3(recv2); + send1(recv3); + send2(recv3); + send3(recv3); + } +} + +// https://github.com/golang/go/blob/master/test/chan/sieve1.go +mod sieve1 { + use super::*; + + fn generate(ch: Chan<i32>) { + let mut i = 2; + loop { + ch.send(i); + i += 1; + } + } + + fn filter(in_ch: Chan<i32>, out_ch: Chan<i32>, prime: i32) { + for i in in_ch { + if i % prime != 0 { + out_ch.send(i); + } + } + } + + fn sieve(primes: Chan<i32>) { + let mut ch = make::<i32>(1); + go!(ch, generate(ch)); + loop { + let prime = ch.recv().unwrap(); + primes.send(prime); + + let ch1 = make::<i32>(1); + go!(ch, ch1, prime, filter(ch, ch1, prime)); + ch = ch1; + } + } + + #[test] + fn main() { + let primes = make::<i32>(1); + go!(primes, sieve(primes)); + + let a = [ + 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, + 89, 97, + ]; + #[cfg(miri)] + let a = &a[..10]; + + for item in a.iter() { + let x = primes.recv().unwrap(); + if x != *item { + println!("{} != {}", x, item); + panic!("fail"); + } + } + } +} + +// https://github.com/golang/go/blob/master/test/chan/zerosize.go +mod zerosize { + use super::*; + + #[test] + fn zero_size_struct() { + struct ZeroSize; + let _ = make::<ZeroSize>(0); + } + + #[test] + fn zero_size_array() { + let _ = make::<[u8; 0]>(0); + } +} + +// https://github.com/golang/go/blob/master/src/runtime/chan_test.go +mod chan_test { + use super::*; + + #[test] + fn test_chan() { + #[cfg(miri)] + const N: i32 = 12; + #[cfg(not(miri))] + const N: i32 = 200; + + #[cfg(miri)] + const MESSAGES_COUNT: i32 = 20; + #[cfg(not(miri))] + const MESSAGES_COUNT: i32 = 100; + + for cap in 0..N { + { + // Ensure that receive from empty chan blocks. + let c = make::<i32>(cap as usize); + + let recv1 = Arc::new(Mutex::new(false)); + go!(c, recv1, { + c.recv(); + *recv1.lock().unwrap() = true; + }); + + let recv2 = Arc::new(Mutex::new(false)); + go!(c, recv2, { + c.recv(); + *recv2.lock().unwrap() = true; + }); + + thread::sleep(ms(1)); + + if *recv1.lock().unwrap() || *recv2.lock().unwrap() { + panic!(); + } + + // Ensure that non-blocking receive does not block. + select! { + recv(c.rx()) -> _ => panic!(), + default => {} + } + select! { + recv(c.rx()) -> _ => panic!(), + default => {} + } + + c.send(0); + c.send(0); + } + + { + // Ensure that send to full chan blocks. + let c = make::<i32>(cap as usize); + for i in 0..cap { + c.send(i); + } + + let sent = Arc::new(Mutex::new(0)); + go!(sent, c, { + c.send(0); + *sent.lock().unwrap() = 1; + }); + + thread::sleep(ms(1)); + + if *sent.lock().unwrap() != 0 { + panic!(); + } + + // Ensure that non-blocking send does not block. + select! { + send(c.tx(), 0) -> _ => panic!(), + default => {} + } + c.recv(); + } + + { + // Ensure that we receive 0 from closed chan. + let c = make::<i32>(cap as usize); + for i in 0..cap { + c.send(i); + } + c.close_s(); + + for i in 0..cap { + let v = c.recv(); + if v != Some(i) { + panic!(); + } + } + + if c.recv() != None { + panic!(); + } + if c.try_recv() != None { + panic!(); + } + } + + { + // Ensure that close unblocks receive. + let c = make::<i32>(cap as usize); + let done = make::<bool>(0); + + go!(c, done, { + let v = c.try_recv(); + done.send(v.is_none()); + }); + + thread::sleep(ms(1)); + c.close_s(); + + if !done.recv().unwrap() { + panic!(); + } + } + + { + // Send many integers, + // ensure that we receive them non-corrupted in FIFO order. + let c = make::<i32>(cap as usize); + go!(c, { + for i in 0..MESSAGES_COUNT { + c.send(i); + } + }); + for i in 0..MESSAGES_COUNT { + if c.recv() != Some(i) { + panic!(); + } + } + + // Same, but using recv2. + go!(c, { + for i in 0..MESSAGES_COUNT { + c.send(i); + } + }); + for i in 0..MESSAGES_COUNT { + if c.recv() != Some(i) { + panic!(); + } + } + } + } + } + + #[test] + fn test_nonblock_recv_race() { + #[cfg(miri)] + const N: usize = 100; + #[cfg(not(miri))] + const N: usize = 1000; + + for _ in 0..N { + let c = make::<i32>(1); + c.send(1); + + let t = go!(c, { + select! { + recv(c.rx()) -> _ => {} + default => panic!("chan is not ready"), + } + }); + + c.close_s(); + c.recv(); + t.join().unwrap(); + } + } + + #[test] + fn test_nonblock_select_race() { + #[cfg(miri)] + const N: usize = 100; + #[cfg(not(miri))] + const N: usize = 1000; + + let done = make::<bool>(1); + for _ in 0..N { + let c1 = make::<i32>(1); + let c2 = make::<i32>(1); + c1.send(1); + + go!(c1, c2, done, { + select! { + recv(c1.rx()) -> _ => {} + recv(c2.rx()) -> _ => {} + default => { + done.send(false); + return; + } + } + done.send(true); + }); + + c2.send(1); + select! { + recv(c1.rx()) -> _ => {} + default => {} + } + if !done.recv().unwrap() { + panic!("no chan is ready"); + } + } + } + + #[test] + fn test_nonblock_select_race2() { + #[cfg(miri)] + const N: usize = 100; + #[cfg(not(miri))] + const N: usize = 1000; + + let done = make::<bool>(1); + for _ in 0..N { + let c1 = make::<i32>(1); + let c2 = make::<i32>(0); + c1.send(1); + + go!(c1, c2, done, { + select! { + recv(c1.rx()) -> _ => {} + recv(c2.rx()) -> _ => {} + default => { + done.send(false); + return; + } + } + done.send(true); + }); + + c2.close_s(); + select! { + recv(c1.rx()) -> _ => {} + default => {} + } + if !done.recv().unwrap() { + panic!("no chan is ready"); + } + } + } + + #[test] + fn test_self_select() { + // Ensure that send/recv on the same chan in select + // does not crash nor deadlock. + + #[cfg(miri)] + const N: usize = 100; + #[cfg(not(miri))] + const N: usize = 1000; + + for &cap in &[0, 10] { + let wg = WaitGroup::new(); + wg.add(2); + let c = make::<i32>(cap); + + for p in 0..2 { + let p = p; + go!(wg, p, c, { + defer! { wg.done() } + for i in 0..N { + if p == 0 || i % 2 == 0 { + select! { + send(c.tx(), p) -> _ => {} + recv(c.rx()) -> v => { + if cap == 0 && v.ok() == Some(p) { + panic!("self receive"); + } + } + } + } else { + select! { + recv(c.rx()) -> v => { + if cap == 0 && v.ok() == Some(p) { + panic!("self receive"); + } + } + send(c.tx(), p) -> _ => {} + } + } + } + }); + } + wg.wait(); + } + } + + #[test] + fn test_select_stress() { + #[cfg(miri)] + const N: usize = 100; + #[cfg(not(miri))] + const N: usize = 10000; + + let c = vec![ + make::<i32>(0), + make::<i32>(0), + make::<i32>(2), + make::<i32>(3), + ]; + + // There are 4 goroutines that send N values on each of the chans, + // + 4 goroutines that receive N values on each of the chans, + // + 1 goroutine that sends N values on each of the chans in a single select, + // + 1 goroutine that receives N values on each of the chans in a single select. + // All these sends, receives and selects interact chaotically at runtime, + // but we are careful that this whole construct does not deadlock. + let wg = WaitGroup::new(); + wg.add(10); + + for k in 0..4 { + go!(k, c, wg, { + for _ in 0..N { + c[k].send(0); + } + wg.done(); + }); + go!(k, c, wg, { + for _ in 0..N { + c[k].recv(); + } + wg.done(); + }); + } + + go!(c, wg, { + let mut n = [0; 4]; + let mut c1 = c.iter().map(|c| Some(c.rx().clone())).collect::<Vec<_>>(); + + for _ in 0..4 * N { + let index = { + let mut sel = Select::new(); + let mut opers = [!0; 4]; + for &i in &[3, 2, 0, 1] { + if let Some(c) = &c1[i] { + opers[i] = sel.recv(c); + } + } + + let oper = sel.select(); + let mut index = !0; + for i in 0..4 { + if opers[i] == oper.index() { + index = i; + let _ = oper.recv(c1[i].as_ref().unwrap()); + break; + } + } + index + }; + + n[index] += 1; + if n[index] == N { + c1[index] = None; + } + } + wg.done(); + }); + + go!(c, wg, { + let mut n = [0; 4]; + let mut c1 = c.iter().map(|c| Some(c.tx().clone())).collect::<Vec<_>>(); + + for _ in 0..4 * N { + let index = { + let mut sel = Select::new(); + let mut opers = [!0; 4]; + for &i in &[0, 1, 2, 3] { + if let Some(c) = &c1[i] { + opers[i] = sel.send(c); + } + } + + let oper = sel.select(); + let mut index = !0; + for i in 0..4 { + if opers[i] == oper.index() { + index = i; + let _ = oper.send(c1[i].as_ref().unwrap(), 0); + break; + } + } + index + }; + + n[index] += 1; + if n[index] == N { + c1[index] = None; + } + } + wg.done(); + }); + + wg.wait(); + } + + #[test] + fn test_select_fairness() { + #[cfg(miri)] + const TRIALS: usize = 100; + #[cfg(not(miri))] + const TRIALS: usize = 10000; + + let c1 = make::<u8>(TRIALS + 1); + let c2 = make::<u8>(TRIALS + 1); + + for _ in 0..TRIALS + 1 { + c1.send(1); + c2.send(2); + } + + let c3 = make::<u8>(0); + let c4 = make::<u8>(0); + let out = make::<u8>(0); + let done = make::<u8>(0); + let wg = WaitGroup::new(); + + wg.add(1); + go!(wg, c1, c2, c3, c4, out, done, { + defer! { wg.done() }; + loop { + let b; + select! { + recv(c3.rx()) -> m => b = m.unwrap(), + recv(c4.rx()) -> m => b = m.unwrap(), + recv(c1.rx()) -> m => b = m.unwrap(), + recv(c2.rx()) -> m => b = m.unwrap(), + } + select! { + send(out.tx(), b) -> _ => {} + recv(done.rx()) -> _ => return, + } + } + }); + + let (mut cnt1, mut cnt2) = (0, 0); + for _ in 0..TRIALS { + match out.recv() { + Some(1) => cnt1 += 1, + Some(2) => cnt2 += 1, + b => panic!("unexpected value {:?} on channel", b), + } + } + + // If the select in the goroutine is fair, + // cnt1 and cnt2 should be about the same value. + // With 10,000 trials, the expected margin of error at + // a confidence level of five nines is 4.4172 / (2 * Sqrt(10000)). + + let r = cnt1 as f64 / TRIALS as f64; + let e = (r - 0.5).abs(); + + if e > 4.4172 / (2.0 * (TRIALS as f64).sqrt()) { + panic!( + "unfair select: in {} trials, results were {}, {}", + TRIALS, cnt1, cnt2, + ); + } + + done.close_s(); + wg.wait(); + } + + #[test] + fn test_chan_send_interface() { + struct Mt; + + let c = make::<Box<dyn Any>>(1); + c.send(Box::new(Mt)); + + select! { + send(c.tx(), Box::new(Mt)) -> _ => {} + default => {} + } + + select! { + send(c.tx(), Box::new(Mt)) -> _ => {} + send(c.tx(), Box::new(Mt)) -> _ => {} + default => {} + } + } + + #[test] + fn test_pseudo_random_send() { + #[cfg(miri)] + const N: usize = 20; + #[cfg(not(miri))] + const N: usize = 100; + + for cap in 0..N { + let c = make::<i32>(cap); + let l = Arc::new(Mutex::new(vec![0i32; N])); + let done = make::<bool>(0); + + go!(c, done, l, { + let mut l = l.lock().unwrap(); + for i in 0..N { + thread::yield_now(); + l[i] = c.recv().unwrap(); + } + done.send(true); + }); + + for _ in 0..N { + select! { + send(c.tx(), 1) -> _ => {} + send(c.tx(), 0) -> _ => {} + } + } + done.recv(); + + let mut n0 = 0; + let mut n1 = 0; + for &i in l.lock().unwrap().iter() { + n0 += (i + 1) % 2; + n1 += i; + } + + if n0 <= N as i32 / 10 || n1 <= N as i32 / 10 { + panic!( + "Want pseudorandom, got {} zeros and {} ones (chan cap {})", + n0, n1, cap, + ); + } + } + } + + #[test] + fn test_multi_consumer() { + const NWORK: usize = 23; + #[cfg(miri)] + const NITER: usize = 50; + #[cfg(not(miri))] + const NITER: usize = 271828; + + let pn = [2, 3, 7, 11, 13, 17, 19, 23, 27, 31]; + + let q = make::<i32>(NWORK * 3); + let r = make::<i32>(NWORK * 3); + + let wg = WaitGroup::new(); + for i in 0..NWORK { + wg.add(1); + let w = i; + go!(q, r, wg, pn, { + for v in &q { + if pn[w % pn.len()] == v { + thread::yield_now(); + } + r.send(v); + } + wg.done(); + }); + } + + let expect = Arc::new(Mutex::new(0)); + go!(q, r, expect, wg, pn, { + for i in 0..NITER { + let v = pn[i % pn.len()]; + *expect.lock().unwrap() += v; + q.send(v); + } + q.close_s(); + wg.wait(); + r.close_s(); + }); + + let mut n = 0; + let mut s = 0; + for v in &r { + n += 1; + s += v; + } + + if n != NITER || s != *expect.lock().unwrap() { + panic!(); + } + } + + #[test] + fn test_select_duplicate_channel() { + // This test makes sure we can queue a G on + // the same channel multiple times. + let c = make::<i32>(0); + let d = make::<i32>(0); + let e = make::<i32>(0); + + go!(c, d, e, { + select! { + recv(c.rx()) -> _ => {} + recv(d.rx()) -> _ => {} + recv(e.rx()) -> _ => {} + } + e.send(9); + }); + thread::sleep(ms(1)); + + go!(c, c.recv()); + thread::sleep(ms(1)); + + d.send(7); + e.recv(); + c.send(8); + } +} + +// https://github.com/golang/go/blob/master/test/closedchan.go +mod closedchan { + // TODO +} + +// https://github.com/golang/go/blob/master/src/runtime/chanbarrier_test.go +mod chanbarrier_test { + // TODO +} + +// https://github.com/golang/go/blob/master/src/runtime/race/testdata/chan_test.go +mod race_chan_test { + // TODO +} + +// https://github.com/golang/go/blob/master/test/ken/chan.go +mod chan { + use super::*; + + const MESSAGES_PER_CHANEL: u32 = 76; + const MESSAGES_RANGE_LEN: u32 = 100; + const END: i32 = 10000; + + struct ChanWithVals { + chan: Chan<i32>, + /// Next value to send + sv: Arc<AtomicI32>, + /// Next value to receive + rv: Arc<AtomicI32>, + } + + struct Totals { + /// Total sent messages + tots: u32, + /// Total received messages + totr: u32, + } + + struct Context { + nproc: Arc<Mutex<i32>>, + cval: Arc<Mutex<i32>>, + tot: Arc<Mutex<Totals>>, + nc: ChanWithVals, + randx: Arc<Mutex<i32>>, + } + + impl ChanWithVals { + fn with_capacity(capacity: usize) -> Self { + ChanWithVals { + chan: make(capacity), + sv: Arc::new(AtomicI32::new(0)), + rv: Arc::new(AtomicI32::new(0)), + } + } + + fn closed() -> Self { + let ch = ChanWithVals::with_capacity(0); + ch.chan.close_r(); + ch.chan.close_s(); + ch + } + + fn rv(&self) -> i32 { + self.rv.load(SeqCst) + } + + fn sv(&self) -> i32 { + self.sv.load(SeqCst) + } + + fn send(&mut self, tot: &Mutex<Totals>) -> bool { + { + let mut tot = tot.lock().unwrap(); + tot.tots += 1 + } + let esv = expect(self.sv(), self.sv()); + self.sv.store(esv, SeqCst); + if self.sv() == END { + self.chan.close_s(); + return true; + } + false + } + + fn recv(&mut self, v: i32, tot: &Mutex<Totals>) -> bool { + { + let mut tot = tot.lock().unwrap(); + tot.totr += 1 + } + let erv = expect(self.rv(), v); + self.rv.store(erv, SeqCst); + if self.rv() == END { + self.chan.close_r(); + return true; + } + false + } + } + + impl Clone for ChanWithVals { + fn clone(&self) -> Self { + ChanWithVals { + chan: self.chan.clone(), + sv: self.sv.clone(), + rv: self.rv.clone(), + } + } + } + + impl Context { + fn nproc(&self) -> &Mutex<i32> { + self.nproc.as_ref() + } + + fn cval(&self) -> &Mutex<i32> { + self.cval.as_ref() + } + + fn tot(&self) -> &Mutex<Totals> { + self.tot.as_ref() + } + + fn randx(&self) -> &Mutex<i32> { + self.randx.as_ref() + } + } + + impl Clone for Context { + fn clone(&self) -> Self { + Context { + nproc: self.nproc.clone(), + cval: self.cval.clone(), + tot: self.tot.clone(), + nc: self.nc.clone(), + randx: self.randx.clone(), + } + } + } + + fn nrand(n: i32, randx: &Mutex<i32>) -> i32 { + let mut randx = randx.lock().unwrap(); + *randx += 10007; + if *randx >= 1000000 { + *randx -= 1000000 + } + *randx % n + } + + fn change_nproc(adjust: i32, nproc: &Mutex<i32>) -> i32 { + let mut nproc = nproc.lock().unwrap(); + *nproc += adjust; + *nproc + } + + fn mkchan(c: usize, n: usize, cval: &Mutex<i32>) -> Vec<ChanWithVals> { + let mut ca = Vec::<ChanWithVals>::with_capacity(n); + let mut cval = cval.lock().unwrap(); + for _ in 0..n { + *cval += MESSAGES_RANGE_LEN as i32; + let chl = ChanWithVals::with_capacity(c); + chl.sv.store(*cval, SeqCst); + chl.rv.store(*cval, SeqCst); + ca.push(chl); + } + ca + } + + fn expect(v: i32, v0: i32) -> i32 { + if v == v0 { + return if v % MESSAGES_RANGE_LEN as i32 == MESSAGES_PER_CHANEL as i32 - 1 { + END + } else { + v + 1 + }; + } + panic!("got {}, expected {}", v, v0 + 1); + } + + fn send(mut c: ChanWithVals, ctx: Context) { + loop { + for _ in 0..=nrand(10, ctx.randx()) { + thread::yield_now(); + } + c.chan.tx().send(c.sv()).unwrap(); + if c.send(ctx.tot()) { + break; + } + } + change_nproc(-1, ctx.nproc()); + } + + fn recv(mut c: ChanWithVals, ctx: Context) { + loop { + for _ in (0..nrand(10, ctx.randx())).rev() { + thread::yield_now(); + } + let v = c.chan.rx().recv().unwrap(); + if c.recv(v, ctx.tot()) { + break; + } + } + change_nproc(-1, ctx.nproc()); + } + + #[allow(clippy::too_many_arguments)] + fn sel( + mut r0: ChanWithVals, + mut r1: ChanWithVals, + mut r2: ChanWithVals, + mut r3: ChanWithVals, + mut s0: ChanWithVals, + mut s1: ChanWithVals, + mut s2: ChanWithVals, + mut s3: ChanWithVals, + ctx: Context, + ) { + let mut a = 0; // local chans running + + if r0.chan.has_rx() { + a += 1; + } + if r1.chan.has_rx() { + a += 1; + } + if r2.chan.has_rx() { + a += 1; + } + if r3.chan.has_rx() { + a += 1; + } + if s0.chan.has_tx() { + a += 1; + } + if s1.chan.has_tx() { + a += 1; + } + if s2.chan.has_tx() { + a += 1; + } + if s3.chan.has_tx() { + a += 1; + } + + loop { + for _ in 0..=nrand(5, ctx.randx()) { + thread::yield_now(); + } + select! { + recv(r0.chan.rx()) -> v => if r0.recv(v.unwrap(), ctx.tot()) { a -= 1 }, + recv(r1.chan.rx()) -> v => if r1.recv(v.unwrap(), ctx.tot()) { a -= 1 }, + recv(r2.chan.rx()) -> v => if r2.recv(v.unwrap(), ctx.tot()) { a -= 1 }, + recv(r3.chan.rx()) -> v => if r3.recv(v.unwrap(), ctx.tot()) { a -= 1 }, + send(s0.chan.tx(), s0.sv()) -> _ => if s0.send(ctx.tot()) { a -= 1 }, + send(s1.chan.tx(), s1.sv()) -> _ => if s1.send(ctx.tot()) { a -= 1 }, + send(s2.chan.tx(), s2.sv()) -> _ => if s2.send(ctx.tot()) { a -= 1 }, + send(s3.chan.tx(), s3.sv()) -> _ => if s3.send(ctx.tot()) { a -= 1 }, + } + if a == 0 { + break; + } + } + change_nproc(-1, ctx.nproc()); + } + + fn get(vec: &[ChanWithVals], idx: usize) -> ChanWithVals { + vec.get(idx).unwrap().clone() + } + + /// Direct send to direct recv + fn test1(c: ChanWithVals, ctx: &mut Context) { + change_nproc(2, ctx.nproc()); + go!(c, ctx, send(c, ctx)); + go!(c, ctx, recv(c, ctx)); + } + + /// Direct send to select recv + fn test2(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 4, ctx.cval()); + + change_nproc(4, ctx.nproc()); + go!(ca, ctx, send(get(&ca, 0), ctx)); + go!(ca, ctx, send(get(&ca, 1), ctx)); + go!(ca, ctx, send(get(&ca, 2), ctx)); + go!(ca, ctx, send(get(&ca, 3), ctx)); + + change_nproc(1, ctx.nproc()); + go!( + ca, + ctx, + sel( + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx, + ) + ); + } + + /// Select send to direct recv + fn test3(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 4, ctx.cval()); + + change_nproc(4, ctx.nproc()); + go!(ca, ctx, recv(get(&ca, 0), ctx)); + go!(ca, ctx, recv(get(&ca, 1), ctx)); + go!(ca, ctx, recv(get(&ca, 2), ctx)); + go!(ca, ctx, recv(get(&ca, 3), ctx)); + + change_nproc(1, ctx.nproc()); + go!( + ca, + ctx, + sel( + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx, + ) + ); + } + + /// Select send to select recv, 4 channels + fn test4(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 4, ctx.cval()); + + change_nproc(2, ctx.nproc()); + go!( + ca, + ctx, + sel( + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx, + ) + ); + go!( + ca, + ctx, + sel( + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx.nc.clone(), + ctx, + ) + ); + } + + /// Select send to select recv, 8 channels + fn test5(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 8, ctx.cval()); + + change_nproc(2, ctx.nproc()); + go!( + ca, + ctx, + sel( + get(&ca, 4), + get(&ca, 5), + get(&ca, 6), + get(&ca, 7), + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx, + ) + ); + go!( + ca, + ctx, + sel( + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + get(&ca, 4), + get(&ca, 5), + get(&ca, 6), + get(&ca, 7), + ctx, + ) + ); + } + + // Direct and select send to direct and select recv + fn test6(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 12, ctx.cval()); + + change_nproc(4, ctx.nproc()); + go!(ca, ctx, send(get(&ca, 4), ctx)); + go!(ca, ctx, send(get(&ca, 5), ctx)); + go!(ca, ctx, send(get(&ca, 6), ctx)); + go!(ca, ctx, send(get(&ca, 7), ctx)); + + change_nproc(4, ctx.nproc()); + go!(ca, ctx, recv(get(&ca, 8), ctx)); + go!(ca, ctx, recv(get(&ca, 9), ctx)); + go!(ca, ctx, recv(get(&ca, 10), ctx)); + go!(ca, ctx, recv(get(&ca, 11), ctx)); + + change_nproc(2, ctx.nproc()); + go!( + ca, + ctx, + sel( + get(&ca, 4), + get(&ca, 5), + get(&ca, 6), + get(&ca, 7), + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + ctx, + ) + ); + go!( + ca, + ctx, + sel( + get(&ca, 0), + get(&ca, 1), + get(&ca, 2), + get(&ca, 3), + get(&ca, 8), + get(&ca, 9), + get(&ca, 10), + get(&ca, 11), + ctx, + ) + ); + } + + fn wait(ctx: &mut Context) { + thread::yield_now(); + while change_nproc(0, ctx.nproc()) != 0 { + thread::yield_now(); + } + } + + fn tests(c: usize, ctx: &mut Context) { + let ca = mkchan(c, 4, ctx.cval()); + test1(get(&ca, 0), ctx); + test1(get(&ca, 1), ctx); + test1(get(&ca, 2), ctx); + test1(get(&ca, 3), ctx); + wait(ctx); + + test2(c, ctx); + wait(ctx); + + test3(c, ctx); + wait(ctx); + + test4(c, ctx); + wait(ctx); + + test5(c, ctx); + wait(ctx); + + test6(c, ctx); + wait(ctx); + } + + #[test] + #[cfg_attr(miri, ignore)] // Miri is too slow + fn main() { + let mut ctx = Context { + nproc: Arc::new(Mutex::new(0)), + cval: Arc::new(Mutex::new(0)), + tot: Arc::new(Mutex::new(Totals { tots: 0, totr: 0 })), + nc: ChanWithVals::closed(), + randx: Arc::new(Mutex::new(0)), + }; + + tests(0, &mut ctx); + tests(1, &mut ctx); + tests(10, &mut ctx); + tests(100, &mut ctx); + + #[rustfmt::skip] + let t = 4 * // buffer sizes + (4*4 + // tests 1,2,3,4 channels + 8 + // test 5 channels + 12) * // test 6 channels + MESSAGES_PER_CHANEL; // sends/recvs on a channel + + let tot = ctx.tot.lock().unwrap(); + if tot.tots != t || tot.totr != t { + panic!("tots={} totr={} sb={}", tot.tots, tot.totr, t); + } + } +} + +// https://github.com/golang/go/blob/master/test/ken/chan1.go +mod chan1 { + use super::*; + + // sent messages + #[cfg(miri)] + const N: usize = 20; + #[cfg(not(miri))] + const N: usize = 1000; + // receiving "goroutines" + const M: usize = 10; + // channel buffering + const W: usize = 2; + + fn r(c: Chan<usize>, m: usize, h: Arc<Mutex<[usize; N]>>) { + loop { + select! { + recv(c.rx()) -> rr => { + let r = rr.unwrap(); + let mut data = h.lock().unwrap(); + if data[r] != 1 { + println!("r\nm={}\nr={}\nh={}\n", m, r, data[r]); + panic!("fail") + } + data[r] = 2; + } + } + } + } + + fn s(c: Chan<usize>, h: Arc<Mutex<[usize; N]>>) { + for n in 0..N { + let r = n; + let mut data = h.lock().unwrap(); + if data[r] != 0 { + println!("s"); + panic!("fail"); + } + data[r] = 1; + // https://github.com/crossbeam-rs/crossbeam/pull/615#discussion_r550281094 + drop(data); + c.send(r); + } + } + + #[test] + fn main() { + let h = Arc::new(Mutex::new([0usize; N])); + let c = make::<usize>(W); + for m in 0..M { + go!(c, h, { + r(c, m, h); + }); + thread::yield_now(); + } + thread::yield_now(); + thread::yield_now(); + s(c, h); + } +} diff --git a/third_party/rust/crossbeam-channel/tests/iter.rs b/third_party/rust/crossbeam-channel/tests/iter.rs new file mode 100644 index 0000000000..463f3b0436 --- /dev/null +++ b/third_party/rust/crossbeam-channel/tests/iter.rs @@ -0,0 +1,110 @@ +//! Tests for iteration over receivers. + +use crossbeam_channel::unbounded; +use crossbeam_utils::thread::scope; + +#[test] +fn nested_recv_iter() { + let (s, r) = unbounded::<i32>(); + let (total_s, total_r) = unbounded::<i32>(); + + scope(|scope| { + scope.spawn(move |_| { + let mut acc = 0; + for x in r.iter() { + acc += x; + } + total_s.send(acc).unwrap(); + }); + + s.send(3).unwrap(); + s.send(1).unwrap(); + s.send(2).unwrap(); + drop(s); + assert_eq!(total_r.recv().unwrap(), 6); + }) + .unwrap(); +} + +#[test] +fn recv_iter_break() { + let (s, r) = unbounded::<i32>(); + let (count_s, count_r) = unbounded(); + + scope(|scope| { + scope.spawn(move |_| { + let mut count = 0; + for x in r.iter() { + if count >= 3 { + break; + } else { + count += x; + } + } + count_s.send(count).unwrap(); + }); + + s.send(2).unwrap(); + s.send(2).unwrap(); + s.send(2).unwrap(); + let _ = s.send(2); + drop(s); + assert_eq!(count_r.recv().unwrap(), 4); + }) + .unwrap(); +} + +#[test] +fn recv_try_iter() { + let (request_s, request_r) = unbounded(); + let (response_s, response_r) = unbounded(); + + scope(|scope| { + scope.spawn(move |_| { + let mut count = 0; + loop { + for x in response_r.try_iter() { + count += x; + if count == 6 { + return; + } + } + request_s.send(()).unwrap(); + } + }); + + for _ in request_r.iter() { + if response_s.send(2).is_err() { + break; + } + } + }) + .unwrap(); +} + +#[test] +fn recv_into_iter_owned() { + let mut iter = { + let (s, r) = unbounded::<i32>(); + s.send(1).unwrap(); + s.send(2).unwrap(); + r.into_iter() + }; + + assert_eq!(iter.next().unwrap(), 1); + assert_eq!(iter.next().unwrap(), 2); + assert!(iter.next().is_none()); +} + +#[test] +fn recv_into_iter_borrowed() { + let (s, r) = unbounded::<i32>(); + s.send(1).unwrap(); + s.send(2).unwrap(); + drop(s); + + let mut iter = (&r).into_iter(); + assert_eq!(iter.next().unwrap(), 1); + assert_eq!(iter.next().unwrap(), 2); + assert!(iter.next().is_none()); +} diff --git a/third_party/rust/crossbeam-channel/tests/list.rs b/third_party/rust/crossbeam-channel/tests/list.rs new file mode 100644 index 0000000000..ebe6f6f85f --- /dev/null +++ b/third_party/rust/crossbeam-channel/tests/list.rs @@ -0,0 +1,582 @@ +//! Tests for the list channel flavor. + +use std::any::Any; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::thread; +use std::time::Duration; + +use crossbeam_channel::{select, unbounded, Receiver}; +use crossbeam_channel::{RecvError, RecvTimeoutError, TryRecvError}; +use crossbeam_channel::{SendError, SendTimeoutError, TrySendError}; +use crossbeam_utils::thread::scope; +use rand::{thread_rng, Rng}; + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn smoke() { + let (s, r) = unbounded(); + s.try_send(7).unwrap(); + assert_eq!(r.try_recv(), Ok(7)); + + s.send(8).unwrap(); + assert_eq!(r.recv(), Ok(8)); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); + assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout)); +} + +#[test] +fn capacity() { + let (s, r) = unbounded::<()>(); + assert_eq!(s.capacity(), None); + assert_eq!(r.capacity(), None); +} + +#[test] +fn len_empty_full() { + let (s, r) = unbounded(); + + assert_eq!(s.len(), 0); + assert!(s.is_empty()); + assert!(!s.is_full()); + assert_eq!(r.len(), 0); + assert!(r.is_empty()); + assert!(!r.is_full()); + + s.send(()).unwrap(); + + assert_eq!(s.len(), 1); + assert!(!s.is_empty()); + assert!(!s.is_full()); + assert_eq!(r.len(), 1); + assert!(!r.is_empty()); + assert!(!r.is_full()); + + r.recv().unwrap(); + + assert_eq!(s.len(), 0); + assert!(s.is_empty()); + assert!(!s.is_full()); + assert_eq!(r.len(), 0); + assert!(r.is_empty()); + assert!(!r.is_full()); +} + +#[test] +#[cfg_attr(miri, ignore)] // this test makes timing assumptions, but Miri is so slow it violates them +fn try_recv() { + let (s, r) = unbounded(); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); + thread::sleep(ms(1500)); + assert_eq!(r.try_recv(), Ok(7)); + thread::sleep(ms(500)); + assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)); + }); + scope.spawn(move |_| { + thread::sleep(ms(1000)); + s.send(7).unwrap(); + }); + }) + .unwrap(); +} + +#[test] +fn recv() { + let (s, r) = unbounded(); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(r.recv(), Ok(7)); + thread::sleep(ms(1000)); + assert_eq!(r.recv(), Ok(8)); + thread::sleep(ms(1000)); + assert_eq!(r.recv(), Ok(9)); + assert_eq!(r.recv(), Err(RecvError)); + }); + scope.spawn(move |_| { + thread::sleep(ms(1500)); + s.send(7).unwrap(); + s.send(8).unwrap(); + s.send(9).unwrap(); + }); + }) + .unwrap(); +} + +#[test] +fn recv_timeout() { + let (s, r) = unbounded::<i32>(); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout)); + assert_eq!(r.recv_timeout(ms(1000)), Ok(7)); + assert_eq!( + r.recv_timeout(ms(1000)), + Err(RecvTimeoutError::Disconnected) + ); + }); + scope.spawn(move |_| { + thread::sleep(ms(1500)); + s.send(7).unwrap(); + }); + }) + .unwrap(); +} + +#[test] +fn try_send() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 1000; + + let (s, r) = unbounded(); + for i in 0..COUNT { + assert_eq!(s.try_send(i), Ok(())); + } + + drop(r); + assert_eq!(s.try_send(777), Err(TrySendError::Disconnected(777))); +} + +#[test] +fn send() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 1000; + + let (s, r) = unbounded(); + for i in 0..COUNT { + assert_eq!(s.send(i), Ok(())); + } + + drop(r); + assert_eq!(s.send(777), Err(SendError(777))); +} + +#[test] +fn send_timeout() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 1000; + + let (s, r) = unbounded(); + for i in 0..COUNT { + assert_eq!(s.send_timeout(i, ms(i as u64)), Ok(())); + } + + drop(r); + assert_eq!( + s.send_timeout(777, ms(0)), + Err(SendTimeoutError::Disconnected(777)) + ); +} + +#[test] +fn send_after_disconnect() { + let (s, r) = unbounded(); + + s.send(1).unwrap(); + s.send(2).unwrap(); + s.send(3).unwrap(); + + drop(r); + + assert_eq!(s.send(4), Err(SendError(4))); + assert_eq!(s.try_send(5), Err(TrySendError::Disconnected(5))); + assert_eq!( + s.send_timeout(6, ms(0)), + Err(SendTimeoutError::Disconnected(6)) + ); +} + +#[test] +fn recv_after_disconnect() { + let (s, r) = unbounded(); + + s.send(1).unwrap(); + s.send(2).unwrap(); + s.send(3).unwrap(); + + drop(s); + + assert_eq!(r.recv(), Ok(1)); + assert_eq!(r.recv(), Ok(2)); + assert_eq!(r.recv(), Ok(3)); + assert_eq!(r.recv(), Err(RecvError)); +} + +#[test] +fn len() { + let (s, r) = unbounded(); + + assert_eq!(s.len(), 0); + assert_eq!(r.len(), 0); + + for i in 0..50 { + s.send(i).unwrap(); + assert_eq!(s.len(), i + 1); + } + + for i in 0..50 { + r.recv().unwrap(); + assert_eq!(r.len(), 50 - i - 1); + } + + assert_eq!(s.len(), 0); + assert_eq!(r.len(), 0); +} + +#[test] +fn disconnect_wakes_receiver() { + let (s, r) = unbounded::<()>(); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(r.recv(), Err(RecvError)); + }); + scope.spawn(move |_| { + thread::sleep(ms(1000)); + drop(s); + }); + }) + .unwrap(); +} + +#[test] +fn spsc() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 100_000; + + let (s, r) = unbounded(); + + scope(|scope| { + scope.spawn(move |_| { + for i in 0..COUNT { + assert_eq!(r.recv(), Ok(i)); + } + assert_eq!(r.recv(), Err(RecvError)); + }); + scope.spawn(move |_| { + for i in 0..COUNT { + s.send(i).unwrap(); + } + }); + }) + .unwrap(); +} + +#[test] +fn mpmc() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 25_000; + const THREADS: usize = 4; + + let (s, r) = unbounded::<usize>(); + let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>(); + + scope(|scope| { + for _ in 0..THREADS { + scope.spawn(|_| { + for _ in 0..COUNT { + let n = r.recv().unwrap(); + v[n].fetch_add(1, Ordering::SeqCst); + } + }); + } + for _ in 0..THREADS { + scope.spawn(|_| { + for i in 0..COUNT { + s.send(i).unwrap(); + } + }); + } + }) + .unwrap(); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); + + for c in v { + assert_eq!(c.load(Ordering::SeqCst), THREADS); + } +} + +#[test] +fn stress_oneshot() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + for _ in 0..COUNT { + let (s, r) = unbounded(); + + scope(|scope| { + scope.spawn(|_| r.recv().unwrap()); + scope.spawn(|_| s.send(0).unwrap()); + }) + .unwrap(); + } +} + +#[test] +fn stress_iter() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 100_000; + + let (request_s, request_r) = unbounded(); + let (response_s, response_r) = unbounded(); + + scope(|scope| { + scope.spawn(move |_| { + let mut count = 0; + loop { + for x in response_r.try_iter() { + count += x; + if count == COUNT { + return; + } + } + request_s.send(()).unwrap(); + } + }); + + for _ in request_r.iter() { + if response_s.send(1).is_err() { + break; + } + } + }) + .unwrap(); +} + +#[test] +fn stress_timeout_two_threads() { + const COUNT: usize = 100; + + let (s, r) = unbounded(); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..COUNT { + if i % 2 == 0 { + thread::sleep(ms(50)); + } + s.send(i).unwrap(); + } + }); + + scope.spawn(|_| { + for i in 0..COUNT { + if i % 2 == 0 { + thread::sleep(ms(50)); + } + loop { + if let Ok(x) = r.recv_timeout(ms(10)) { + assert_eq!(x, i); + break; + } + } + } + }); + }) + .unwrap(); +} + +#[test] +fn drops() { + #[cfg(miri)] + const RUNS: usize = 20; + #[cfg(not(miri))] + const RUNS: usize = 100; + #[cfg(miri)] + const STEPS: usize = 100; + #[cfg(not(miri))] + const STEPS: usize = 10_000; + + static DROPS: AtomicUsize = AtomicUsize::new(0); + + #[derive(Debug, PartialEq)] + struct DropCounter; + + impl Drop for DropCounter { + fn drop(&mut self) { + DROPS.fetch_add(1, Ordering::SeqCst); + } + } + + let mut rng = thread_rng(); + + for _ in 0..RUNS { + let steps = rng.gen_range(0..STEPS); + let additional = rng.gen_range(0..STEPS / 10); + + DROPS.store(0, Ordering::SeqCst); + let (s, r) = unbounded::<DropCounter>(); + + scope(|scope| { + scope.spawn(|_| { + for _ in 0..steps { + r.recv().unwrap(); + } + }); + + scope.spawn(|_| { + for _ in 0..steps { + s.send(DropCounter).unwrap(); + } + }); + }) + .unwrap(); + + for _ in 0..additional { + s.try_send(DropCounter).unwrap(); + } + + assert_eq!(DROPS.load(Ordering::SeqCst), steps); + drop(s); + drop(r); + assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional); + } +} + +#[test] +fn linearizable() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 25_000; + const THREADS: usize = 4; + + let (s, r) = unbounded(); + + scope(|scope| { + for _ in 0..THREADS { + scope.spawn(|_| { + for _ in 0..COUNT { + s.send(0).unwrap(); + r.try_recv().unwrap(); + } + }); + } + }) + .unwrap(); +} + +#[test] +fn fairness() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = unbounded::<()>(); + let (s2, r2) = unbounded::<()>(); + + for _ in 0..COUNT { + s1.send(()).unwrap(); + s2.send(()).unwrap(); + } + + let mut hits = [0usize; 2]; + for _ in 0..COUNT { + select! { + recv(r1) -> _ => hits[0] += 1, + recv(r2) -> _ => hits[1] += 1, + } + } + assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); +} + +#[test] +fn fairness_duplicates() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s, r) = unbounded(); + + for _ in 0..COUNT { + s.send(()).unwrap(); + } + + let mut hits = [0usize; 5]; + for _ in 0..COUNT { + select! { + recv(r) -> _ => hits[0] += 1, + recv(r) -> _ => hits[1] += 1, + recv(r) -> _ => hits[2] += 1, + recv(r) -> _ => hits[3] += 1, + recv(r) -> _ => hits[4] += 1, + } + } + assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); +} + +#[test] +fn recv_in_send() { + let (s, r) = unbounded(); + s.send(()).unwrap(); + + select! { + send(s, assert_eq!(r.recv(), Ok(()))) -> _ => {} + } +} + +#[test] +fn channel_through_channel() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 1000; + + type T = Box<dyn Any + Send>; + + let (s, r) = unbounded::<T>(); + + scope(|scope| { + scope.spawn(move |_| { + let mut s = s; + + for _ in 0..COUNT { + let (new_s, new_r) = unbounded(); + let new_r: T = Box::new(Some(new_r)); + + s.send(new_r).unwrap(); + s = new_s; + } + }); + + scope.spawn(move |_| { + let mut r = r; + + for _ in 0..COUNT { + r = r + .recv() + .unwrap() + .downcast_mut::<Option<Receiver<T>>>() + .unwrap() + .take() + .unwrap() + } + }); + }) + .unwrap(); +} diff --git a/third_party/rust/crossbeam-channel/tests/mpsc.rs b/third_party/rust/crossbeam-channel/tests/mpsc.rs new file mode 100644 index 0000000000..d7cc8e25f4 --- /dev/null +++ b/third_party/rust/crossbeam-channel/tests/mpsc.rs @@ -0,0 +1,2129 @@ +//! Tests copied from `std::sync::mpsc`. +//! +//! This is a copy of tests for the `std::sync::mpsc` channels from the standard library, but +//! modified to work with `crossbeam-channel` instead. +//! +//! Minor tweaks were needed to make the tests compile: +//! +//! - Replace `box` syntax with `Box::new`. +//! - Replace all uses of `Select` with `select!`. +//! - Change the imports. +//! - Join all spawned threads. +//! - Removed assertion from oneshot_multi_thread_send_close_stress tests. +//! +//! Source: +//! - https://github.com/rust-lang/rust/tree/master/src/libstd/sync/mpsc +//! +//! Copyright & License: +//! - Copyright 2013-2014 The Rust Project Developers +//! - Apache License, Version 2.0 or MIT license, at your option +//! - https://github.com/rust-lang/rust/blob/master/COPYRIGHT +//! - https://www.rust-lang.org/en-US/legal.html + +#![allow( + clippy::drop_copy, + clippy::match_single_binding, + clippy::redundant_clone +)] + +use std::sync::mpsc::{RecvError, RecvTimeoutError, TryRecvError}; +use std::sync::mpsc::{SendError, TrySendError}; +use std::thread::JoinHandle; +use std::time::Duration; + +use crossbeam_channel as cc; + +pub struct Sender<T> { + pub inner: cc::Sender<T>, +} + +impl<T> Sender<T> { + pub fn send(&self, t: T) -> Result<(), SendError<T>> { + self.inner.send(t).map_err(|cc::SendError(m)| SendError(m)) + } +} + +impl<T> Clone for Sender<T> { + fn clone(&self) -> Sender<T> { + Sender { + inner: self.inner.clone(), + } + } +} + +pub struct SyncSender<T> { + pub inner: cc::Sender<T>, +} + +impl<T> SyncSender<T> { + pub fn send(&self, t: T) -> Result<(), SendError<T>> { + self.inner.send(t).map_err(|cc::SendError(m)| SendError(m)) + } + + pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> { + self.inner.try_send(t).map_err(|err| match err { + cc::TrySendError::Full(m) => TrySendError::Full(m), + cc::TrySendError::Disconnected(m) => TrySendError::Disconnected(m), + }) + } +} + +impl<T> Clone for SyncSender<T> { + fn clone(&self) -> SyncSender<T> { + SyncSender { + inner: self.inner.clone(), + } + } +} + +pub struct Receiver<T> { + pub inner: cc::Receiver<T>, +} + +impl<T> Receiver<T> { + pub fn try_recv(&self) -> Result<T, TryRecvError> { + self.inner.try_recv().map_err(|err| match err { + cc::TryRecvError::Empty => TryRecvError::Empty, + cc::TryRecvError::Disconnected => TryRecvError::Disconnected, + }) + } + + pub fn recv(&self) -> Result<T, RecvError> { + self.inner.recv().map_err(|_| RecvError) + } + + pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> { + self.inner.recv_timeout(timeout).map_err(|err| match err { + cc::RecvTimeoutError::Timeout => RecvTimeoutError::Timeout, + cc::RecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected, + }) + } + + pub fn iter(&self) -> Iter<T> { + Iter { inner: self } + } + + pub fn try_iter(&self) -> TryIter<T> { + TryIter { inner: self } + } +} + +impl<'a, T> IntoIterator for &'a Receiver<T> { + type Item = T; + type IntoIter = Iter<'a, T>; + + fn into_iter(self) -> Iter<'a, T> { + self.iter() + } +} + +impl<T> IntoIterator for Receiver<T> { + type Item = T; + type IntoIter = IntoIter<T>; + + fn into_iter(self) -> IntoIter<T> { + IntoIter { inner: self } + } +} + +pub struct TryIter<'a, T: 'a> { + inner: &'a Receiver<T>, +} + +impl<'a, T> Iterator for TryIter<'a, T> { + type Item = T; + + fn next(&mut self) -> Option<T> { + self.inner.try_recv().ok() + } +} + +pub struct Iter<'a, T: 'a> { + inner: &'a Receiver<T>, +} + +impl<'a, T> Iterator for Iter<'a, T> { + type Item = T; + + fn next(&mut self) -> Option<T> { + self.inner.recv().ok() + } +} + +pub struct IntoIter<T> { + inner: Receiver<T>, +} + +impl<T> Iterator for IntoIter<T> { + type Item = T; + + fn next(&mut self) -> Option<T> { + self.inner.recv().ok() + } +} + +pub fn channel<T>() -> (Sender<T>, Receiver<T>) { + let (s, r) = cc::unbounded(); + let s = Sender { inner: s }; + let r = Receiver { inner: r }; + (s, r) +} + +pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) { + let (s, r) = cc::bounded(bound); + let s = SyncSender { inner: s }; + let r = Receiver { inner: r }; + (s, r) +} + +macro_rules! select { + ( + $($name:pat = $rx:ident.$meth:ident() => $code:expr),+ + ) => ({ + cc::crossbeam_channel_internal! { + $( + $meth(($rx).inner) -> res => { + let $name = res.map_err(|_| ::std::sync::mpsc::RecvError); + $code + } + )+ + } + }) +} + +// Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs +mod channel_tests { + use super::*; + + use std::env; + use std::thread; + use std::time::{Duration, Instant}; + + pub fn stress_factor() -> usize { + match env::var("RUST_TEST_STRESS") { + Ok(val) => val.parse().unwrap(), + Err(..) => 1, + } + } + + #[test] + fn smoke() { + let (tx, rx) = channel::<i32>(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + } + + #[test] + fn drop_full() { + let (tx, _rx) = channel::<Box<isize>>(); + tx.send(Box::new(1)).unwrap(); + } + + #[test] + fn drop_full_shared() { + let (tx, _rx) = channel::<Box<isize>>(); + drop(tx.clone()); + drop(tx.clone()); + tx.send(Box::new(1)).unwrap(); + } + + #[test] + fn smoke_shared() { + let (tx, rx) = channel::<i32>(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + let tx = tx.clone(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + } + + #[test] + fn smoke_threads() { + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + tx.send(1).unwrap(); + }); + assert_eq!(rx.recv().unwrap(), 1); + t.join().unwrap(); + } + + #[test] + fn smoke_port_gone() { + let (tx, rx) = channel::<i32>(); + drop(rx); + assert!(tx.send(1).is_err()); + } + + #[test] + fn smoke_shared_port_gone() { + let (tx, rx) = channel::<i32>(); + drop(rx); + assert!(tx.send(1).is_err()) + } + + #[test] + fn smoke_shared_port_gone2() { + let (tx, rx) = channel::<i32>(); + drop(rx); + let tx2 = tx.clone(); + drop(tx); + assert!(tx2.send(1).is_err()); + } + + #[test] + fn port_gone_concurrent() { + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() {} + t.join().unwrap(); + } + + #[test] + fn port_gone_concurrent_shared() { + let (tx, rx) = channel::<i32>(); + let tx2 = tx.clone(); + let t = thread::spawn(move || { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() && tx2.send(1).is_ok() {} + t.join().unwrap(); + } + + #[test] + fn smoke_chan_gone() { + let (tx, rx) = channel::<i32>(); + drop(tx); + assert!(rx.recv().is_err()); + } + + #[test] + fn smoke_chan_gone_shared() { + let (tx, rx) = channel::<()>(); + let tx2 = tx.clone(); + drop(tx); + drop(tx2); + assert!(rx.recv().is_err()); + } + + #[test] + fn chan_gone_concurrent() { + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + tx.send(1).unwrap(); + tx.send(1).unwrap(); + }); + while rx.recv().is_ok() {} + t.join().unwrap(); + } + + #[test] + fn stress() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 10000; + + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + for _ in 0..COUNT { + tx.send(1).unwrap(); + } + }); + for _ in 0..COUNT { + assert_eq!(rx.recv().unwrap(), 1); + } + t.join().ok().unwrap(); + } + + #[test] + fn stress_shared() { + let amt: u32 = if cfg!(miri) { 100 } else { 10_000 }; + let nthreads: u32 = if cfg!(miri) { 4 } else { 8 }; + let (tx, rx) = channel::<i32>(); + + let t = thread::spawn(move || { + for _ in 0..amt * nthreads { + assert_eq!(rx.recv().unwrap(), 1); + } + assert!(rx.try_recv().is_err()); + }); + + let mut ts = Vec::with_capacity(nthreads as usize); + for _ in 0..nthreads { + let tx = tx.clone(); + let t = thread::spawn(move || { + for _ in 0..amt { + tx.send(1).unwrap(); + } + }); + ts.push(t); + } + drop(tx); + t.join().ok().unwrap(); + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn send_from_outside_runtime() { + let (tx1, rx1) = channel::<()>(); + let (tx2, rx2) = channel::<i32>(); + let t1 = thread::spawn(move || { + tx1.send(()).unwrap(); + for _ in 0..40 { + assert_eq!(rx2.recv().unwrap(), 1); + } + }); + rx1.recv().unwrap(); + let t2 = thread::spawn(move || { + for _ in 0..40 { + tx2.send(1).unwrap(); + } + }); + t1.join().ok().unwrap(); + t2.join().ok().unwrap(); + } + + #[test] + fn recv_from_outside_runtime() { + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + for _ in 0..40 { + assert_eq!(rx.recv().unwrap(), 1); + } + }); + for _ in 0..40 { + tx.send(1).unwrap(); + } + t.join().ok().unwrap(); + } + + #[test] + fn no_runtime() { + let (tx1, rx1) = channel::<i32>(); + let (tx2, rx2) = channel::<i32>(); + let t1 = thread::spawn(move || { + assert_eq!(rx1.recv().unwrap(), 1); + tx2.send(2).unwrap(); + }); + let t2 = thread::spawn(move || { + tx1.send(1).unwrap(); + assert_eq!(rx2.recv().unwrap(), 2); + }); + t1.join().ok().unwrap(); + t2.join().ok().unwrap(); + } + + #[test] + fn oneshot_single_thread_close_port_first() { + // Simple test of closing without sending + let (_tx, rx) = channel::<i32>(); + drop(rx); + } + + #[test] + fn oneshot_single_thread_close_chan_first() { + // Simple test of closing without sending + let (tx, _rx) = channel::<i32>(); + drop(tx); + } + + #[test] + fn oneshot_single_thread_send_port_close() { + // Testing that the sender cleans up the payload if receiver is closed + let (tx, rx) = channel::<Box<i32>>(); + drop(rx); + assert!(tx.send(Box::new(0)).is_err()); + } + + #[test] + fn oneshot_single_thread_recv_chan_close() { + let (tx, rx) = channel::<i32>(); + drop(tx); + assert_eq!(rx.recv(), Err(RecvError)); + } + + #[test] + fn oneshot_single_thread_send_then_recv() { + let (tx, rx) = channel::<Box<i32>>(); + tx.send(Box::new(10)).unwrap(); + assert!(*rx.recv().unwrap() == 10); + } + + #[test] + fn oneshot_single_thread_try_send_open() { + let (tx, rx) = channel::<i32>(); + assert!(tx.send(10).is_ok()); + assert!(rx.recv().unwrap() == 10); + } + + #[test] + fn oneshot_single_thread_try_send_closed() { + let (tx, rx) = channel::<i32>(); + drop(rx); + assert!(tx.send(10).is_err()); + } + + #[test] + fn oneshot_single_thread_try_recv_open() { + let (tx, rx) = channel::<i32>(); + tx.send(10).unwrap(); + assert!(rx.recv() == Ok(10)); + } + + #[test] + fn oneshot_single_thread_try_recv_closed() { + let (tx, rx) = channel::<i32>(); + drop(tx); + assert!(rx.recv().is_err()); + } + + #[test] + fn oneshot_single_thread_peek_data() { + let (tx, rx) = channel::<i32>(); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + tx.send(10).unwrap(); + assert_eq!(rx.try_recv(), Ok(10)); + } + + #[test] + fn oneshot_single_thread_peek_close() { + let (tx, rx) = channel::<i32>(); + drop(tx); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + } + + #[test] + fn oneshot_single_thread_peek_open() { + let (_tx, rx) = channel::<i32>(); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + } + + #[test] + fn oneshot_multi_task_recv_then_send() { + let (tx, rx) = channel::<Box<i32>>(); + let t = thread::spawn(move || { + assert!(*rx.recv().unwrap() == 10); + }); + + tx.send(Box::new(10)).unwrap(); + t.join().unwrap(); + } + + #[test] + fn oneshot_multi_task_recv_then_close() { + let (tx, rx) = channel::<Box<i32>>(); + let t = thread::spawn(move || { + drop(tx); + }); + thread::spawn(move || { + assert_eq!(rx.recv(), Err(RecvError)); + }) + .join() + .unwrap(); + t.join().unwrap(); + } + + #[test] + fn oneshot_multi_thread_close_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + drop(rx); + }); + ts.push(t); + drop(tx); + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn oneshot_multi_thread_send_close_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(2 * stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + drop(rx); + }); + ts.push(t); + thread::spawn(move || { + let _ = tx.send(1); + }) + .join() + .unwrap(); + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn oneshot_multi_thread_recv_close_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(2 * stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = channel::<i32>(); + let t = thread::spawn(move || { + thread::spawn(move || { + assert_eq!(rx.recv(), Err(RecvError)); + }) + .join() + .unwrap(); + }); + ts.push(t); + let t2 = thread::spawn(move || { + let t = thread::spawn(move || { + drop(tx); + }); + t.join().unwrap(); + }); + ts.push(t2); + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn oneshot_multi_thread_send_recv_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = channel::<Box<isize>>(); + let t = thread::spawn(move || { + tx.send(Box::new(10)).unwrap(); + }); + ts.push(t); + assert!(*rx.recv().unwrap() == 10); + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn stream_send_recv_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(2 * stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = channel(); + + if let Some(t) = send(tx, 0) { + ts.push(t); + } + if let Some(t2) = recv(rx, 0) { + ts.push(t2); + } + + fn send(tx: Sender<Box<i32>>, i: i32) -> Option<JoinHandle<()>> { + if i == 10 { + return None; + } + + Some(thread::spawn(move || { + tx.send(Box::new(i)).unwrap(); + send(tx, i + 1); + })) + } + + fn recv(rx: Receiver<Box<i32>>, i: i32) -> Option<JoinHandle<()>> { + if i == 10 { + return None; + } + + Some(thread::spawn(move || { + assert!(*rx.recv().unwrap() == i); + recv(rx, i + 1); + })) + } + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn oneshot_single_thread_recv_timeout() { + let (tx, rx) = channel(); + tx.send(()).unwrap(); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); + assert_eq!( + rx.recv_timeout(Duration::from_millis(1)), + Err(RecvTimeoutError::Timeout) + ); + tx.send(()).unwrap(); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); + } + + #[test] + fn stress_recv_timeout_two_threads() { + let (tx, rx) = channel(); + let stress = stress_factor() + 100; + let timeout = Duration::from_millis(100); + + let t = thread::spawn(move || { + for i in 0..stress { + if i % 2 == 0 { + thread::sleep(timeout * 2); + } + tx.send(1usize).unwrap(); + } + }); + + let mut recv_count = 0; + loop { + match rx.recv_timeout(timeout) { + Ok(n) => { + assert_eq!(n, 1usize); + recv_count += 1; + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => break, + } + } + + assert_eq!(recv_count, stress); + t.join().unwrap() + } + + #[test] + fn recv_timeout_upgrade() { + let (tx, rx) = channel::<()>(); + let timeout = Duration::from_millis(1); + let _tx_clone = tx.clone(); + + let start = Instant::now(); + assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout)); + assert!(Instant::now() >= start + timeout); + } + + #[test] + fn stress_recv_timeout_shared() { + let (tx, rx) = channel(); + let stress = stress_factor() + 100; + + let mut ts = Vec::with_capacity(stress); + for i in 0..stress { + let tx = tx.clone(); + let t = thread::spawn(move || { + thread::sleep(Duration::from_millis(i as u64 * 10)); + tx.send(1usize).unwrap(); + }); + ts.push(t); + } + + drop(tx); + + let mut recv_count = 0; + loop { + match rx.recv_timeout(Duration::from_millis(10)) { + Ok(n) => { + assert_eq!(n, 1usize); + recv_count += 1; + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => break, + } + } + + assert_eq!(recv_count, stress); + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn recv_a_lot() { + #[cfg(miri)] + const N: usize = 50; + #[cfg(not(miri))] + const N: usize = 10000; + + // Regression test that we don't run out of stack in scheduler context + let (tx, rx) = channel(); + for _ in 0..N { + tx.send(()).unwrap(); + } + for _ in 0..N { + rx.recv().unwrap(); + } + } + + #[test] + fn shared_recv_timeout() { + let (tx, rx) = channel(); + let total = 5; + let mut ts = Vec::with_capacity(total); + for _ in 0..total { + let tx = tx.clone(); + let t = thread::spawn(move || { + tx.send(()).unwrap(); + }); + ts.push(t); + } + + for _ in 0..total { + rx.recv().unwrap(); + } + + assert_eq!( + rx.recv_timeout(Duration::from_millis(1)), + Err(RecvTimeoutError::Timeout) + ); + tx.send(()).unwrap(); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(())); + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn shared_chan_stress() { + let (tx, rx) = channel(); + let total = stress_factor() + 100; + let mut ts = Vec::with_capacity(total); + for _ in 0..total { + let tx = tx.clone(); + let t = thread::spawn(move || { + tx.send(()).unwrap(); + }); + ts.push(t); + } + + for _ in 0..total { + rx.recv().unwrap(); + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn test_nested_recv_iter() { + let (tx, rx) = channel::<i32>(); + let (total_tx, total_rx) = channel::<i32>(); + + let t = thread::spawn(move || { + let mut acc = 0; + for x in rx.iter() { + acc += x; + } + total_tx.send(acc).unwrap(); + }); + + tx.send(3).unwrap(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + drop(tx); + assert_eq!(total_rx.recv().unwrap(), 6); + t.join().unwrap(); + } + + #[test] + fn test_recv_iter_break() { + let (tx, rx) = channel::<i32>(); + let (count_tx, count_rx) = channel(); + + let t = thread::spawn(move || { + let mut count = 0; + for x in rx.iter() { + if count >= 3 { + break; + } else { + count += x; + } + } + count_tx.send(count).unwrap(); + }); + + tx.send(2).unwrap(); + tx.send(2).unwrap(); + tx.send(2).unwrap(); + let _ = tx.send(2); + drop(tx); + assert_eq!(count_rx.recv().unwrap(), 4); + t.join().unwrap(); + } + + #[test] + fn test_recv_try_iter() { + let (request_tx, request_rx) = channel(); + let (response_tx, response_rx) = channel(); + + // Request `x`s until we have `6`. + let t = thread::spawn(move || { + let mut count = 0; + loop { + for x in response_rx.try_iter() { + count += x; + if count == 6 { + return count; + } + } + request_tx.send(()).unwrap(); + } + }); + + for _ in request_rx.iter() { + if response_tx.send(2).is_err() { + break; + } + } + + assert_eq!(t.join().unwrap(), 6); + } + + #[test] + fn test_recv_into_iter_owned() { + let mut iter = { + let (tx, rx) = channel::<i32>(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + + rx.into_iter() + }; + assert_eq!(iter.next().unwrap(), 1); + assert_eq!(iter.next().unwrap(), 2); + assert!(iter.next().is_none()); + } + + #[test] + fn test_recv_into_iter_borrowed() { + let (tx, rx) = channel::<i32>(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + drop(tx); + let mut iter = (&rx).into_iter(); + assert_eq!(iter.next().unwrap(), 1); + assert_eq!(iter.next().unwrap(), 2); + assert!(iter.next().is_none()); + } + + #[test] + fn try_recv_states() { + let (tx1, rx1) = channel::<i32>(); + let (tx2, rx2) = channel::<()>(); + let (tx3, rx3) = channel::<()>(); + let t = thread::spawn(move || { + rx2.recv().unwrap(); + tx1.send(1).unwrap(); + tx3.send(()).unwrap(); + rx2.recv().unwrap(); + drop(tx1); + tx3.send(()).unwrap(); + }); + + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Ok(1)); + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected)); + t.join().unwrap(); + } + + // This bug used to end up in a livelock inside of the Receiver destructor + // because the internal state of the Shared packet was corrupted + #[test] + fn destroy_upgraded_shared_port_when_sender_still_active() { + let (tx, rx) = channel(); + let (tx2, rx2) = channel(); + let t = thread::spawn(move || { + rx.recv().unwrap(); // wait on a oneshot + drop(rx); // destroy a shared + tx2.send(()).unwrap(); + }); + // make sure the other thread has gone to sleep + for _ in 0..5000 { + thread::yield_now(); + } + + // upgrade to a shared chan and send a message + let tx2 = tx.clone(); + drop(tx); + tx2.send(()).unwrap(); + + // wait for the child thread to exit before we exit + rx2.recv().unwrap(); + t.join().unwrap(); + } + + #[test] + fn issue_32114() { + let (tx, _) = channel(); + let _ = tx.send(123); + assert_eq!(tx.send(123), Err(SendError(123))); + } +} + +// Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs +mod sync_channel_tests { + use super::*; + + use std::env; + use std::thread; + use std::time::Duration; + + pub fn stress_factor() -> usize { + match env::var("RUST_TEST_STRESS") { + Ok(val) => val.parse().unwrap(), + Err(..) => 1, + } + } + + #[test] + fn smoke() { + let (tx, rx) = sync_channel::<i32>(1); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + } + + #[test] + fn drop_full() { + let (tx, _rx) = sync_channel::<Box<isize>>(1); + tx.send(Box::new(1)).unwrap(); + } + + #[test] + fn smoke_shared() { + let (tx, rx) = sync_channel::<i32>(1); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + let tx = tx.clone(); + tx.send(1).unwrap(); + assert_eq!(rx.recv().unwrap(), 1); + } + + #[test] + fn recv_timeout() { + let (tx, rx) = sync_channel::<i32>(1); + assert_eq!( + rx.recv_timeout(Duration::from_millis(1)), + Err(RecvTimeoutError::Timeout) + ); + tx.send(1).unwrap(); + assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1)); + } + + #[test] + fn smoke_threads() { + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + tx.send(1).unwrap(); + }); + assert_eq!(rx.recv().unwrap(), 1); + t.join().unwrap(); + } + + #[test] + fn smoke_port_gone() { + let (tx, rx) = sync_channel::<i32>(0); + drop(rx); + assert!(tx.send(1).is_err()); + } + + #[test] + fn smoke_shared_port_gone2() { + let (tx, rx) = sync_channel::<i32>(0); + drop(rx); + let tx2 = tx.clone(); + drop(tx); + assert!(tx2.send(1).is_err()); + } + + #[test] + fn port_gone_concurrent() { + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() {} + t.join().unwrap(); + } + + #[test] + fn port_gone_concurrent_shared() { + let (tx, rx) = sync_channel::<i32>(0); + let tx2 = tx.clone(); + let t = thread::spawn(move || { + rx.recv().unwrap(); + }); + while tx.send(1).is_ok() && tx2.send(1).is_ok() {} + t.join().unwrap(); + } + + #[test] + fn smoke_chan_gone() { + let (tx, rx) = sync_channel::<i32>(0); + drop(tx); + assert!(rx.recv().is_err()); + } + + #[test] + fn smoke_chan_gone_shared() { + let (tx, rx) = sync_channel::<()>(0); + let tx2 = tx.clone(); + drop(tx); + drop(tx2); + assert!(rx.recv().is_err()); + } + + #[test] + fn chan_gone_concurrent() { + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + tx.send(1).unwrap(); + tx.send(1).unwrap(); + }); + while rx.recv().is_ok() {} + t.join().unwrap(); + } + + #[test] + fn stress() { + #[cfg(miri)] + const N: usize = 100; + #[cfg(not(miri))] + const N: usize = 10000; + + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + for _ in 0..N { + tx.send(1).unwrap(); + } + }); + for _ in 0..N { + assert_eq!(rx.recv().unwrap(), 1); + } + t.join().unwrap(); + } + + #[test] + fn stress_recv_timeout_two_threads() { + #[cfg(miri)] + const N: usize = 100; + #[cfg(not(miri))] + const N: usize = 10000; + + let (tx, rx) = sync_channel::<i32>(0); + + let t = thread::spawn(move || { + for _ in 0..N { + tx.send(1).unwrap(); + } + }); + + let mut recv_count = 0; + loop { + match rx.recv_timeout(Duration::from_millis(1)) { + Ok(v) => { + assert_eq!(v, 1); + recv_count += 1; + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => break, + } + } + + assert_eq!(recv_count, N); + t.join().unwrap(); + } + + #[test] + fn stress_recv_timeout_shared() { + #[cfg(miri)] + const AMT: u32 = 100; + #[cfg(not(miri))] + const AMT: u32 = 1000; + const NTHREADS: u32 = 8; + let (tx, rx) = sync_channel::<i32>(0); + let (dtx, drx) = sync_channel::<()>(0); + + let t = thread::spawn(move || { + let mut recv_count = 0; + loop { + match rx.recv_timeout(Duration::from_millis(10)) { + Ok(v) => { + assert_eq!(v, 1); + recv_count += 1; + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => break, + } + } + + assert_eq!(recv_count, AMT * NTHREADS); + assert!(rx.try_recv().is_err()); + + dtx.send(()).unwrap(); + }); + + let mut ts = Vec::with_capacity(NTHREADS as usize); + for _ in 0..NTHREADS { + let tx = tx.clone(); + let t = thread::spawn(move || { + for _ in 0..AMT { + tx.send(1).unwrap(); + } + }); + ts.push(t); + } + + drop(tx); + + drx.recv().unwrap(); + for t in ts { + t.join().unwrap(); + } + t.join().unwrap(); + } + + #[test] + fn stress_shared() { + #[cfg(miri)] + const AMT: u32 = 100; + #[cfg(not(miri))] + const AMT: u32 = 1000; + const NTHREADS: u32 = 8; + let (tx, rx) = sync_channel::<i32>(0); + let (dtx, drx) = sync_channel::<()>(0); + + let t = thread::spawn(move || { + for _ in 0..AMT * NTHREADS { + assert_eq!(rx.recv().unwrap(), 1); + } + assert!(rx.try_recv().is_err()); + dtx.send(()).unwrap(); + }); + + let mut ts = Vec::with_capacity(NTHREADS as usize); + for _ in 0..NTHREADS { + let tx = tx.clone(); + let t = thread::spawn(move || { + for _ in 0..AMT { + tx.send(1).unwrap(); + } + }); + ts.push(t); + } + drop(tx); + drx.recv().unwrap(); + for t in ts { + t.join().unwrap(); + } + t.join().unwrap(); + } + + #[test] + fn oneshot_single_thread_close_port_first() { + // Simple test of closing without sending + let (_tx, rx) = sync_channel::<i32>(0); + drop(rx); + } + + #[test] + fn oneshot_single_thread_close_chan_first() { + // Simple test of closing without sending + let (tx, _rx) = sync_channel::<i32>(0); + drop(tx); + } + + #[test] + fn oneshot_single_thread_send_port_close() { + // Testing that the sender cleans up the payload if receiver is closed + let (tx, rx) = sync_channel::<Box<i32>>(0); + drop(rx); + assert!(tx.send(Box::new(0)).is_err()); + } + + #[test] + fn oneshot_single_thread_recv_chan_close() { + let (tx, rx) = sync_channel::<i32>(0); + drop(tx); + assert_eq!(rx.recv(), Err(RecvError)); + } + + #[test] + fn oneshot_single_thread_send_then_recv() { + let (tx, rx) = sync_channel::<Box<i32>>(1); + tx.send(Box::new(10)).unwrap(); + assert!(*rx.recv().unwrap() == 10); + } + + #[test] + fn oneshot_single_thread_try_send_open() { + let (tx, rx) = sync_channel::<i32>(1); + assert_eq!(tx.try_send(10), Ok(())); + assert!(rx.recv().unwrap() == 10); + } + + #[test] + fn oneshot_single_thread_try_send_closed() { + let (tx, rx) = sync_channel::<i32>(0); + drop(rx); + assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10))); + } + + #[test] + fn oneshot_single_thread_try_send_closed2() { + let (tx, _rx) = sync_channel::<i32>(0); + assert_eq!(tx.try_send(10), Err(TrySendError::Full(10))); + } + + #[test] + fn oneshot_single_thread_try_recv_open() { + let (tx, rx) = sync_channel::<i32>(1); + tx.send(10).unwrap(); + assert!(rx.recv() == Ok(10)); + } + + #[test] + fn oneshot_single_thread_try_recv_closed() { + let (tx, rx) = sync_channel::<i32>(0); + drop(tx); + assert!(rx.recv().is_err()); + } + + #[test] + fn oneshot_single_thread_try_recv_closed_with_data() { + let (tx, rx) = sync_channel::<i32>(1); + tx.send(10).unwrap(); + drop(tx); + assert_eq!(rx.try_recv(), Ok(10)); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + } + + #[test] + fn oneshot_single_thread_peek_data() { + let (tx, rx) = sync_channel::<i32>(1); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + tx.send(10).unwrap(); + assert_eq!(rx.try_recv(), Ok(10)); + } + + #[test] + fn oneshot_single_thread_peek_close() { + let (tx, rx) = sync_channel::<i32>(0); + drop(tx); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + } + + #[test] + fn oneshot_single_thread_peek_open() { + let (_tx, rx) = sync_channel::<i32>(0); + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + } + + #[test] + fn oneshot_multi_task_recv_then_send() { + let (tx, rx) = sync_channel::<Box<i32>>(0); + let t = thread::spawn(move || { + assert!(*rx.recv().unwrap() == 10); + }); + + tx.send(Box::new(10)).unwrap(); + t.join().unwrap(); + } + + #[test] + fn oneshot_multi_task_recv_then_close() { + let (tx, rx) = sync_channel::<Box<i32>>(0); + let t = thread::spawn(move || { + drop(tx); + }); + thread::spawn(move || { + assert_eq!(rx.recv(), Err(RecvError)); + }) + .join() + .unwrap(); + t.join().unwrap(); + } + + #[test] + fn oneshot_multi_thread_close_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + drop(rx); + }); + ts.push(t); + drop(tx); + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn oneshot_multi_thread_send_close_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + drop(rx); + }); + ts.push(t); + thread::spawn(move || { + let _ = tx.send(1); + }) + .join() + .unwrap(); + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn oneshot_multi_thread_recv_close_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(2 * stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + thread::spawn(move || { + assert_eq!(rx.recv(), Err(RecvError)); + }) + .join() + .unwrap(); + }); + ts.push(t); + let t2 = thread::spawn(move || { + thread::spawn(move || { + drop(tx); + }); + }); + ts.push(t2); + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn oneshot_multi_thread_send_recv_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = sync_channel::<Box<i32>>(0); + let t = thread::spawn(move || { + tx.send(Box::new(10)).unwrap(); + }); + ts.push(t); + assert!(*rx.recv().unwrap() == 10); + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn stream_send_recv_stress() { + let stress_factor = stress_factor(); + let mut ts = Vec::with_capacity(2 * stress_factor); + for _ in 0..stress_factor { + let (tx, rx) = sync_channel::<Box<i32>>(0); + + if let Some(t) = send(tx, 0) { + ts.push(t); + } + if let Some(t) = recv(rx, 0) { + ts.push(t); + } + + fn send(tx: SyncSender<Box<i32>>, i: i32) -> Option<JoinHandle<()>> { + if i == 10 { + return None; + } + + Some(thread::spawn(move || { + tx.send(Box::new(i)).unwrap(); + send(tx, i + 1); + })) + } + + fn recv(rx: Receiver<Box<i32>>, i: i32) -> Option<JoinHandle<()>> { + if i == 10 { + return None; + } + + Some(thread::spawn(move || { + assert!(*rx.recv().unwrap() == i); + recv(rx, i + 1); + })) + } + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn recv_a_lot() { + #[cfg(miri)] + const N: usize = 100; + #[cfg(not(miri))] + const N: usize = 10000; + + // Regression test that we don't run out of stack in scheduler context + let (tx, rx) = sync_channel(N); + for _ in 0..N { + tx.send(()).unwrap(); + } + for _ in 0..N { + rx.recv().unwrap(); + } + } + + #[test] + fn shared_chan_stress() { + let (tx, rx) = sync_channel(0); + let total = stress_factor() + 100; + let mut ts = Vec::with_capacity(total); + for _ in 0..total { + let tx = tx.clone(); + let t = thread::spawn(move || { + tx.send(()).unwrap(); + }); + ts.push(t); + } + + for _ in 0..total { + rx.recv().unwrap(); + } + for t in ts { + t.join().unwrap(); + } + } + + #[test] + fn test_nested_recv_iter() { + let (tx, rx) = sync_channel::<i32>(0); + let (total_tx, total_rx) = sync_channel::<i32>(0); + + let t = thread::spawn(move || { + let mut acc = 0; + for x in rx.iter() { + acc += x; + } + total_tx.send(acc).unwrap(); + }); + + tx.send(3).unwrap(); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + drop(tx); + assert_eq!(total_rx.recv().unwrap(), 6); + t.join().unwrap(); + } + + #[test] + fn test_recv_iter_break() { + let (tx, rx) = sync_channel::<i32>(0); + let (count_tx, count_rx) = sync_channel(0); + + let t = thread::spawn(move || { + let mut count = 0; + for x in rx.iter() { + if count >= 3 { + break; + } else { + count += x; + } + } + count_tx.send(count).unwrap(); + }); + + tx.send(2).unwrap(); + tx.send(2).unwrap(); + tx.send(2).unwrap(); + let _ = tx.try_send(2); + drop(tx); + assert_eq!(count_rx.recv().unwrap(), 4); + t.join().unwrap(); + } + + #[test] + fn try_recv_states() { + let (tx1, rx1) = sync_channel::<i32>(1); + let (tx2, rx2) = sync_channel::<()>(1); + let (tx3, rx3) = sync_channel::<()>(1); + let t = thread::spawn(move || { + rx2.recv().unwrap(); + tx1.send(1).unwrap(); + tx3.send(()).unwrap(); + rx2.recv().unwrap(); + drop(tx1); + tx3.send(()).unwrap(); + }); + + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Ok(1)); + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected)); + t.join().unwrap(); + } + + // This bug used to end up in a livelock inside of the Receiver destructor + // because the internal state of the Shared packet was corrupted + #[test] + fn destroy_upgraded_shared_port_when_sender_still_active() { + let (tx, rx) = sync_channel::<()>(0); + let (tx2, rx2) = sync_channel::<()>(0); + let t = thread::spawn(move || { + rx.recv().unwrap(); // wait on a oneshot + drop(rx); // destroy a shared + tx2.send(()).unwrap(); + }); + // make sure the other thread has gone to sleep + for _ in 0..5000 { + thread::yield_now(); + } + + // upgrade to a shared chan and send a message + let tx2 = tx.clone(); + drop(tx); + tx2.send(()).unwrap(); + + // wait for the child thread to exit before we exit + rx2.recv().unwrap(); + t.join().unwrap(); + } + + #[test] + fn send1() { + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + rx.recv().unwrap(); + }); + assert_eq!(tx.send(1), Ok(())); + t.join().unwrap(); + } + + #[test] + fn send2() { + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + drop(rx); + }); + assert!(tx.send(1).is_err()); + t.join().unwrap(); + } + + #[test] + fn send3() { + let (tx, rx) = sync_channel::<i32>(1); + assert_eq!(tx.send(1), Ok(())); + let t = thread::spawn(move || { + drop(rx); + }); + assert!(tx.send(1).is_err()); + t.join().unwrap(); + } + + #[test] + fn send4() { + let (tx, rx) = sync_channel::<i32>(0); + let tx2 = tx.clone(); + let (done, donerx) = channel(); + let done2 = done.clone(); + let t = thread::spawn(move || { + assert!(tx.send(1).is_err()); + done.send(()).unwrap(); + }); + let t2 = thread::spawn(move || { + assert!(tx2.send(2).is_err()); + done2.send(()).unwrap(); + }); + drop(rx); + donerx.recv().unwrap(); + donerx.recv().unwrap(); + t.join().unwrap(); + t2.join().unwrap(); + } + + #[test] + fn try_send1() { + let (tx, _rx) = sync_channel::<i32>(0); + assert_eq!(tx.try_send(1), Err(TrySendError::Full(1))); + } + + #[test] + fn try_send2() { + let (tx, _rx) = sync_channel::<i32>(1); + assert_eq!(tx.try_send(1), Ok(())); + assert_eq!(tx.try_send(1), Err(TrySendError::Full(1))); + } + + #[test] + fn try_send3() { + let (tx, rx) = sync_channel::<i32>(1); + assert_eq!(tx.try_send(1), Ok(())); + drop(rx); + assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1))); + } + + #[test] + fn issue_15761() { + fn repro() { + let (tx1, rx1) = sync_channel::<()>(3); + let (tx2, rx2) = sync_channel::<()>(3); + + let _t = thread::spawn(move || { + rx1.recv().unwrap(); + tx2.try_send(()).unwrap(); + }); + + tx1.try_send(()).unwrap(); + rx2.recv().unwrap(); + } + + for _ in 0..100 { + repro() + } + } +} + +// Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/select.rs +mod select_tests { + use super::*; + + use std::thread; + + #[test] + fn smoke() { + let (tx1, rx1) = channel::<i32>(); + let (tx2, rx2) = channel::<i32>(); + tx1.send(1).unwrap(); + select! { + foo = rx1.recv() => assert_eq!(foo.unwrap(), 1), + _bar = rx2.recv() => panic!() + } + tx2.send(2).unwrap(); + select! { + _foo = rx1.recv() => panic!(), + bar = rx2.recv() => assert_eq!(bar.unwrap(), 2) + } + drop(tx1); + select! { + foo = rx1.recv() => assert!(foo.is_err()), + _bar = rx2.recv() => panic!() + } + drop(tx2); + select! { + bar = rx2.recv() => assert!(bar.is_err()) + } + } + + #[test] + fn smoke2() { + let (_tx1, rx1) = channel::<i32>(); + let (_tx2, rx2) = channel::<i32>(); + let (_tx3, rx3) = channel::<i32>(); + let (_tx4, rx4) = channel::<i32>(); + let (tx5, rx5) = channel::<i32>(); + tx5.send(4).unwrap(); + select! { + _foo = rx1.recv() => panic!("1"), + _foo = rx2.recv() => panic!("2"), + _foo = rx3.recv() => panic!("3"), + _foo = rx4.recv() => panic!("4"), + foo = rx5.recv() => assert_eq!(foo.unwrap(), 4) + } + } + + #[test] + fn closed() { + let (_tx1, rx1) = channel::<i32>(); + let (tx2, rx2) = channel::<i32>(); + drop(tx2); + + select! { + _a1 = rx1.recv() => panic!(), + a2 = rx2.recv() => assert!(a2.is_err()) + } + } + + #[test] + fn unblocks() { + let (tx1, rx1) = channel::<i32>(); + let (_tx2, rx2) = channel::<i32>(); + let (tx3, rx3) = channel::<i32>(); + + let t = thread::spawn(move || { + for _ in 0..20 { + thread::yield_now(); + } + tx1.send(1).unwrap(); + rx3.recv().unwrap(); + for _ in 0..20 { + thread::yield_now(); + } + }); + + select! { + a = rx1.recv() => assert_eq!(a.unwrap(), 1), + _b = rx2.recv() => panic!() + } + tx3.send(1).unwrap(); + select! { + a = rx1.recv() => assert!(a.is_err()), + _b = rx2.recv() => panic!() + } + t.join().unwrap(); + } + + #[test] + fn both_ready() { + let (tx1, rx1) = channel::<i32>(); + let (tx2, rx2) = channel::<i32>(); + let (tx3, rx3) = channel::<()>(); + + let t = thread::spawn(move || { + for _ in 0..20 { + thread::yield_now(); + } + tx1.send(1).unwrap(); + tx2.send(2).unwrap(); + rx3.recv().unwrap(); + }); + + select! { + a = rx1.recv() => { assert_eq!(a.unwrap(), 1); }, + a = rx2.recv() => { assert_eq!(a.unwrap(), 2); } + } + select! { + a = rx1.recv() => { assert_eq!(a.unwrap(), 1); }, + a = rx2.recv() => { assert_eq!(a.unwrap(), 2); } + } + assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty)); + assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty)); + tx3.send(()).unwrap(); + t.join().unwrap(); + } + + #[test] + fn stress() { + #[cfg(miri)] + const AMT: i32 = 100; + #[cfg(not(miri))] + const AMT: i32 = 10000; + + let (tx1, rx1) = channel::<i32>(); + let (tx2, rx2) = channel::<i32>(); + let (tx3, rx3) = channel::<()>(); + + let t = thread::spawn(move || { + for i in 0..AMT { + if i % 2 == 0 { + tx1.send(i).unwrap(); + } else { + tx2.send(i).unwrap(); + } + rx3.recv().unwrap(); + } + }); + + for i in 0..AMT { + select! { + i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1.unwrap()); }, + i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2.unwrap()); } + } + tx3.send(()).unwrap(); + } + t.join().unwrap(); + } + + #[allow(unused_must_use)] + #[test] + fn cloning() { + let (tx1, rx1) = channel::<i32>(); + let (_tx2, rx2) = channel::<i32>(); + let (tx3, rx3) = channel::<()>(); + + let t = thread::spawn(move || { + rx3.recv().unwrap(); + tx1.clone(); + assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty)); + tx1.send(2).unwrap(); + rx3.recv().unwrap(); + }); + + tx3.send(()).unwrap(); + select! { + _i1 = rx1.recv() => {}, + _i2 = rx2.recv() => panic!() + } + tx3.send(()).unwrap(); + t.join().unwrap(); + } + + #[allow(unused_must_use)] + #[test] + fn cloning2() { + let (tx1, rx1) = channel::<i32>(); + let (_tx2, rx2) = channel::<i32>(); + let (tx3, rx3) = channel::<()>(); + + let t = thread::spawn(move || { + rx3.recv().unwrap(); + tx1.clone(); + assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty)); + tx1.send(2).unwrap(); + rx3.recv().unwrap(); + }); + + tx3.send(()).unwrap(); + select! { + _i1 = rx1.recv() => {}, + _i2 = rx2.recv() => panic!() + } + tx3.send(()).unwrap(); + t.join().unwrap(); + } + + #[test] + fn cloning3() { + let (tx1, rx1) = channel::<()>(); + let (tx2, rx2) = channel::<()>(); + let (tx3, rx3) = channel::<()>(); + let t = thread::spawn(move || { + select! { + _ = rx1.recv() => panic!(), + _ = rx2.recv() => {} + } + tx3.send(()).unwrap(); + }); + + for _ in 0..1000 { + thread::yield_now(); + } + drop(tx1.clone()); + tx2.send(()).unwrap(); + rx3.recv().unwrap(); + t.join().unwrap(); + } + + #[test] + fn preflight1() { + let (tx, rx) = channel(); + tx.send(()).unwrap(); + select! { + _n = rx.recv() => {} + } + } + + #[test] + fn preflight2() { + let (tx, rx) = channel(); + tx.send(()).unwrap(); + tx.send(()).unwrap(); + select! { + _n = rx.recv() => {} + } + } + + #[test] + fn preflight3() { + let (tx, rx) = channel(); + drop(tx.clone()); + tx.send(()).unwrap(); + select! { + _n = rx.recv() => {} + } + } + + #[test] + fn preflight4() { + let (tx, rx) = channel(); + tx.send(()).unwrap(); + select! { + _ = rx.recv() => {} + } + } + + #[test] + fn preflight5() { + let (tx, rx) = channel(); + tx.send(()).unwrap(); + tx.send(()).unwrap(); + select! { + _ = rx.recv() => {} + } + } + + #[test] + fn preflight6() { + let (tx, rx) = channel(); + drop(tx.clone()); + tx.send(()).unwrap(); + select! { + _ = rx.recv() => {} + } + } + + #[test] + fn preflight7() { + let (tx, rx) = channel::<()>(); + drop(tx); + select! { + _ = rx.recv() => {} + } + } + + #[test] + fn preflight8() { + let (tx, rx) = channel(); + tx.send(()).unwrap(); + drop(tx); + rx.recv().unwrap(); + select! { + _ = rx.recv() => {} + } + } + + #[test] + fn preflight9() { + let (tx, rx) = channel(); + drop(tx.clone()); + tx.send(()).unwrap(); + drop(tx); + rx.recv().unwrap(); + select! { + _ = rx.recv() => {} + } + } + + #[test] + fn oneshot_data_waiting() { + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + let t = thread::spawn(move || { + select! { + _n = rx1.recv() => {} + } + tx2.send(()).unwrap(); + }); + + for _ in 0..100 { + thread::yield_now() + } + tx1.send(()).unwrap(); + rx2.recv().unwrap(); + t.join().unwrap(); + } + + #[test] + fn stream_data_waiting() { + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + tx1.send(()).unwrap(); + tx1.send(()).unwrap(); + rx1.recv().unwrap(); + rx1.recv().unwrap(); + let t = thread::spawn(move || { + select! { + _n = rx1.recv() => {} + } + tx2.send(()).unwrap(); + }); + + for _ in 0..100 { + thread::yield_now() + } + tx1.send(()).unwrap(); + rx2.recv().unwrap(); + t.join().unwrap(); + } + + #[test] + fn shared_data_waiting() { + let (tx1, rx1) = channel(); + let (tx2, rx2) = channel(); + drop(tx1.clone()); + tx1.send(()).unwrap(); + rx1.recv().unwrap(); + let t = thread::spawn(move || { + select! { + _n = rx1.recv() => {} + } + tx2.send(()).unwrap(); + }); + + for _ in 0..100 { + thread::yield_now() + } + tx1.send(()).unwrap(); + rx2.recv().unwrap(); + t.join().unwrap(); + } + + #[test] + fn sync1() { + let (tx, rx) = sync_channel::<i32>(1); + tx.send(1).unwrap(); + select! { + n = rx.recv() => { assert_eq!(n.unwrap(), 1); } + } + } + + #[test] + fn sync2() { + let (tx, rx) = sync_channel::<i32>(0); + let t = thread::spawn(move || { + for _ in 0..100 { + thread::yield_now() + } + tx.send(1).unwrap(); + }); + select! { + n = rx.recv() => { assert_eq!(n.unwrap(), 1); } + } + t.join().unwrap(); + } + + #[test] + fn sync3() { + let (tx1, rx1) = sync_channel::<i32>(0); + let (tx2, rx2): (Sender<i32>, Receiver<i32>) = channel(); + let t = thread::spawn(move || { + tx1.send(1).unwrap(); + }); + let t2 = thread::spawn(move || { + tx2.send(2).unwrap(); + }); + select! { + n = rx1.recv() => { + let n = n.unwrap(); + assert_eq!(n, 1); + assert_eq!(rx2.recv().unwrap(), 2); + }, + n = rx2.recv() => { + let n = n.unwrap(); + assert_eq!(n, 2); + assert_eq!(rx1.recv().unwrap(), 1); + } + } + t.join().unwrap(); + t2.join().unwrap(); + } +} diff --git a/third_party/rust/crossbeam-channel/tests/never.rs b/third_party/rust/crossbeam-channel/tests/never.rs new file mode 100644 index 0000000000..f275126f7d --- /dev/null +++ b/third_party/rust/crossbeam-channel/tests/never.rs @@ -0,0 +1,95 @@ +//! Tests for the never channel flavor. + +use std::thread; +use std::time::{Duration, Instant}; + +use crossbeam_channel::{never, select, tick, unbounded}; + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn smoke() { + select! { + recv(never::<i32>()) -> _ => panic!(), + default => {} + } +} + +#[test] +fn optional() { + let (s, r) = unbounded::<i32>(); + s.send(1).unwrap(); + s.send(2).unwrap(); + + let mut r = Some(&r); + select! { + recv(r.unwrap_or(&never())) -> _ => {} + default => panic!(), + } + + r = None; + select! { + recv(r.unwrap_or(&never())) -> _ => panic!(), + default => {} + } +} + +#[test] +fn tick_n() { + let mut r = tick(ms(100)); + let mut step = 0; + + loop { + select! { + recv(r) -> _ => step += 1, + default(ms(500)) => break, + } + + if step == 10 { + r = never(); + } + } + + assert_eq!(step, 10); +} + +#[test] +fn capacity() { + let r = never::<i32>(); + assert_eq!(r.capacity(), Some(0)); +} + +#[test] +fn len_empty_full() { + let r = never::<i32>(); + assert_eq!(r.len(), 0); + assert!(r.is_empty()); + assert!(r.is_full()); +} + +#[test] +fn try_recv() { + let r = never::<i32>(); + assert!(r.try_recv().is_err()); + + thread::sleep(ms(100)); + assert!(r.try_recv().is_err()); +} + +#[test] +fn recv_timeout() { + let start = Instant::now(); + let r = never::<i32>(); + + assert!(r.recv_timeout(ms(100)).is_err()); + let now = Instant::now(); + assert!(now - start >= ms(100)); + assert!(now - start <= ms(150)); + + assert!(r.recv_timeout(ms(100)).is_err()); + let now = Instant::now(); + assert!(now - start >= ms(200)); + assert!(now - start <= ms(250)); +} diff --git a/third_party/rust/crossbeam-channel/tests/ready.rs b/third_party/rust/crossbeam-channel/tests/ready.rs new file mode 100644 index 0000000000..d8dd6ceb50 --- /dev/null +++ b/third_party/rust/crossbeam-channel/tests/ready.rs @@ -0,0 +1,852 @@ +//! Tests for channel readiness using the `Select` struct. + +#![allow(clippy::drop_copy)] + +use std::any::Any; +use std::cell::Cell; +use std::thread; +use std::time::{Duration, Instant}; + +use crossbeam_channel::{after, bounded, tick, unbounded}; +use crossbeam_channel::{Receiver, Select, TryRecvError, TrySendError}; +use crossbeam_utils::thread::scope; + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn smoke1() { + let (s1, r1) = unbounded::<usize>(); + let (s2, r2) = unbounded::<usize>(); + + s1.send(1).unwrap(); + + let mut sel = Select::new(); + sel.recv(&r1); + sel.recv(&r2); + assert_eq!(sel.ready(), 0); + assert_eq!(r1.try_recv(), Ok(1)); + + s2.send(2).unwrap(); + + let mut sel = Select::new(); + sel.recv(&r1); + sel.recv(&r2); + assert_eq!(sel.ready(), 1); + assert_eq!(r2.try_recv(), Ok(2)); +} + +#[test] +fn smoke2() { + let (_s1, r1) = unbounded::<i32>(); + let (_s2, r2) = unbounded::<i32>(); + let (_s3, r3) = unbounded::<i32>(); + let (_s4, r4) = unbounded::<i32>(); + let (s5, r5) = unbounded::<i32>(); + + s5.send(5).unwrap(); + + let mut sel = Select::new(); + sel.recv(&r1); + sel.recv(&r2); + sel.recv(&r3); + sel.recv(&r4); + sel.recv(&r5); + assert_eq!(sel.ready(), 4); + assert_eq!(r5.try_recv(), Ok(5)); +} + +#[test] +fn disconnected() { + let (s1, r1) = unbounded::<i32>(); + let (s2, r2) = unbounded::<i32>(); + + scope(|scope| { + scope.spawn(|_| { + drop(s1); + thread::sleep(ms(500)); + s2.send(5).unwrap(); + }); + + let mut sel = Select::new(); + sel.recv(&r1); + sel.recv(&r2); + match sel.ready_timeout(ms(1000)) { + Ok(0) => assert_eq!(r1.try_recv(), Err(TryRecvError::Disconnected)), + _ => panic!(), + } + + r2.recv().unwrap(); + }) + .unwrap(); + + let mut sel = Select::new(); + sel.recv(&r1); + sel.recv(&r2); + match sel.ready_timeout(ms(1000)) { + Ok(0) => assert_eq!(r1.try_recv(), Err(TryRecvError::Disconnected)), + _ => panic!(), + } + + scope(|scope| { + scope.spawn(|_| { + thread::sleep(ms(500)); + drop(s2); + }); + + let mut sel = Select::new(); + sel.recv(&r2); + match sel.ready_timeout(ms(1000)) { + Ok(0) => assert_eq!(r2.try_recv(), Err(TryRecvError::Disconnected)), + _ => panic!(), + } + }) + .unwrap(); +} + +#[test] +fn default() { + let (s1, r1) = unbounded::<i32>(); + let (s2, r2) = unbounded::<i32>(); + + let mut sel = Select::new(); + sel.recv(&r1); + sel.recv(&r2); + assert!(sel.try_ready().is_err()); + + drop(s1); + + let mut sel = Select::new(); + sel.recv(&r1); + sel.recv(&r2); + match sel.try_ready() { + Ok(0) => assert!(r1.try_recv().is_err()), + _ => panic!(), + } + + s2.send(2).unwrap(); + + let mut sel = Select::new(); + sel.recv(&r2); + match sel.try_ready() { + Ok(0) => assert_eq!(r2.try_recv(), Ok(2)), + _ => panic!(), + } + + let mut sel = Select::new(); + sel.recv(&r2); + assert!(sel.try_ready().is_err()); + + let mut sel = Select::new(); + assert!(sel.try_ready().is_err()); +} + +#[test] +fn timeout() { + let (_s1, r1) = unbounded::<i32>(); + let (s2, r2) = unbounded::<i32>(); + + scope(|scope| { + scope.spawn(|_| { + thread::sleep(ms(1500)); + s2.send(2).unwrap(); + }); + + let mut sel = Select::new(); + sel.recv(&r1); + sel.recv(&r2); + assert!(sel.ready_timeout(ms(1000)).is_err()); + + let mut sel = Select::new(); + sel.recv(&r1); + sel.recv(&r2); + match sel.ready_timeout(ms(1000)) { + Ok(1) => assert_eq!(r2.try_recv(), Ok(2)), + _ => panic!(), + } + }) + .unwrap(); + + scope(|scope| { + let (s, r) = unbounded::<i32>(); + + scope.spawn(move |_| { + thread::sleep(ms(500)); + drop(s); + }); + + let mut sel = Select::new(); + assert!(sel.ready_timeout(ms(1000)).is_err()); + + let mut sel = Select::new(); + sel.recv(&r); + match sel.try_ready() { + Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)), + _ => panic!(), + } + }) + .unwrap(); +} + +#[test] +fn default_when_disconnected() { + let (_, r) = unbounded::<i32>(); + + let mut sel = Select::new(); + sel.recv(&r); + match sel.try_ready() { + Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)), + _ => panic!(), + } + + let (_, r) = unbounded::<i32>(); + + let mut sel = Select::new(); + sel.recv(&r); + match sel.ready_timeout(ms(1000)) { + Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)), + _ => panic!(), + } + + let (s, _) = bounded::<i32>(0); + + let mut sel = Select::new(); + sel.send(&s); + match sel.try_ready() { + Ok(0) => assert_eq!(s.try_send(0), Err(TrySendError::Disconnected(0))), + _ => panic!(), + } + + let (s, _) = bounded::<i32>(0); + + let mut sel = Select::new(); + sel.send(&s); + match sel.ready_timeout(ms(1000)) { + Ok(0) => assert_eq!(s.try_send(0), Err(TrySendError::Disconnected(0))), + _ => panic!(), + } +} + +#[test] +fn default_only() { + let start = Instant::now(); + + let mut sel = Select::new(); + assert!(sel.try_ready().is_err()); + let now = Instant::now(); + assert!(now - start <= ms(50)); + + let start = Instant::now(); + let mut sel = Select::new(); + assert!(sel.ready_timeout(ms(500)).is_err()); + let now = Instant::now(); + assert!(now - start >= ms(450)); + assert!(now - start <= ms(550)); +} + +#[test] +fn unblocks() { + let (s1, r1) = bounded::<i32>(0); + let (s2, r2) = bounded::<i32>(0); + + scope(|scope| { + scope.spawn(|_| { + thread::sleep(ms(500)); + s2.send(2).unwrap(); + }); + + let mut sel = Select::new(); + sel.recv(&r1); + sel.recv(&r2); + match sel.ready_timeout(ms(1000)) { + Ok(1) => assert_eq!(r2.try_recv(), Ok(2)), + _ => panic!(), + } + }) + .unwrap(); + + scope(|scope| { + scope.spawn(|_| { + thread::sleep(ms(500)); + assert_eq!(r1.recv().unwrap(), 1); + }); + + let mut sel = Select::new(); + let oper1 = sel.send(&s1); + let oper2 = sel.send(&s2); + let oper = sel.select_timeout(ms(1000)); + match oper { + Err(_) => panic!(), + Ok(oper) => match oper.index() { + i if i == oper1 => oper.send(&s1, 1).unwrap(), + i if i == oper2 => panic!(), + _ => unreachable!(), + }, + } + }) + .unwrap(); +} + +#[test] +fn both_ready() { + let (s1, r1) = bounded(0); + let (s2, r2) = bounded(0); + + scope(|scope| { + scope.spawn(|_| { + thread::sleep(ms(500)); + s1.send(1).unwrap(); + assert_eq!(r2.recv().unwrap(), 2); + }); + + for _ in 0..2 { + let mut sel = Select::new(); + sel.recv(&r1); + sel.send(&s2); + match sel.ready() { + 0 => assert_eq!(r1.try_recv(), Ok(1)), + 1 => s2.try_send(2).unwrap(), + _ => panic!(), + } + } + }) + .unwrap(); +} + +#[test] +fn cloning1() { + scope(|scope| { + let (s1, r1) = unbounded::<i32>(); + let (_s2, r2) = unbounded::<i32>(); + let (s3, r3) = unbounded::<()>(); + + scope.spawn(move |_| { + r3.recv().unwrap(); + drop(s1.clone()); + assert!(r3.try_recv().is_err()); + s1.send(1).unwrap(); + r3.recv().unwrap(); + }); + + s3.send(()).unwrap(); + + let mut sel = Select::new(); + sel.recv(&r1); + sel.recv(&r2); + match sel.ready() { + 0 => drop(r1.try_recv()), + 1 => drop(r2.try_recv()), + _ => panic!(), + } + + s3.send(()).unwrap(); + }) + .unwrap(); +} + +#[test] +fn cloning2() { + let (s1, r1) = unbounded::<()>(); + let (s2, r2) = unbounded::<()>(); + let (_s3, _r3) = unbounded::<()>(); + + scope(|scope| { + scope.spawn(move |_| { + let mut sel = Select::new(); + sel.recv(&r1); + sel.recv(&r2); + match sel.ready() { + 0 => panic!(), + 1 => drop(r2.try_recv()), + _ => panic!(), + } + }); + + thread::sleep(ms(500)); + drop(s1.clone()); + s2.send(()).unwrap(); + }) + .unwrap(); +} + +#[test] +fn preflight1() { + let (s, r) = unbounded(); + s.send(()).unwrap(); + + let mut sel = Select::new(); + sel.recv(&r); + match sel.ready() { + 0 => drop(r.try_recv()), + _ => panic!(), + } +} + +#[test] +fn preflight2() { + let (s, r) = unbounded(); + drop(s.clone()); + s.send(()).unwrap(); + drop(s); + + let mut sel = Select::new(); + sel.recv(&r); + match sel.ready() { + 0 => assert_eq!(r.try_recv(), Ok(())), + _ => panic!(), + } + + assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)); +} + +#[test] +fn preflight3() { + let (s, r) = unbounded(); + drop(s.clone()); + s.send(()).unwrap(); + drop(s); + r.recv().unwrap(); + + let mut sel = Select::new(); + sel.recv(&r); + match sel.ready() { + 0 => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)), + _ => panic!(), + } +} + +#[test] +fn duplicate_operations() { + let (s, r) = unbounded::<i32>(); + let hit = vec![Cell::new(false); 4]; + + while hit.iter().map(|h| h.get()).any(|hit| !hit) { + let mut sel = Select::new(); + sel.recv(&r); + sel.recv(&r); + sel.send(&s); + sel.send(&s); + match sel.ready() { + 0 => { + assert!(r.try_recv().is_ok()); + hit[0].set(true); + } + 1 => { + assert!(r.try_recv().is_ok()); + hit[1].set(true); + } + 2 => { + assert!(s.try_send(0).is_ok()); + hit[2].set(true); + } + 3 => { + assert!(s.try_send(0).is_ok()); + hit[3].set(true); + } + _ => panic!(), + } + } +} + +#[test] +fn nesting() { + let (s, r) = unbounded::<i32>(); + + let mut sel = Select::new(); + sel.send(&s); + match sel.ready() { + 0 => { + assert!(s.try_send(0).is_ok()); + + let mut sel = Select::new(); + sel.recv(&r); + match sel.ready() { + 0 => { + assert_eq!(r.try_recv(), Ok(0)); + + let mut sel = Select::new(); + sel.send(&s); + match sel.ready() { + 0 => { + assert!(s.try_send(1).is_ok()); + + let mut sel = Select::new(); + sel.recv(&r); + match sel.ready() { + 0 => { + assert_eq!(r.try_recv(), Ok(1)); + } + _ => panic!(), + } + } + _ => panic!(), + } + } + _ => panic!(), + } + } + _ => panic!(), + } +} + +#[test] +fn stress_recv() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = unbounded(); + let (s2, r2) = bounded(5); + let (s3, r3) = bounded(0); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..COUNT { + s1.send(i).unwrap(); + r3.recv().unwrap(); + + s2.send(i).unwrap(); + r3.recv().unwrap(); + } + }); + + for i in 0..COUNT { + for _ in 0..2 { + let mut sel = Select::new(); + sel.recv(&r1); + sel.recv(&r2); + match sel.ready() { + 0 => assert_eq!(r1.try_recv(), Ok(i)), + 1 => assert_eq!(r2.try_recv(), Ok(i)), + _ => panic!(), + } + + s3.send(()).unwrap(); + } + } + }) + .unwrap(); +} + +#[test] +fn stress_send() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = bounded(0); + let (s2, r2) = bounded(0); + let (s3, r3) = bounded(100); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..COUNT { + assert_eq!(r1.recv().unwrap(), i); + assert_eq!(r2.recv().unwrap(), i); + r3.recv().unwrap(); + } + }); + + for i in 0..COUNT { + for _ in 0..2 { + let mut sel = Select::new(); + sel.send(&s1); + sel.send(&s2); + match sel.ready() { + 0 => assert!(s1.try_send(i).is_ok()), + 1 => assert!(s2.try_send(i).is_ok()), + _ => panic!(), + } + } + s3.send(()).unwrap(); + } + }) + .unwrap(); +} + +#[test] +fn stress_mixed() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = bounded(0); + let (s2, r2) = bounded(0); + let (s3, r3) = bounded(100); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..COUNT { + s1.send(i).unwrap(); + assert_eq!(r2.recv().unwrap(), i); + r3.recv().unwrap(); + } + }); + + for i in 0..COUNT { + for _ in 0..2 { + let mut sel = Select::new(); + sel.recv(&r1); + sel.send(&s2); + match sel.ready() { + 0 => assert_eq!(r1.try_recv(), Ok(i)), + 1 => assert!(s2.try_send(i).is_ok()), + _ => panic!(), + } + } + s3.send(()).unwrap(); + } + }) + .unwrap(); +} + +#[test] +fn stress_timeout_two_threads() { + const COUNT: usize = 20; + + let (s, r) = bounded(2); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..COUNT { + if i % 2 == 0 { + thread::sleep(ms(500)); + } + + loop { + let mut sel = Select::new(); + sel.send(&s); + match sel.ready_timeout(ms(100)) { + Err(_) => {} + Ok(0) => { + assert!(s.try_send(i).is_ok()); + break; + } + Ok(_) => panic!(), + } + } + } + }); + + scope.spawn(|_| { + for i in 0..COUNT { + if i % 2 == 0 { + thread::sleep(ms(500)); + } + + loop { + let mut sel = Select::new(); + sel.recv(&r); + match sel.ready_timeout(ms(100)) { + Err(_) => {} + Ok(0) => { + assert_eq!(r.try_recv(), Ok(i)); + break; + } + Ok(_) => panic!(), + } + } + } + }); + }) + .unwrap(); +} + +#[test] +fn send_recv_same_channel() { + let (s, r) = bounded::<i32>(0); + let mut sel = Select::new(); + sel.send(&s); + sel.recv(&r); + assert!(sel.ready_timeout(ms(100)).is_err()); + + let (s, r) = unbounded::<i32>(); + let mut sel = Select::new(); + sel.send(&s); + sel.recv(&r); + match sel.ready_timeout(ms(100)) { + Err(_) => panic!(), + Ok(0) => assert!(s.try_send(0).is_ok()), + Ok(_) => panic!(), + } +} + +#[test] +fn channel_through_channel() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 1000; + + type T = Box<dyn Any + Send>; + + for cap in 1..4 { + let (s, r) = bounded::<T>(cap); + + scope(|scope| { + scope.spawn(move |_| { + let mut s = s; + + for _ in 0..COUNT { + let (new_s, new_r) = bounded(cap); + let new_r: T = Box::new(Some(new_r)); + + { + let mut sel = Select::new(); + sel.send(&s); + match sel.ready() { + 0 => assert!(s.try_send(new_r).is_ok()), + _ => panic!(), + } + } + + s = new_s; + } + }); + + scope.spawn(move |_| { + let mut r = r; + + for _ in 0..COUNT { + let new = { + let mut sel = Select::new(); + sel.recv(&r); + match sel.ready() { + 0 => r + .try_recv() + .unwrap() + .downcast_mut::<Option<Receiver<T>>>() + .unwrap() + .take() + .unwrap(), + _ => panic!(), + } + }; + r = new; + } + }); + }) + .unwrap(); + } +} + +#[test] +fn fairness1() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = bounded::<()>(COUNT); + let (s2, r2) = unbounded::<()>(); + + for _ in 0..COUNT { + s1.send(()).unwrap(); + s2.send(()).unwrap(); + } + + let hits = vec![Cell::new(0usize); 4]; + for _ in 0..COUNT { + let after = after(ms(0)); + let tick = tick(ms(0)); + + let mut sel = Select::new(); + sel.recv(&r1); + sel.recv(&r2); + sel.recv(&after); + sel.recv(&tick); + match sel.ready() { + 0 => { + r1.try_recv().unwrap(); + hits[0].set(hits[0].get() + 1); + } + 1 => { + r2.try_recv().unwrap(); + hits[1].set(hits[1].get() + 1); + } + 2 => { + after.try_recv().unwrap(); + hits[2].set(hits[2].get() + 1); + } + 3 => { + tick.try_recv().unwrap(); + hits[3].set(hits[3].get() + 1); + } + _ => panic!(), + } + } + assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 2)); +} + +#[test] +fn fairness2() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 100_000; + + let (s1, r1) = unbounded::<()>(); + let (s2, r2) = bounded::<()>(1); + let (s3, r3) = bounded::<()>(0); + + scope(|scope| { + scope.spawn(|_| { + for _ in 0..COUNT { + let mut sel = Select::new(); + let mut oper1 = None; + let mut oper2 = None; + if s1.is_empty() { + oper1 = Some(sel.send(&s1)); + } + if s2.is_empty() { + oper2 = Some(sel.send(&s2)); + } + let oper3 = sel.send(&s3); + let oper = sel.select(); + match oper.index() { + i if Some(i) == oper1 => assert!(oper.send(&s1, ()).is_ok()), + i if Some(i) == oper2 => assert!(oper.send(&s2, ()).is_ok()), + i if i == oper3 => assert!(oper.send(&s3, ()).is_ok()), + _ => unreachable!(), + } + } + }); + + let hits = vec![Cell::new(0usize); 3]; + for _ in 0..COUNT { + let mut sel = Select::new(); + sel.recv(&r1); + sel.recv(&r2); + sel.recv(&r3); + loop { + match sel.ready() { + 0 => { + if r1.try_recv().is_ok() { + hits[0].set(hits[0].get() + 1); + break; + } + } + 1 => { + if r2.try_recv().is_ok() { + hits[1].set(hits[1].get() + 1); + break; + } + } + 2 => { + if r3.try_recv().is_ok() { + hits[2].set(hits[2].get() + 1); + break; + } + } + _ => unreachable!(), + } + } + } + assert!(hits.iter().all(|x| x.get() > 0)); + }) + .unwrap(); +} diff --git a/third_party/rust/crossbeam-channel/tests/same_channel.rs b/third_party/rust/crossbeam-channel/tests/same_channel.rs new file mode 100644 index 0000000000..da4c8f3e70 --- /dev/null +++ b/third_party/rust/crossbeam-channel/tests/same_channel.rs @@ -0,0 +1,112 @@ +use std::time::Duration; + +use crossbeam_channel::{after, bounded, never, tick, unbounded}; + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn after_same_channel() { + let r = after(ms(50)); + + let r2 = r.clone(); + assert!(r.same_channel(&r2)); + + let r3 = after(ms(50)); + assert!(!r.same_channel(&r3)); + assert!(!r2.same_channel(&r3)); + + let r4 = after(ms(100)); + assert!(!r.same_channel(&r4)); + assert!(!r2.same_channel(&r4)); +} + +#[test] +fn array_same_channel() { + let (s, r) = bounded::<usize>(1); + + let s2 = s.clone(); + assert!(s.same_channel(&s2)); + + let r2 = r.clone(); + assert!(r.same_channel(&r2)); + + let (s3, r3) = bounded::<usize>(1); + assert!(!s.same_channel(&s3)); + assert!(!s2.same_channel(&s3)); + assert!(!r.same_channel(&r3)); + assert!(!r2.same_channel(&r3)); +} + +#[test] +fn list_same_channel() { + let (s, r) = unbounded::<usize>(); + + let s2 = s.clone(); + assert!(s.same_channel(&s2)); + + let r2 = r.clone(); + assert!(r.same_channel(&r2)); + + let (s3, r3) = unbounded::<usize>(); + assert!(!s.same_channel(&s3)); + assert!(!s2.same_channel(&s3)); + assert!(!r.same_channel(&r3)); + assert!(!r2.same_channel(&r3)); +} + +#[test] +fn never_same_channel() { + let r = never::<usize>(); + + let r2 = r.clone(); + assert!(r.same_channel(&r2)); + + // Never channel are always equal to one another. + let r3 = never::<usize>(); + assert!(r.same_channel(&r3)); + assert!(r2.same_channel(&r3)); +} + +#[test] +fn tick_same_channel() { + let r = tick(ms(50)); + + let r2 = r.clone(); + assert!(r.same_channel(&r2)); + + let r3 = tick(ms(50)); + assert!(!r.same_channel(&r3)); + assert!(!r2.same_channel(&r3)); + + let r4 = tick(ms(100)); + assert!(!r.same_channel(&r4)); + assert!(!r2.same_channel(&r4)); +} + +#[test] +fn zero_same_channel() { + let (s, r) = bounded::<usize>(0); + + let s2 = s.clone(); + assert!(s.same_channel(&s2)); + + let r2 = r.clone(); + assert!(r.same_channel(&r2)); + + let (s3, r3) = bounded::<usize>(0); + assert!(!s.same_channel(&s3)); + assert!(!s2.same_channel(&s3)); + assert!(!r.same_channel(&r3)); + assert!(!r2.same_channel(&r3)); +} + +#[test] +fn different_flavors_same_channel() { + let (s1, r1) = bounded::<usize>(0); + let (s2, r2) = unbounded::<usize>(); + + assert!(!s1.same_channel(&s2)); + assert!(!r1.same_channel(&r2)); +} diff --git a/third_party/rust/crossbeam-channel/tests/select.rs b/third_party/rust/crossbeam-channel/tests/select.rs new file mode 100644 index 0000000000..bc5824daba --- /dev/null +++ b/third_party/rust/crossbeam-channel/tests/select.rs @@ -0,0 +1,1328 @@ +//! Tests for channel selection using the `Select` struct. + +#![allow(clippy::drop_copy)] + +use std::any::Any; +use std::cell::Cell; +use std::thread; +use std::time::{Duration, Instant}; + +use crossbeam_channel::{after, bounded, tick, unbounded, Receiver, Select, TryRecvError}; +use crossbeam_utils::thread::scope; + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn smoke1() { + let (s1, r1) = unbounded::<usize>(); + let (s2, r2) = unbounded::<usize>(); + + s1.send(1).unwrap(); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(1)), + i if i == oper2 => panic!(), + _ => unreachable!(), + } + + s2.send(2).unwrap(); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => panic!(), + i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(2)), + _ => unreachable!(), + } +} + +#[test] +fn smoke2() { + let (_s1, r1) = unbounded::<i32>(); + let (_s2, r2) = unbounded::<i32>(); + let (_s3, r3) = unbounded::<i32>(); + let (_s4, r4) = unbounded::<i32>(); + let (s5, r5) = unbounded::<i32>(); + + s5.send(5).unwrap(); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper3 = sel.recv(&r3); + let oper4 = sel.recv(&r4); + let oper5 = sel.recv(&r5); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => panic!(), + i if i == oper2 => panic!(), + i if i == oper3 => panic!(), + i if i == oper4 => panic!(), + i if i == oper5 => assert_eq!(oper.recv(&r5), Ok(5)), + _ => unreachable!(), + } +} + +#[test] +fn disconnected() { + let (s1, r1) = unbounded::<i32>(); + let (s2, r2) = unbounded::<i32>(); + + scope(|scope| { + scope.spawn(|_| { + drop(s1); + thread::sleep(ms(500)); + s2.send(5).unwrap(); + }); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper = sel.select_timeout(ms(1000)); + match oper { + Err(_) => panic!(), + Ok(oper) => match oper.index() { + i if i == oper1 => assert!(oper.recv(&r1).is_err()), + i if i == oper2 => panic!(), + _ => unreachable!(), + }, + } + + r2.recv().unwrap(); + }) + .unwrap(); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper = sel.select_timeout(ms(1000)); + match oper { + Err(_) => panic!(), + Ok(oper) => match oper.index() { + i if i == oper1 => assert!(oper.recv(&r1).is_err()), + i if i == oper2 => panic!(), + _ => unreachable!(), + }, + } + + scope(|scope| { + scope.spawn(|_| { + thread::sleep(ms(500)); + drop(s2); + }); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r2); + let oper = sel.select_timeout(ms(1000)); + match oper { + Err(_) => panic!(), + Ok(oper) => match oper.index() { + i if i == oper1 => assert!(oper.recv(&r2).is_err()), + _ => unreachable!(), + }, + } + }) + .unwrap(); +} + +#[test] +fn default() { + let (s1, r1) = unbounded::<i32>(); + let (s2, r2) = unbounded::<i32>(); + + let mut sel = Select::new(); + let _oper1 = sel.recv(&r1); + let _oper2 = sel.recv(&r2); + let oper = sel.try_select(); + match oper { + Err(_) => {} + Ok(_) => panic!(), + } + + drop(s1); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper = sel.try_select(); + match oper { + Err(_) => panic!(), + Ok(oper) => match oper.index() { + i if i == oper1 => assert!(oper.recv(&r1).is_err()), + i if i == oper2 => panic!(), + _ => unreachable!(), + }, + } + + s2.send(2).unwrap(); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r2); + let oper = sel.try_select(); + match oper { + Err(_) => panic!(), + Ok(oper) => match oper.index() { + i if i == oper1 => assert_eq!(oper.recv(&r2), Ok(2)), + _ => unreachable!(), + }, + } + + let mut sel = Select::new(); + let _oper1 = sel.recv(&r2); + let oper = sel.try_select(); + match oper { + Err(_) => {} + Ok(_) => panic!(), + } + + let mut sel = Select::new(); + let oper = sel.try_select(); + match oper { + Err(_) => {} + Ok(_) => panic!(), + } +} + +#[test] +fn timeout() { + let (_s1, r1) = unbounded::<i32>(); + let (s2, r2) = unbounded::<i32>(); + + scope(|scope| { + scope.spawn(|_| { + thread::sleep(ms(1500)); + s2.send(2).unwrap(); + }); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper = sel.select_timeout(ms(1000)); + match oper { + Err(_) => {} + Ok(oper) => match oper.index() { + i if i == oper1 => panic!(), + i if i == oper2 => panic!(), + _ => unreachable!(), + }, + } + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper = sel.select_timeout(ms(1000)); + match oper { + Err(_) => panic!(), + Ok(oper) => match oper.index() { + i if i == oper1 => panic!(), + i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(2)), + _ => unreachable!(), + }, + } + }) + .unwrap(); + + scope(|scope| { + let (s, r) = unbounded::<i32>(); + + scope.spawn(move |_| { + thread::sleep(ms(500)); + drop(s); + }); + + let mut sel = Select::new(); + let oper = sel.select_timeout(ms(1000)); + match oper { + Err(_) => { + let mut sel = Select::new(); + let oper1 = sel.recv(&r); + let oper = sel.try_select(); + match oper { + Err(_) => panic!(), + Ok(oper) => match oper.index() { + i if i == oper1 => assert!(oper.recv(&r).is_err()), + _ => unreachable!(), + }, + } + } + Ok(_) => unreachable!(), + } + }) + .unwrap(); +} + +#[test] +fn default_when_disconnected() { + let (_, r) = unbounded::<i32>(); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r); + let oper = sel.try_select(); + match oper { + Err(_) => panic!(), + Ok(oper) => match oper.index() { + i if i == oper1 => assert!(oper.recv(&r).is_err()), + _ => unreachable!(), + }, + } + + let (_, r) = unbounded::<i32>(); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r); + let oper = sel.select_timeout(ms(1000)); + match oper { + Err(_) => panic!(), + Ok(oper) => match oper.index() { + i if i == oper1 => assert!(oper.recv(&r).is_err()), + _ => unreachable!(), + }, + } + + let (s, _) = bounded::<i32>(0); + + let mut sel = Select::new(); + let oper1 = sel.send(&s); + let oper = sel.try_select(); + match oper { + Err(_) => panic!(), + Ok(oper) => match oper.index() { + i if i == oper1 => assert!(oper.send(&s, 0).is_err()), + _ => unreachable!(), + }, + } + + let (s, _) = bounded::<i32>(0); + + let mut sel = Select::new(); + let oper1 = sel.send(&s); + let oper = sel.select_timeout(ms(1000)); + match oper { + Err(_) => panic!(), + Ok(oper) => match oper.index() { + i if i == oper1 => assert!(oper.send(&s, 0).is_err()), + _ => unreachable!(), + }, + } +} + +#[test] +fn default_only() { + let start = Instant::now(); + + let mut sel = Select::new(); + let oper = sel.try_select(); + assert!(oper.is_err()); + let now = Instant::now(); + assert!(now - start <= ms(50)); + + let start = Instant::now(); + let mut sel = Select::new(); + let oper = sel.select_timeout(ms(500)); + assert!(oper.is_err()); + let now = Instant::now(); + assert!(now - start >= ms(450)); + assert!(now - start <= ms(550)); +} + +#[test] +fn unblocks() { + let (s1, r1) = bounded::<i32>(0); + let (s2, r2) = bounded::<i32>(0); + + scope(|scope| { + scope.spawn(|_| { + thread::sleep(ms(500)); + s2.send(2).unwrap(); + }); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper = sel.select_timeout(ms(1000)); + match oper { + Err(_) => panic!(), + Ok(oper) => match oper.index() { + i if i == oper1 => panic!(), + i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(2)), + _ => unreachable!(), + }, + } + }) + .unwrap(); + + scope(|scope| { + scope.spawn(|_| { + thread::sleep(ms(500)); + assert_eq!(r1.recv().unwrap(), 1); + }); + + let mut sel = Select::new(); + let oper1 = sel.send(&s1); + let oper2 = sel.send(&s2); + let oper = sel.select_timeout(ms(1000)); + match oper { + Err(_) => panic!(), + Ok(oper) => match oper.index() { + i if i == oper1 => oper.send(&s1, 1).unwrap(), + i if i == oper2 => panic!(), + _ => unreachable!(), + }, + } + }) + .unwrap(); +} + +#[test] +fn both_ready() { + let (s1, r1) = bounded(0); + let (s2, r2) = bounded(0); + + scope(|scope| { + scope.spawn(|_| { + thread::sleep(ms(500)); + s1.send(1).unwrap(); + assert_eq!(r2.recv().unwrap(), 2); + }); + + for _ in 0..2 { + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.send(&s2); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(1)), + i if i == oper2 => oper.send(&s2, 2).unwrap(), + _ => unreachable!(), + } + } + }) + .unwrap(); +} + +#[test] +fn loop_try() { + const RUNS: usize = 20; + + for _ in 0..RUNS { + let (s1, r1) = bounded::<i32>(0); + let (s2, r2) = bounded::<i32>(0); + let (s_end, r_end) = bounded::<()>(0); + + scope(|scope| { + scope.spawn(|_| loop { + let mut done = false; + + let mut sel = Select::new(); + let oper1 = sel.send(&s1); + let oper = sel.try_select(); + match oper { + Err(_) => {} + Ok(oper) => match oper.index() { + i if i == oper1 => { + let _ = oper.send(&s1, 1); + done = true; + } + _ => unreachable!(), + }, + } + if done { + break; + } + + let mut sel = Select::new(); + let oper1 = sel.recv(&r_end); + let oper = sel.try_select(); + match oper { + Err(_) => {} + Ok(oper) => match oper.index() { + i if i == oper1 => { + let _ = oper.recv(&r_end); + done = true; + } + _ => unreachable!(), + }, + } + if done { + break; + } + }); + + scope.spawn(|_| loop { + if let Ok(x) = r2.try_recv() { + assert_eq!(x, 2); + break; + } + + let mut done = false; + let mut sel = Select::new(); + let oper1 = sel.recv(&r_end); + let oper = sel.try_select(); + match oper { + Err(_) => {} + Ok(oper) => match oper.index() { + i if i == oper1 => { + let _ = oper.recv(&r_end); + done = true; + } + _ => unreachable!(), + }, + } + if done { + break; + } + }); + + scope.spawn(|_| { + thread::sleep(ms(500)); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.send(&s2); + let oper = sel.select_timeout(ms(1000)); + match oper { + Err(_) => {} + Ok(oper) => match oper.index() { + i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(1)), + i if i == oper2 => assert!(oper.send(&s2, 2).is_ok()), + _ => unreachable!(), + }, + } + + drop(s_end); + }); + }) + .unwrap(); + } +} + +#[test] +fn cloning1() { + scope(|scope| { + let (s1, r1) = unbounded::<i32>(); + let (_s2, r2) = unbounded::<i32>(); + let (s3, r3) = unbounded::<()>(); + + scope.spawn(move |_| { + r3.recv().unwrap(); + drop(s1.clone()); + assert!(r3.try_recv().is_err()); + s1.send(1).unwrap(); + r3.recv().unwrap(); + }); + + s3.send(()).unwrap(); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => drop(oper.recv(&r1)), + i if i == oper2 => drop(oper.recv(&r2)), + _ => unreachable!(), + } + + s3.send(()).unwrap(); + }) + .unwrap(); +} + +#[test] +fn cloning2() { + let (s1, r1) = unbounded::<()>(); + let (s2, r2) = unbounded::<()>(); + let (_s3, _r3) = unbounded::<()>(); + + scope(|scope| { + scope.spawn(move |_| { + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => panic!(), + i if i == oper2 => drop(oper.recv(&r2)), + _ => unreachable!(), + } + }); + + thread::sleep(ms(500)); + drop(s1.clone()); + s2.send(()).unwrap(); + }) + .unwrap(); +} + +#[test] +fn preflight1() { + let (s, r) = unbounded(); + s.send(()).unwrap(); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => drop(oper.recv(&r)), + _ => unreachable!(), + } +} + +#[test] +fn preflight2() { + let (s, r) = unbounded(); + drop(s.clone()); + s.send(()).unwrap(); + drop(s); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => assert_eq!(oper.recv(&r), Ok(())), + _ => unreachable!(), + } + + assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)); +} + +#[test] +fn preflight3() { + let (s, r) = unbounded(); + drop(s.clone()); + s.send(()).unwrap(); + drop(s); + r.recv().unwrap(); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => assert!(oper.recv(&r).is_err()), + _ => unreachable!(), + } +} + +#[test] +fn duplicate_operations() { + let (s, r) = unbounded::<i32>(); + let hit = vec![Cell::new(false); 4]; + + while hit.iter().map(|h| h.get()).any(|hit| !hit) { + let mut sel = Select::new(); + let oper0 = sel.recv(&r); + let oper1 = sel.recv(&r); + let oper2 = sel.send(&s); + let oper3 = sel.send(&s); + let oper = sel.select(); + match oper.index() { + i if i == oper0 => { + assert!(oper.recv(&r).is_ok()); + hit[0].set(true); + } + i if i == oper1 => { + assert!(oper.recv(&r).is_ok()); + hit[1].set(true); + } + i if i == oper2 => { + assert!(oper.send(&s, 0).is_ok()); + hit[2].set(true); + } + i if i == oper3 => { + assert!(oper.send(&s, 0).is_ok()); + hit[3].set(true); + } + _ => unreachable!(), + } + } +} + +#[test] +fn nesting() { + let (s, r) = unbounded::<i32>(); + + let mut sel = Select::new(); + let oper1 = sel.send(&s); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => { + assert!(oper.send(&s, 0).is_ok()); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => { + assert_eq!(oper.recv(&r), Ok(0)); + + let mut sel = Select::new(); + let oper1 = sel.send(&s); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => { + assert!(oper.send(&s, 1).is_ok()); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => { + assert_eq!(oper.recv(&r), Ok(1)); + } + _ => unreachable!(), + } + } + _ => unreachable!(), + } + } + _ => unreachable!(), + } + } + _ => unreachable!(), + } +} + +#[test] +fn stress_recv() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = unbounded(); + let (s2, r2) = bounded(5); + let (s3, r3) = bounded(100); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..COUNT { + s1.send(i).unwrap(); + r3.recv().unwrap(); + + s2.send(i).unwrap(); + r3.recv().unwrap(); + } + }); + + for i in 0..COUNT { + for _ in 0..2 { + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper = sel.select(); + match oper.index() { + ix if ix == oper1 => assert_eq!(oper.recv(&r1), Ok(i)), + ix if ix == oper2 => assert_eq!(oper.recv(&r2), Ok(i)), + _ => unreachable!(), + } + + s3.send(()).unwrap(); + } + } + }) + .unwrap(); +} + +#[test] +fn stress_send() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = bounded(0); + let (s2, r2) = bounded(0); + let (s3, r3) = bounded(100); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..COUNT { + assert_eq!(r1.recv().unwrap(), i); + assert_eq!(r2.recv().unwrap(), i); + r3.recv().unwrap(); + } + }); + + for i in 0..COUNT { + for _ in 0..2 { + let mut sel = Select::new(); + let oper1 = sel.send(&s1); + let oper2 = sel.send(&s2); + let oper = sel.select(); + match oper.index() { + ix if ix == oper1 => assert!(oper.send(&s1, i).is_ok()), + ix if ix == oper2 => assert!(oper.send(&s2, i).is_ok()), + _ => unreachable!(), + } + } + s3.send(()).unwrap(); + } + }) + .unwrap(); +} + +#[test] +fn stress_mixed() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = bounded(0); + let (s2, r2) = bounded(0); + let (s3, r3) = bounded(100); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..COUNT { + s1.send(i).unwrap(); + assert_eq!(r2.recv().unwrap(), i); + r3.recv().unwrap(); + } + }); + + for i in 0..COUNT { + for _ in 0..2 { + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.send(&s2); + let oper = sel.select(); + match oper.index() { + ix if ix == oper1 => assert_eq!(oper.recv(&r1), Ok(i)), + ix if ix == oper2 => assert!(oper.send(&s2, i).is_ok()), + _ => unreachable!(), + } + } + s3.send(()).unwrap(); + } + }) + .unwrap(); +} + +#[test] +fn stress_timeout_two_threads() { + const COUNT: usize = 20; + + let (s, r) = bounded(2); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..COUNT { + if i % 2 == 0 { + thread::sleep(ms(500)); + } + + loop { + let mut sel = Select::new(); + let oper1 = sel.send(&s); + let oper = sel.select_timeout(ms(100)); + match oper { + Err(_) => {} + Ok(oper) => match oper.index() { + ix if ix == oper1 => { + assert!(oper.send(&s, i).is_ok()); + break; + } + _ => unreachable!(), + }, + } + } + } + }); + + scope.spawn(|_| { + for i in 0..COUNT { + if i % 2 == 0 { + thread::sleep(ms(500)); + } + + loop { + let mut sel = Select::new(); + let oper1 = sel.recv(&r); + let oper = sel.select_timeout(ms(100)); + match oper { + Err(_) => {} + Ok(oper) => match oper.index() { + ix if ix == oper1 => { + assert_eq!(oper.recv(&r), Ok(i)); + break; + } + _ => unreachable!(), + }, + } + } + } + }); + }) + .unwrap(); +} + +#[test] +fn send_recv_same_channel() { + let (s, r) = bounded::<i32>(0); + let mut sel = Select::new(); + let oper1 = sel.send(&s); + let oper2 = sel.recv(&r); + let oper = sel.select_timeout(ms(100)); + match oper { + Err(_) => {} + Ok(oper) => match oper.index() { + ix if ix == oper1 => panic!(), + ix if ix == oper2 => panic!(), + _ => unreachable!(), + }, + } + + let (s, r) = unbounded::<i32>(); + let mut sel = Select::new(); + let oper1 = sel.send(&s); + let oper2 = sel.recv(&r); + let oper = sel.select_timeout(ms(100)); + match oper { + Err(_) => panic!(), + Ok(oper) => match oper.index() { + ix if ix == oper1 => assert!(oper.send(&s, 0).is_ok()), + ix if ix == oper2 => panic!(), + _ => unreachable!(), + }, + } +} + +#[test] +fn matching() { + const THREADS: usize = 44; + + let (s, r) = &bounded::<usize>(0); + + scope(|scope| { + for i in 0..THREADS { + scope.spawn(move |_| { + let mut sel = Select::new(); + let oper1 = sel.recv(r); + let oper2 = sel.send(s); + let oper = sel.select(); + match oper.index() { + ix if ix == oper1 => assert_ne!(oper.recv(r), Ok(i)), + ix if ix == oper2 => assert!(oper.send(s, i).is_ok()), + _ => unreachable!(), + } + }); + } + }) + .unwrap(); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +} + +#[test] +fn matching_with_leftover() { + const THREADS: usize = 55; + + let (s, r) = &bounded::<usize>(0); + + scope(|scope| { + for i in 0..THREADS { + scope.spawn(move |_| { + let mut sel = Select::new(); + let oper1 = sel.recv(r); + let oper2 = sel.send(s); + let oper = sel.select(); + match oper.index() { + ix if ix == oper1 => assert_ne!(oper.recv(r), Ok(i)), + ix if ix == oper2 => assert!(oper.send(s, i).is_ok()), + _ => unreachable!(), + } + }); + } + s.send(!0).unwrap(); + }) + .unwrap(); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +} + +#[test] +fn channel_through_channel() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 1000; + + type T = Box<dyn Any + Send>; + + for cap in 0..3 { + let (s, r) = bounded::<T>(cap); + + scope(|scope| { + scope.spawn(move |_| { + let mut s = s; + + for _ in 0..COUNT { + let (new_s, new_r) = bounded(cap); + let new_r: T = Box::new(Some(new_r)); + + { + let mut sel = Select::new(); + let oper1 = sel.send(&s); + let oper = sel.select(); + match oper.index() { + ix if ix == oper1 => assert!(oper.send(&s, new_r).is_ok()), + _ => unreachable!(), + } + } + + s = new_s; + } + }); + + scope.spawn(move |_| { + let mut r = r; + + for _ in 0..COUNT { + let new = { + let mut sel = Select::new(); + let oper1 = sel.recv(&r); + let oper = sel.select(); + match oper.index() { + ix if ix == oper1 => oper + .recv(&r) + .unwrap() + .downcast_mut::<Option<Receiver<T>>>() + .unwrap() + .take() + .unwrap(), + _ => unreachable!(), + } + }; + r = new; + } + }); + }) + .unwrap(); + } +} + +#[test] +fn linearizable_try() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 100_000; + + for step in 0..2 { + let (start_s, start_r) = bounded::<()>(0); + let (end_s, end_r) = bounded::<()>(0); + + let ((s1, r1), (s2, r2)) = if step == 0 { + (bounded::<i32>(1), bounded::<i32>(1)) + } else { + (unbounded::<i32>(), unbounded::<i32>()) + }; + + scope(|scope| { + scope.spawn(|_| { + for _ in 0..COUNT { + start_s.send(()).unwrap(); + + s1.send(1).unwrap(); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper = sel.try_select(); + match oper { + Err(_) => unreachable!(), + Ok(oper) => match oper.index() { + ix if ix == oper1 => assert!(oper.recv(&r1).is_ok()), + ix if ix == oper2 => assert!(oper.recv(&r2).is_ok()), + _ => unreachable!(), + }, + } + + end_s.send(()).unwrap(); + let _ = r2.try_recv(); + } + }); + + for _ in 0..COUNT { + start_r.recv().unwrap(); + + s2.send(1).unwrap(); + let _ = r1.try_recv(); + + end_r.recv().unwrap(); + } + }) + .unwrap(); + } +} + +#[test] +fn linearizable_timeout() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 100_000; + + for step in 0..2 { + let (start_s, start_r) = bounded::<()>(0); + let (end_s, end_r) = bounded::<()>(0); + + let ((s1, r1), (s2, r2)) = if step == 0 { + (bounded::<i32>(1), bounded::<i32>(1)) + } else { + (unbounded::<i32>(), unbounded::<i32>()) + }; + + scope(|scope| { + scope.spawn(|_| { + for _ in 0..COUNT { + start_s.send(()).unwrap(); + + s1.send(1).unwrap(); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper = sel.select_timeout(ms(0)); + match oper { + Err(_) => unreachable!(), + Ok(oper) => match oper.index() { + ix if ix == oper1 => assert!(oper.recv(&r1).is_ok()), + ix if ix == oper2 => assert!(oper.recv(&r2).is_ok()), + _ => unreachable!(), + }, + } + + end_s.send(()).unwrap(); + let _ = r2.try_recv(); + } + }); + + for _ in 0..COUNT { + start_r.recv().unwrap(); + + s2.send(1).unwrap(); + let _ = r1.try_recv(); + + end_r.recv().unwrap(); + } + }) + .unwrap(); + } +} + +#[test] +fn fairness1() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = bounded::<()>(COUNT); + let (s2, r2) = unbounded::<()>(); + + for _ in 0..COUNT { + s1.send(()).unwrap(); + s2.send(()).unwrap(); + } + + let hits = vec![Cell::new(0usize); 4]; + for _ in 0..COUNT { + let after = after(ms(0)); + let tick = tick(ms(0)); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper3 = sel.recv(&after); + let oper4 = sel.recv(&tick); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => { + oper.recv(&r1).unwrap(); + hits[0].set(hits[0].get() + 1); + } + i if i == oper2 => { + oper.recv(&r2).unwrap(); + hits[1].set(hits[1].get() + 1); + } + i if i == oper3 => { + oper.recv(&after).unwrap(); + hits[2].set(hits[2].get() + 1); + } + i if i == oper4 => { + oper.recv(&tick).unwrap(); + hits[3].set(hits[3].get() + 1); + } + _ => unreachable!(), + } + } + assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 2)); +} + +#[test] +fn fairness2() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = unbounded::<()>(); + let (s2, r2) = bounded::<()>(1); + let (s3, r3) = bounded::<()>(0); + + scope(|scope| { + scope.spawn(|_| { + for _ in 0..COUNT { + let mut sel = Select::new(); + let mut oper1 = None; + let mut oper2 = None; + if s1.is_empty() { + oper1 = Some(sel.send(&s1)); + } + if s2.is_empty() { + oper2 = Some(sel.send(&s2)); + } + let oper3 = sel.send(&s3); + let oper = sel.select(); + match oper.index() { + i if Some(i) == oper1 => assert!(oper.send(&s1, ()).is_ok()), + i if Some(i) == oper2 => assert!(oper.send(&s2, ()).is_ok()), + i if i == oper3 => assert!(oper.send(&s3, ()).is_ok()), + _ => unreachable!(), + } + } + }); + + let hits = vec![Cell::new(0usize); 3]; + for _ in 0..COUNT { + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper3 = sel.recv(&r3); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => { + oper.recv(&r1).unwrap(); + hits[0].set(hits[0].get() + 1); + } + i if i == oper2 => { + oper.recv(&r2).unwrap(); + hits[1].set(hits[1].get() + 1); + } + i if i == oper3 => { + oper.recv(&r3).unwrap(); + hits[2].set(hits[2].get() + 1); + } + _ => unreachable!(), + } + } + assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 50)); + }) + .unwrap(); +} + +#[test] +fn sync_and_clone() { + const THREADS: usize = 20; + + let (s, r) = &bounded::<usize>(0); + + let mut sel = Select::new(); + let oper1 = sel.recv(r); + let oper2 = sel.send(s); + let sel = &sel; + + scope(|scope| { + for i in 0..THREADS { + scope.spawn(move |_| { + let mut sel = sel.clone(); + let oper = sel.select(); + match oper.index() { + ix if ix == oper1 => assert_ne!(oper.recv(r), Ok(i)), + ix if ix == oper2 => assert!(oper.send(s, i).is_ok()), + _ => unreachable!(), + } + }); + } + }) + .unwrap(); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +} + +#[test] +fn send_and_clone() { + const THREADS: usize = 20; + + let (s, r) = &bounded::<usize>(0); + + let mut sel = Select::new(); + let oper1 = sel.recv(r); + let oper2 = sel.send(s); + + scope(|scope| { + for i in 0..THREADS { + let mut sel = sel.clone(); + scope.spawn(move |_| { + let oper = sel.select(); + match oper.index() { + ix if ix == oper1 => assert_ne!(oper.recv(r), Ok(i)), + ix if ix == oper2 => assert!(oper.send(s, i).is_ok()), + _ => unreachable!(), + } + }); + } + }) + .unwrap(); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +} + +#[test] +fn reuse() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = bounded(0); + let (s2, r2) = bounded(0); + let (s3, r3) = bounded(100); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..COUNT { + s1.send(i).unwrap(); + assert_eq!(r2.recv().unwrap(), i); + r3.recv().unwrap(); + } + }); + + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.send(&s2); + + for i in 0..COUNT { + for _ in 0..2 { + let oper = sel.select(); + match oper.index() { + ix if ix == oper1 => assert_eq!(oper.recv(&r1), Ok(i)), + ix if ix == oper2 => assert!(oper.send(&s2, i).is_ok()), + _ => unreachable!(), + } + } + s3.send(()).unwrap(); + } + }) + .unwrap(); +} diff --git a/third_party/rust/crossbeam-channel/tests/select_macro.rs b/third_party/rust/crossbeam-channel/tests/select_macro.rs new file mode 100644 index 0000000000..119454cd68 --- /dev/null +++ b/third_party/rust/crossbeam-channel/tests/select_macro.rs @@ -0,0 +1,1480 @@ +//! Tests for the `select!` macro. + +#![forbid(unsafe_code)] // select! is safe. +#![allow(clippy::drop_copy, clippy::match_single_binding)] + +use std::any::Any; +use std::cell::Cell; +use std::ops::Deref; +use std::thread; +use std::time::{Duration, Instant}; + +use crossbeam_channel::{after, bounded, never, select, tick, unbounded}; +use crossbeam_channel::{Receiver, RecvError, SendError, Sender, TryRecvError}; +use crossbeam_utils::thread::scope; + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn smoke1() { + let (s1, r1) = unbounded::<usize>(); + let (s2, r2) = unbounded::<usize>(); + + s1.send(1).unwrap(); + + select! { + recv(r1) -> v => assert_eq!(v, Ok(1)), + recv(r2) -> _ => panic!(), + } + + s2.send(2).unwrap(); + + select! { + recv(r1) -> _ => panic!(), + recv(r2) -> v => assert_eq!(v, Ok(2)), + } +} + +#[test] +fn smoke2() { + let (_s1, r1) = unbounded::<i32>(); + let (_s2, r2) = unbounded::<i32>(); + let (_s3, r3) = unbounded::<i32>(); + let (_s4, r4) = unbounded::<i32>(); + let (s5, r5) = unbounded::<i32>(); + + s5.send(5).unwrap(); + + select! { + recv(r1) -> _ => panic!(), + recv(r2) -> _ => panic!(), + recv(r3) -> _ => panic!(), + recv(r4) -> _ => panic!(), + recv(r5) -> v => assert_eq!(v, Ok(5)), + } +} + +#[test] +fn disconnected() { + let (s1, r1) = unbounded::<i32>(); + let (s2, r2) = unbounded::<i32>(); + + scope(|scope| { + scope.spawn(|_| { + drop(s1); + thread::sleep(ms(500)); + s2.send(5).unwrap(); + }); + + select! { + recv(r1) -> v => assert!(v.is_err()), + recv(r2) -> _ => panic!(), + default(ms(1000)) => panic!(), + } + + r2.recv().unwrap(); + }) + .unwrap(); + + select! { + recv(r1) -> v => assert!(v.is_err()), + recv(r2) -> _ => panic!(), + default(ms(1000)) => panic!(), + } + + scope(|scope| { + scope.spawn(|_| { + thread::sleep(ms(500)); + drop(s2); + }); + + select! { + recv(r2) -> v => assert!(v.is_err()), + default(ms(1000)) => panic!(), + } + }) + .unwrap(); +} + +#[test] +fn default() { + let (s1, r1) = unbounded::<i32>(); + let (s2, r2) = unbounded::<i32>(); + + select! { + recv(r1) -> _ => panic!(), + recv(r2) -> _ => panic!(), + default => {} + } + + drop(s1); + + select! { + recv(r1) -> v => assert!(v.is_err()), + recv(r2) -> _ => panic!(), + default => panic!(), + } + + s2.send(2).unwrap(); + + select! { + recv(r2) -> v => assert_eq!(v, Ok(2)), + default => panic!(), + } + + select! { + recv(r2) -> _ => panic!(), + default => {}, + } + + select! { + default => {}, + } +} + +#[test] +fn timeout() { + let (_s1, r1) = unbounded::<i32>(); + let (s2, r2) = unbounded::<i32>(); + + scope(|scope| { + scope.spawn(|_| { + thread::sleep(ms(1500)); + s2.send(2).unwrap(); + }); + + select! { + recv(r1) -> _ => panic!(), + recv(r2) -> _ => panic!(), + default(ms(1000)) => {}, + } + + select! { + recv(r1) -> _ => panic!(), + recv(r2) -> v => assert_eq!(v, Ok(2)), + default(ms(1000)) => panic!(), + } + }) + .unwrap(); + + scope(|scope| { + let (s, r) = unbounded::<i32>(); + + scope.spawn(move |_| { + thread::sleep(ms(500)); + drop(s); + }); + + select! { + default(ms(1000)) => { + select! { + recv(r) -> v => assert!(v.is_err()), + default => panic!(), + } + } + } + }) + .unwrap(); +} + +#[test] +fn default_when_disconnected() { + let (_, r) = unbounded::<i32>(); + + select! { + recv(r) -> res => assert!(res.is_err()), + default => panic!(), + } + + let (_, r) = unbounded::<i32>(); + + select! { + recv(r) -> res => assert!(res.is_err()), + default(ms(1000)) => panic!(), + } + + let (s, _) = bounded::<i32>(0); + + select! { + send(s, 0) -> res => assert!(res.is_err()), + default => panic!(), + } + + let (s, _) = bounded::<i32>(0); + + select! { + send(s, 0) -> res => assert!(res.is_err()), + default(ms(1000)) => panic!(), + } +} + +#[test] +fn default_only() { + let start = Instant::now(); + select! { + default => {} + } + let now = Instant::now(); + assert!(now - start <= ms(50)); + + let start = Instant::now(); + select! { + default(ms(500)) => {} + } + let now = Instant::now(); + assert!(now - start >= ms(450)); + assert!(now - start <= ms(550)); +} + +#[test] +fn unblocks() { + let (s1, r1) = bounded::<i32>(0); + let (s2, r2) = bounded::<i32>(0); + + scope(|scope| { + scope.spawn(|_| { + thread::sleep(ms(500)); + s2.send(2).unwrap(); + }); + + select! { + recv(r1) -> _ => panic!(), + recv(r2) -> v => assert_eq!(v, Ok(2)), + default(ms(1000)) => panic!(), + } + }) + .unwrap(); + + scope(|scope| { + scope.spawn(|_| { + thread::sleep(ms(500)); + assert_eq!(r1.recv().unwrap(), 1); + }); + + select! { + send(s1, 1) -> _ => {}, + send(s2, 2) -> _ => panic!(), + default(ms(1000)) => panic!(), + } + }) + .unwrap(); +} + +#[test] +fn both_ready() { + let (s1, r1) = bounded(0); + let (s2, r2) = bounded(0); + + scope(|scope| { + scope.spawn(|_| { + thread::sleep(ms(500)); + s1.send(1).unwrap(); + assert_eq!(r2.recv().unwrap(), 2); + }); + + for _ in 0..2 { + select! { + recv(r1) -> v => assert_eq!(v, Ok(1)), + send(s2, 2) -> _ => {}, + } + } + }) + .unwrap(); +} + +#[test] +fn loop_try() { + const RUNS: usize = 20; + + for _ in 0..RUNS { + let (s1, r1) = bounded::<i32>(0); + let (s2, r2) = bounded::<i32>(0); + let (s_end, r_end) = bounded::<()>(0); + + scope(|scope| { + scope.spawn(|_| loop { + select! { + send(s1, 1) -> _ => break, + default => {} + } + + select! { + recv(r_end) -> _ => break, + default => {} + } + }); + + scope.spawn(|_| loop { + if let Ok(x) = r2.try_recv() { + assert_eq!(x, 2); + break; + } + + select! { + recv(r_end) -> _ => break, + default => {} + } + }); + + scope.spawn(|_| { + thread::sleep(ms(500)); + + select! { + recv(r1) -> v => assert_eq!(v, Ok(1)), + send(s2, 2) -> _ => {}, + default(ms(500)) => panic!(), + } + + drop(s_end); + }); + }) + .unwrap(); + } +} + +#[test] +fn cloning1() { + scope(|scope| { + let (s1, r1) = unbounded::<i32>(); + let (_s2, r2) = unbounded::<i32>(); + let (s3, r3) = unbounded::<()>(); + + scope.spawn(move |_| { + r3.recv().unwrap(); + drop(s1.clone()); + assert_eq!(r3.try_recv(), Err(TryRecvError::Empty)); + s1.send(1).unwrap(); + r3.recv().unwrap(); + }); + + s3.send(()).unwrap(); + + select! { + recv(r1) -> _ => {}, + recv(r2) -> _ => {}, + } + + s3.send(()).unwrap(); + }) + .unwrap(); +} + +#[test] +fn cloning2() { + let (s1, r1) = unbounded::<()>(); + let (s2, r2) = unbounded::<()>(); + let (_s3, _r3) = unbounded::<()>(); + + scope(|scope| { + scope.spawn(move |_| { + select! { + recv(r1) -> _ => panic!(), + recv(r2) -> _ => {}, + } + }); + + thread::sleep(ms(500)); + drop(s1.clone()); + s2.send(()).unwrap(); + }) + .unwrap(); +} + +#[test] +fn preflight1() { + let (s, r) = unbounded(); + s.send(()).unwrap(); + + select! { + recv(r) -> _ => {} + } +} + +#[test] +fn preflight2() { + let (s, r) = unbounded(); + drop(s.clone()); + s.send(()).unwrap(); + drop(s); + + select! { + recv(r) -> v => assert!(v.is_ok()), + } + assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)); +} + +#[test] +fn preflight3() { + let (s, r) = unbounded(); + drop(s.clone()); + s.send(()).unwrap(); + drop(s); + r.recv().unwrap(); + + select! { + recv(r) -> v => assert!(v.is_err()) + } +} + +#[test] +fn duplicate_operations() { + let (s, r) = unbounded::<i32>(); + let mut hit = [false; 4]; + + while hit.iter().any(|hit| !hit) { + select! { + recv(r) -> _ => hit[0] = true, + recv(r) -> _ => hit[1] = true, + send(s, 0) -> _ => hit[2] = true, + send(s, 0) -> _ => hit[3] = true, + } + } +} + +#[test] +fn nesting() { + let (s, r) = unbounded::<i32>(); + + select! { + send(s, 0) -> _ => { + select! { + recv(r) -> v => { + assert_eq!(v, Ok(0)); + select! { + send(s, 1) -> _ => { + select! { + recv(r) -> v => { + assert_eq!(v, Ok(1)); + } + } + } + } + } + } + } + } +} + +#[test] +#[should_panic(expected = "send panicked")] +fn panic_sender() { + fn get() -> Sender<i32> { + panic!("send panicked") + } + + #[allow(unreachable_code)] + { + select! { + send(get(), panic!()) -> _ => {} + } + } +} + +#[test] +#[should_panic(expected = "recv panicked")] +fn panic_receiver() { + fn get() -> Receiver<i32> { + panic!("recv panicked") + } + + select! { + recv(get()) -> _ => {} + } +} + +#[test] +fn stress_recv() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = unbounded(); + let (s2, r2) = bounded(5); + let (s3, r3) = bounded(100); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..COUNT { + s1.send(i).unwrap(); + r3.recv().unwrap(); + + s2.send(i).unwrap(); + r3.recv().unwrap(); + } + }); + + for i in 0..COUNT { + for _ in 0..2 { + select! { + recv(r1) -> v => assert_eq!(v, Ok(i)), + recv(r2) -> v => assert_eq!(v, Ok(i)), + } + + s3.send(()).unwrap(); + } + } + }) + .unwrap(); +} + +#[test] +fn stress_send() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = bounded(0); + let (s2, r2) = bounded(0); + let (s3, r3) = bounded(100); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..COUNT { + assert_eq!(r1.recv().unwrap(), i); + assert_eq!(r2.recv().unwrap(), i); + r3.recv().unwrap(); + } + }); + + for i in 0..COUNT { + for _ in 0..2 { + select! { + send(s1, i) -> _ => {}, + send(s2, i) -> _ => {}, + } + } + s3.send(()).unwrap(); + } + }) + .unwrap(); +} + +#[test] +fn stress_mixed() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = bounded(0); + let (s2, r2) = bounded(0); + let (s3, r3) = bounded(100); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..COUNT { + s1.send(i).unwrap(); + assert_eq!(r2.recv().unwrap(), i); + r3.recv().unwrap(); + } + }); + + for i in 0..COUNT { + for _ in 0..2 { + select! { + recv(r1) -> v => assert_eq!(v, Ok(i)), + send(s2, i) -> _ => {}, + } + } + s3.send(()).unwrap(); + } + }) + .unwrap(); +} + +#[test] +fn stress_timeout_two_threads() { + const COUNT: usize = 20; + + let (s, r) = bounded(2); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..COUNT { + if i % 2 == 0 { + thread::sleep(ms(500)); + } + + loop { + select! { + send(s, i) -> _ => break, + default(ms(100)) => {} + } + } + } + }); + + scope.spawn(|_| { + for i in 0..COUNT { + if i % 2 == 0 { + thread::sleep(ms(500)); + } + + loop { + select! { + recv(r) -> v => { + assert_eq!(v, Ok(i)); + break; + } + default(ms(100)) => {} + } + } + } + }); + }) + .unwrap(); +} + +#[test] +fn send_recv_same_channel() { + let (s, r) = bounded::<i32>(0); + select! { + send(s, 0) -> _ => panic!(), + recv(r) -> _ => panic!(), + default(ms(500)) => {} + } + + let (s, r) = unbounded::<i32>(); + select! { + send(s, 0) -> _ => {}, + recv(r) -> _ => panic!(), + default(ms(500)) => panic!(), + } +} + +#[test] +fn matching() { + const THREADS: usize = 44; + + let (s, r) = &bounded::<usize>(0); + + scope(|scope| { + for i in 0..THREADS { + scope.spawn(move |_| { + select! { + recv(r) -> v => assert_ne!(v.unwrap(), i), + send(s, i) -> _ => {}, + } + }); + } + }) + .unwrap(); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +} + +#[test] +fn matching_with_leftover() { + const THREADS: usize = 55; + + let (s, r) = &bounded::<usize>(0); + + scope(|scope| { + for i in 0..THREADS { + scope.spawn(move |_| { + select! { + recv(r) -> v => assert_ne!(v.unwrap(), i), + send(s, i) -> _ => {}, + } + }); + } + s.send(!0).unwrap(); + }) + .unwrap(); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +} + +#[test] +fn channel_through_channel() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 1000; + + type T = Box<dyn Any + Send>; + + for cap in 0..3 { + let (s, r) = bounded::<T>(cap); + + scope(|scope| { + scope.spawn(move |_| { + let mut s = s; + + for _ in 0..COUNT { + let (new_s, new_r) = bounded(cap); + let new_r: T = Box::new(Some(new_r)); + + select! { + send(s, new_r) -> _ => {} + } + + s = new_s; + } + }); + + scope.spawn(move |_| { + let mut r = r; + + for _ in 0..COUNT { + r = select! { + recv(r) -> msg => { + msg.unwrap() + .downcast_mut::<Option<Receiver<T>>>() + .unwrap() + .take() + .unwrap() + } + } + } + }); + }) + .unwrap(); + } +} + +#[test] +fn linearizable_default() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 100_000; + + for step in 0..2 { + let (start_s, start_r) = bounded::<()>(0); + let (end_s, end_r) = bounded::<()>(0); + + let ((s1, r1), (s2, r2)) = if step == 0 { + (bounded::<i32>(1), bounded::<i32>(1)) + } else { + (unbounded::<i32>(), unbounded::<i32>()) + }; + + scope(|scope| { + scope.spawn(|_| { + for _ in 0..COUNT { + start_s.send(()).unwrap(); + + s1.send(1).unwrap(); + select! { + recv(r1) -> _ => {} + recv(r2) -> _ => {} + default => unreachable!() + } + + end_s.send(()).unwrap(); + let _ = r2.try_recv(); + } + }); + + for _ in 0..COUNT { + start_r.recv().unwrap(); + + s2.send(1).unwrap(); + let _ = r1.try_recv(); + + end_r.recv().unwrap(); + } + }) + .unwrap(); + } +} + +#[test] +fn linearizable_timeout() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 100_000; + + for step in 0..2 { + let (start_s, start_r) = bounded::<()>(0); + let (end_s, end_r) = bounded::<()>(0); + + let ((s1, r1), (s2, r2)) = if step == 0 { + (bounded::<i32>(1), bounded::<i32>(1)) + } else { + (unbounded::<i32>(), unbounded::<i32>()) + }; + + scope(|scope| { + scope.spawn(|_| { + for _ in 0..COUNT { + start_s.send(()).unwrap(); + + s1.send(1).unwrap(); + select! { + recv(r1) -> _ => {} + recv(r2) -> _ => {} + default(ms(0)) => unreachable!() + } + + end_s.send(()).unwrap(); + let _ = r2.try_recv(); + } + }); + + for _ in 0..COUNT { + start_r.recv().unwrap(); + + s2.send(1).unwrap(); + let _ = r1.try_recv(); + + end_r.recv().unwrap(); + } + }) + .unwrap(); + } +} + +#[test] +fn fairness1() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = bounded::<()>(COUNT); + let (s2, r2) = unbounded::<()>(); + + for _ in 0..COUNT { + s1.send(()).unwrap(); + s2.send(()).unwrap(); + } + + let mut hits = [0usize; 4]; + for _ in 0..COUNT { + select! { + recv(r1) -> _ => hits[0] += 1, + recv(r2) -> _ => hits[1] += 1, + recv(after(ms(0))) -> _ => hits[2] += 1, + recv(tick(ms(0))) -> _ => hits[3] += 1, + } + } + assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); +} + +#[test] +fn fairness2() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = unbounded::<()>(); + let (s2, r2) = bounded::<()>(1); + let (s3, r3) = bounded::<()>(0); + + scope(|scope| { + scope.spawn(|_| { + let (hole, _r) = bounded(0); + + for _ in 0..COUNT { + let s1 = if s1.is_empty() { &s1 } else { &hole }; + let s2 = if s2.is_empty() { &s2 } else { &hole }; + + select! { + send(s1, ()) -> res => assert!(res.is_ok()), + send(s2, ()) -> res => assert!(res.is_ok()), + send(s3, ()) -> res => assert!(res.is_ok()), + } + } + }); + + let hits = vec![Cell::new(0usize); 3]; + for _ in 0..COUNT { + select! { + recv(r1) -> _ => hits[0].set(hits[0].get() + 1), + recv(r2) -> _ => hits[1].set(hits[1].get() + 1), + recv(r3) -> _ => hits[2].set(hits[2].get() + 1), + } + } + assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 50)); + }) + .unwrap(); +} + +#[test] +fn fairness_recv() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = bounded::<()>(COUNT); + let (s2, r2) = unbounded::<()>(); + + for _ in 0..COUNT { + s1.send(()).unwrap(); + s2.send(()).unwrap(); + } + + let mut hits = [0usize; 2]; + while hits[0] + hits[1] < COUNT { + select! { + recv(r1) -> _ => hits[0] += 1, + recv(r2) -> _ => hits[1] += 1, + } + } + assert!(hits.iter().all(|x| *x >= COUNT / 4)); +} + +#[test] +fn fairness_send() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, _r1) = bounded::<()>(COUNT); + let (s2, _r2) = unbounded::<()>(); + + let mut hits = [0usize; 2]; + for _ in 0..COUNT { + select! { + send(s1, ()) -> _ => hits[0] += 1, + send(s2, ()) -> _ => hits[1] += 1, + } + } + assert!(hits.iter().all(|x| *x >= COUNT / 4)); +} + +#[allow(clippy::or_fun_call)] // This is intentional. +#[test] +fn references() { + let (s, r) = unbounded::<i32>(); + select! { + send(s, 0) -> _ => {} + recv(r) -> _ => {} + } + select! { + send(&&&&s, 0) -> _ => {} + recv(&&&&r) -> _ => {} + } + select! { + recv(Some(&r).unwrap_or(&never())) -> _ => {}, + default => {} + } + select! { + recv(Some(r).unwrap_or(never())) -> _ => {}, + default => {} + } +} + +#[test] +fn case_blocks() { + let (s, r) = unbounded::<i32>(); + + select! { + recv(r) -> _ => 3.0, + recv(r) -> _ => loop { + unreachable!() + }, + recv(r) -> _ => match 7 + 3 { + _ => unreachable!() + }, + default => 7. + }; + + select! { + recv(r) -> msg => if msg.is_ok() { + unreachable!() + }, + default => () + } + + drop(s); +} + +#[allow(clippy::redundant_closure_call)] // This is intentional. +#[test] +fn move_handles() { + let (s, r) = unbounded::<i32>(); + select! { + recv((move || r)()) -> _ => {} + send((move || s)(), 0) -> _ => {} + } +} + +#[test] +fn infer_types() { + let (s, r) = unbounded(); + select! { + recv(r) -> _ => {} + default => {} + } + s.send(()).unwrap(); + + let (s, r) = unbounded(); + select! { + send(s, ()) -> _ => {} + } + r.recv().unwrap(); +} + +#[test] +fn default_syntax() { + let (s, r) = bounded::<i32>(0); + + select! { + recv(r) -> _ => panic!(), + default => {} + } + select! { + send(s, 0) -> _ => panic!(), + default() => {} + } + select! { + default => {} + } + select! { + default() => {} + } +} + +#[test] +fn same_variable_name() { + let (_, r) = unbounded::<i32>(); + select! { + recv(r) -> r => assert!(r.is_err()), + } +} + +#[test] +fn handles_on_heap() { + let (s, r) = unbounded::<i32>(); + let (s, r) = (Box::new(s), Box::new(r)); + + select! { + send(*s, 0) -> _ => {} + recv(*r) -> _ => {} + default => {} + } + + drop(s); + drop(r); +} + +#[test] +fn once_blocks() { + let (s, r) = unbounded::<i32>(); + + let once = Box::new(()); + select! { + send(s, 0) -> _ => drop(once), + } + + let once = Box::new(()); + select! { + recv(r) -> _ => drop(once), + } + + let once1 = Box::new(()); + let once2 = Box::new(()); + select! { + send(s, 0) -> _ => drop(once1), + default => drop(once2), + } + + let once1 = Box::new(()); + let once2 = Box::new(()); + select! { + recv(r) -> _ => drop(once1), + default => drop(once2), + } + + let once1 = Box::new(()); + let once2 = Box::new(()); + select! { + recv(r) -> _ => drop(once1), + send(s, 0) -> _ => drop(once2), + } +} + +#[test] +fn once_receiver() { + let (_, r) = unbounded::<i32>(); + + let once = Box::new(()); + let get = move || { + drop(once); + r + }; + + select! { + recv(get()) -> _ => {} + } +} + +#[test] +fn once_sender() { + let (s, _) = unbounded::<i32>(); + + let once = Box::new(()); + let get = move || { + drop(once); + s + }; + + select! { + send(get(), 5) -> _ => {} + } +} + +#[test] +fn parse_nesting() { + let (_, r) = unbounded::<i32>(); + + select! { + recv(r) -> _ => {} + recv(r) -> _ => { + select! { + recv(r) -> _ => {} + recv(r) -> _ => { + select! { + recv(r) -> _ => {} + recv(r) -> _ => { + select! { + default => {} + } + } + } + } + } + } + } +} + +#[test] +fn evaluate() { + let (s, r) = unbounded::<i32>(); + + let v = select! { + recv(r) -> _ => "foo".into(), + send(s, 0) -> _ => "bar".to_owned(), + default => "baz".to_string(), + }; + assert_eq!(v, "bar"); + + let v = select! { + recv(r) -> _ => "foo".into(), + default => "baz".to_string(), + }; + assert_eq!(v, "foo"); + + let v = select! { + recv(r) -> _ => "foo".into(), + default => "baz".to_string(), + }; + assert_eq!(v, "baz"); +} + +#[test] +fn deref() { + use crossbeam_channel as cc; + + struct Sender<T>(cc::Sender<T>); + struct Receiver<T>(cc::Receiver<T>); + + impl<T> Deref for Receiver<T> { + type Target = cc::Receiver<T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } + } + + impl<T> Deref for Sender<T> { + type Target = cc::Sender<T>; + + fn deref(&self) -> &Self::Target { + &self.0 + } + } + + let (s, r) = bounded::<i32>(0); + let (s, r) = (Sender(s), Receiver(r)); + + select! { + send(s, 0) -> _ => panic!(), + recv(r) -> _ => panic!(), + default => {} + } +} + +#[test] +fn result_types() { + let (s, _) = bounded::<i32>(0); + let (_, r) = bounded::<i32>(0); + + select! { + recv(r) -> res => drop::<Result<i32, RecvError>>(res), + } + select! { + recv(r) -> res => drop::<Result<i32, RecvError>>(res), + default => {} + } + select! { + recv(r) -> res => drop::<Result<i32, RecvError>>(res), + default(ms(0)) => {} + } + + select! { + send(s, 0) -> res => drop::<Result<(), SendError<i32>>>(res), + } + select! { + send(s, 0) -> res => drop::<Result<(), SendError<i32>>>(res), + default => {} + } + select! { + send(s, 0) -> res => drop::<Result<(), SendError<i32>>>(res), + default(ms(0)) => {} + } + + select! { + send(s, 0) -> res => drop::<Result<(), SendError<i32>>>(res), + recv(r) -> res => drop::<Result<i32, RecvError>>(res), + } +} + +#[test] +fn try_recv() { + let (s, r) = bounded(0); + + scope(|scope| { + scope.spawn(move |_| { + select! { + recv(r) -> _ => panic!(), + default => {} + } + thread::sleep(ms(1500)); + select! { + recv(r) -> v => assert_eq!(v, Ok(7)), + default => panic!(), + } + thread::sleep(ms(500)); + select! { + recv(r) -> v => assert_eq!(v, Err(RecvError)), + default => panic!(), + } + }); + scope.spawn(move |_| { + thread::sleep(ms(1000)); + select! { + send(s, 7) -> res => res.unwrap(), + } + }); + }) + .unwrap(); +} + +#[test] +fn recv() { + let (s, r) = bounded(0); + + scope(|scope| { + scope.spawn(move |_| { + select! { + recv(r) -> v => assert_eq!(v, Ok(7)), + } + thread::sleep(ms(1000)); + select! { + recv(r) -> v => assert_eq!(v, Ok(8)), + } + thread::sleep(ms(1000)); + select! { + recv(r) -> v => assert_eq!(v, Ok(9)), + } + select! { + recv(r) -> v => assert_eq!(v, Err(RecvError)), + } + }); + scope.spawn(move |_| { + thread::sleep(ms(1500)); + select! { + send(s, 7) -> res => res.unwrap(), + } + select! { + send(s, 8) -> res => res.unwrap(), + } + select! { + send(s, 9) -> res => res.unwrap(), + } + }); + }) + .unwrap(); +} + +#[test] +fn recv_timeout() { + let (s, r) = bounded::<i32>(0); + + scope(|scope| { + scope.spawn(move |_| { + select! { + recv(r) -> _ => panic!(), + default(ms(1000)) => {} + } + select! { + recv(r) -> v => assert_eq!(v, Ok(7)), + default(ms(1000)) => panic!(), + } + select! { + recv(r) -> v => assert_eq!(v, Err(RecvError)), + default(ms(1000)) => panic!(), + } + }); + scope.spawn(move |_| { + thread::sleep(ms(1500)); + select! { + send(s, 7) -> res => res.unwrap(), + } + }); + }) + .unwrap(); +} + +#[test] +fn try_send() { + let (s, r) = bounded(0); + + scope(|scope| { + scope.spawn(move |_| { + select! { + send(s, 7) -> _ => panic!(), + default => {} + } + thread::sleep(ms(1500)); + select! { + send(s, 8) -> res => res.unwrap(), + default => panic!(), + } + thread::sleep(ms(500)); + select! { + send(s, 8) -> res => assert_eq!(res, Err(SendError(8))), + default => panic!(), + } + }); + scope.spawn(move |_| { + thread::sleep(ms(1000)); + select! { + recv(r) -> v => assert_eq!(v, Ok(8)), + } + }); + }) + .unwrap(); +} + +#[test] +fn send() { + let (s, r) = bounded(0); + + scope(|scope| { + scope.spawn(move |_| { + select! { + send(s, 7) -> res => res.unwrap(), + } + thread::sleep(ms(1000)); + select! { + send(s, 8) -> res => res.unwrap(), + } + thread::sleep(ms(1000)); + select! { + send(s, 9) -> res => res.unwrap(), + } + }); + scope.spawn(move |_| { + thread::sleep(ms(1500)); + select! { + recv(r) -> v => assert_eq!(v, Ok(7)), + } + select! { + recv(r) -> v => assert_eq!(v, Ok(8)), + } + select! { + recv(r) -> v => assert_eq!(v, Ok(9)), + } + }); + }) + .unwrap(); +} + +#[test] +fn send_timeout() { + let (s, r) = bounded(0); + + scope(|scope| { + scope.spawn(move |_| { + select! { + send(s, 7) -> _ => panic!(), + default(ms(1000)) => {} + } + select! { + send(s, 8) -> res => res.unwrap(), + default(ms(1000)) => panic!(), + } + select! { + send(s, 9) -> res => assert_eq!(res, Err(SendError(9))), + default(ms(1000)) => panic!(), + } + }); + scope.spawn(move |_| { + thread::sleep(ms(1500)); + select! { + recv(r) -> v => assert_eq!(v, Ok(8)), + } + }); + }) + .unwrap(); +} + +#[test] +fn disconnect_wakes_sender() { + let (s, r) = bounded(0); + + scope(|scope| { + scope.spawn(move |_| { + select! { + send(s, ()) -> res => assert_eq!(res, Err(SendError(()))), + } + }); + scope.spawn(move |_| { + thread::sleep(ms(1000)); + drop(r); + }); + }) + .unwrap(); +} + +#[test] +fn disconnect_wakes_receiver() { + let (s, r) = bounded::<()>(0); + + scope(|scope| { + scope.spawn(move |_| { + select! { + recv(r) -> res => assert_eq!(res, Err(RecvError)), + } + }); + scope.spawn(move |_| { + thread::sleep(ms(1000)); + drop(s); + }); + }) + .unwrap(); +} + +#[test] +fn trailing_comma() { + let (s, r) = unbounded::<usize>(); + + select! { + send(s, 1,) -> _ => {}, + recv(r,) -> _ => {}, + default(ms(1000),) => {}, + } +} diff --git a/third_party/rust/crossbeam-channel/tests/thread_locals.rs b/third_party/rust/crossbeam-channel/tests/thread_locals.rs new file mode 100644 index 0000000000..fb4e577f29 --- /dev/null +++ b/third_party/rust/crossbeam-channel/tests/thread_locals.rs @@ -0,0 +1,53 @@ +//! Tests that make sure accessing thread-locals while exiting the thread doesn't cause panics. + +#![cfg(not(miri))] // Miri detects that this test is buggy: the destructor of `FOO` uses `std::thread::current()`! + +use std::thread; +use std::time::Duration; + +use crossbeam_channel::{select, unbounded}; +use crossbeam_utils::thread::scope; + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +#[cfg_attr(target_os = "macos", ignore = "TLS is destroyed too early on macOS")] +fn use_while_exiting() { + struct Foo; + + impl Drop for Foo { + fn drop(&mut self) { + // A blocking operation after the thread-locals have been dropped. This will attempt to + // use the thread-locals and must not panic. + let (_s, r) = unbounded::<()>(); + select! { + recv(r) -> _ => {} + default(ms(100)) => {} + } + } + } + + thread_local! { + static FOO: Foo = Foo; + } + + let (s, r) = unbounded::<()>(); + + scope(|scope| { + scope.spawn(|_| { + // First initialize `FOO`, then the thread-locals related to crossbeam-channel. + FOO.with(|_| ()); + r.recv().unwrap(); + // At thread exit, thread-locals related to crossbeam-channel get dropped first and + // `FOO` is dropped last. + }); + + scope.spawn(|_| { + thread::sleep(ms(100)); + s.send(()).unwrap(); + }); + }) + .unwrap(); +} diff --git a/third_party/rust/crossbeam-channel/tests/tick.rs b/third_party/rust/crossbeam-channel/tests/tick.rs new file mode 100644 index 0000000000..23bbb1f184 --- /dev/null +++ b/third_party/rust/crossbeam-channel/tests/tick.rs @@ -0,0 +1,352 @@ +//! Tests for the tick channel flavor. + +#![cfg(not(miri))] // TODO: many assertions failed due to Miri is slow + +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::thread; +use std::time::{Duration, Instant}; + +use crossbeam_channel::{after, select, tick, Select, TryRecvError}; +use crossbeam_utils::thread::scope; + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn fire() { + let start = Instant::now(); + let r = tick(ms(50)); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); + thread::sleep(ms(100)); + + let fired = r.try_recv().unwrap(); + assert!(start < fired); + assert!(fired - start >= ms(50)); + + let now = Instant::now(); + assert!(fired < now); + assert!(now - fired >= ms(50)); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); + + select! { + recv(r) -> _ => panic!(), + default => {} + } + + select! { + recv(r) -> _ => {} + recv(tick(ms(200))) -> _ => panic!(), + } +} + +#[test] +fn intervals() { + let start = Instant::now(); + let r = tick(ms(50)); + + let t1 = r.recv().unwrap(); + assert!(start + ms(50) <= t1); + assert!(start + ms(100) > t1); + + thread::sleep(ms(300)); + let t2 = r.try_recv().unwrap(); + assert!(start + ms(100) <= t2); + assert!(start + ms(150) > t2); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); + let t3 = r.recv().unwrap(); + assert!(start + ms(400) <= t3); + assert!(start + ms(450) > t3); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +} + +#[test] +fn capacity() { + const COUNT: usize = 10; + + for i in 0..COUNT { + let r = tick(ms(i as u64)); + assert_eq!(r.capacity(), Some(1)); + } +} + +#[test] +fn len_empty_full() { + let r = tick(ms(50)); + + assert_eq!(r.len(), 0); + assert!(r.is_empty()); + assert!(!r.is_full()); + + thread::sleep(ms(100)); + + assert_eq!(r.len(), 1); + assert!(!r.is_empty()); + assert!(r.is_full()); + + r.try_recv().unwrap(); + + assert_eq!(r.len(), 0); + assert!(r.is_empty()); + assert!(!r.is_full()); +} + +#[test] +fn try_recv() { + let r = tick(ms(200)); + assert!(r.try_recv().is_err()); + + thread::sleep(ms(100)); + assert!(r.try_recv().is_err()); + + thread::sleep(ms(200)); + assert!(r.try_recv().is_ok()); + assert!(r.try_recv().is_err()); + + thread::sleep(ms(200)); + assert!(r.try_recv().is_ok()); + assert!(r.try_recv().is_err()); +} + +#[test] +fn recv() { + let start = Instant::now(); + let r = tick(ms(50)); + + let fired = r.recv().unwrap(); + assert!(start < fired); + assert!(fired - start >= ms(50)); + + let now = Instant::now(); + assert!(fired < now); + assert!(now - fired < fired - start); + + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +} + +#[cfg(not(crossbeam_sanitize))] // TODO: assertions failed due to tsan is slow +#[test] +fn recv_timeout() { + let start = Instant::now(); + let r = tick(ms(200)); + + assert!(r.recv_timeout(ms(100)).is_err()); + let now = Instant::now(); + assert!(now - start >= ms(100)); + assert!(now - start <= ms(150)); + + let fired = r.recv_timeout(ms(200)).unwrap(); + assert!(fired - start >= ms(200)); + assert!(fired - start <= ms(250)); + + assert!(r.recv_timeout(ms(100)).is_err()); + let now = Instant::now(); + assert!(now - start >= ms(300)); + assert!(now - start <= ms(350)); + + let fired = r.recv_timeout(ms(200)).unwrap(); + assert!(fired - start >= ms(400)); + assert!(fired - start <= ms(450)); +} + +#[test] +fn recv_two() { + let r1 = tick(ms(50)); + let r2 = tick(ms(50)); + + scope(|scope| { + scope.spawn(|_| { + for _ in 0..10 { + select! { + recv(r1) -> _ => {} + recv(r2) -> _ => {} + } + } + }); + scope.spawn(|_| { + for _ in 0..10 { + select! { + recv(r1) -> _ => {} + recv(r2) -> _ => {} + } + } + }); + }) + .unwrap(); +} + +#[test] +fn recv_race() { + select! { + recv(tick(ms(50))) -> _ => {} + recv(tick(ms(100))) -> _ => panic!(), + } + + select! { + recv(tick(ms(100))) -> _ => panic!(), + recv(tick(ms(50))) -> _ => {} + } +} + +#[test] +fn stress_default() { + const COUNT: usize = 10; + + for _ in 0..COUNT { + select! { + recv(tick(ms(0))) -> _ => {} + default => panic!(), + } + } + + for _ in 0..COUNT { + select! { + recv(tick(ms(100))) -> _ => panic!(), + default => {} + } + } +} + +#[test] +fn select() { + const THREADS: usize = 4; + + let hits = AtomicUsize::new(0); + let r1 = tick(ms(200)); + let r2 = tick(ms(300)); + + scope(|scope| { + for _ in 0..THREADS { + scope.spawn(|_| { + let timeout = after(ms(1100)); + loop { + let mut sel = Select::new(); + let oper1 = sel.recv(&r1); + let oper2 = sel.recv(&r2); + let oper3 = sel.recv(&timeout); + let oper = sel.select(); + match oper.index() { + i if i == oper1 => { + oper.recv(&r1).unwrap(); + hits.fetch_add(1, Ordering::SeqCst); + } + i if i == oper2 => { + oper.recv(&r2).unwrap(); + hits.fetch_add(1, Ordering::SeqCst); + } + i if i == oper3 => { + oper.recv(&timeout).unwrap(); + break; + } + _ => unreachable!(), + } + } + }); + } + }) + .unwrap(); + + assert_eq!(hits.load(Ordering::SeqCst), 8); +} + +#[cfg(not(crossbeam_sanitize))] // TODO: assertions failed due to tsan is slow +#[test] +fn ready() { + const THREADS: usize = 4; + + let hits = AtomicUsize::new(0); + let r1 = tick(ms(200)); + let r2 = tick(ms(300)); + + scope(|scope| { + for _ in 0..THREADS { + scope.spawn(|_| { + let timeout = after(ms(1100)); + 'outer: loop { + let mut sel = Select::new(); + sel.recv(&r1); + sel.recv(&r2); + sel.recv(&timeout); + loop { + match sel.ready() { + 0 => { + if r1.try_recv().is_ok() { + hits.fetch_add(1, Ordering::SeqCst); + break; + } + } + 1 => { + if r2.try_recv().is_ok() { + hits.fetch_add(1, Ordering::SeqCst); + break; + } + } + 2 => { + if timeout.try_recv().is_ok() { + break 'outer; + } + } + _ => unreachable!(), + } + } + } + }); + } + }) + .unwrap(); + + assert_eq!(hits.load(Ordering::SeqCst), 8); +} + +#[test] +fn fairness() { + const COUNT: usize = 30; + + for &dur in &[0, 1] { + let mut hits = [0usize; 2]; + + for _ in 0..COUNT { + let r1 = tick(ms(dur)); + let r2 = tick(ms(dur)); + + for _ in 0..COUNT { + select! { + recv(r1) -> _ => hits[0] += 1, + recv(r2) -> _ => hits[1] += 1, + } + } + } + + assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); + } +} + +#[test] +fn fairness_duplicates() { + const COUNT: usize = 30; + + for &dur in &[0, 1] { + let mut hits = [0usize; 5]; + + for _ in 0..COUNT { + let r = tick(ms(dur)); + + for _ in 0..COUNT { + select! { + recv(r) -> _ => hits[0] += 1, + recv(r) -> _ => hits[1] += 1, + recv(r) -> _ => hits[2] += 1, + recv(r) -> _ => hits[3] += 1, + recv(r) -> _ => hits[4] += 1, + } + } + } + + assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); + } +} diff --git a/third_party/rust/crossbeam-channel/tests/zero.rs b/third_party/rust/crossbeam-channel/tests/zero.rs new file mode 100644 index 0000000000..74c9a3e102 --- /dev/null +++ b/third_party/rust/crossbeam-channel/tests/zero.rs @@ -0,0 +1,587 @@ +//! Tests for the zero channel flavor. + +use std::any::Any; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::thread; +use std::time::Duration; + +use crossbeam_channel::{bounded, select, Receiver}; +use crossbeam_channel::{RecvError, RecvTimeoutError, TryRecvError}; +use crossbeam_channel::{SendError, SendTimeoutError, TrySendError}; +use crossbeam_utils::thread::scope; +use rand::{thread_rng, Rng}; + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn smoke() { + let (s, r) = bounded(0); + assert_eq!(s.try_send(7), Err(TrySendError::Full(7))); + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); +} + +#[test] +fn capacity() { + let (s, r) = bounded::<()>(0); + assert_eq!(s.capacity(), Some(0)); + assert_eq!(r.capacity(), Some(0)); +} + +#[test] +fn len_empty_full() { + let (s, r) = bounded(0); + + assert_eq!(s.len(), 0); + assert!(s.is_empty()); + assert!(s.is_full()); + assert_eq!(r.len(), 0); + assert!(r.is_empty()); + assert!(r.is_full()); + + scope(|scope| { + scope.spawn(|_| s.send(0).unwrap()); + scope.spawn(|_| r.recv().unwrap()); + }) + .unwrap(); + + assert_eq!(s.len(), 0); + assert!(s.is_empty()); + assert!(s.is_full()); + assert_eq!(r.len(), 0); + assert!(r.is_empty()); + assert!(r.is_full()); +} + +#[test] +fn try_recv() { + let (s, r) = bounded(0); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); + thread::sleep(ms(1500)); + assert_eq!(r.try_recv(), Ok(7)); + thread::sleep(ms(500)); + assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)); + }); + scope.spawn(move |_| { + thread::sleep(ms(1000)); + s.send(7).unwrap(); + }); + }) + .unwrap(); +} + +#[test] +fn recv() { + let (s, r) = bounded(0); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(r.recv(), Ok(7)); + thread::sleep(ms(1000)); + assert_eq!(r.recv(), Ok(8)); + thread::sleep(ms(1000)); + assert_eq!(r.recv(), Ok(9)); + assert_eq!(r.recv(), Err(RecvError)); + }); + scope.spawn(move |_| { + thread::sleep(ms(1500)); + s.send(7).unwrap(); + s.send(8).unwrap(); + s.send(9).unwrap(); + }); + }) + .unwrap(); +} + +#[test] +fn recv_timeout() { + let (s, r) = bounded::<i32>(0); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout)); + assert_eq!(r.recv_timeout(ms(1000)), Ok(7)); + assert_eq!( + r.recv_timeout(ms(1000)), + Err(RecvTimeoutError::Disconnected) + ); + }); + scope.spawn(move |_| { + thread::sleep(ms(1500)); + s.send(7).unwrap(); + }); + }) + .unwrap(); +} + +#[test] +fn try_send() { + let (s, r) = bounded(0); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(s.try_send(7), Err(TrySendError::Full(7))); + thread::sleep(ms(1500)); + assert_eq!(s.try_send(8), Ok(())); + thread::sleep(ms(500)); + assert_eq!(s.try_send(9), Err(TrySendError::Disconnected(9))); + }); + scope.spawn(move |_| { + thread::sleep(ms(1000)); + assert_eq!(r.recv(), Ok(8)); + }); + }) + .unwrap(); +} + +#[test] +fn send() { + let (s, r) = bounded(0); + + scope(|scope| { + scope.spawn(move |_| { + s.send(7).unwrap(); + thread::sleep(ms(1000)); + s.send(8).unwrap(); + thread::sleep(ms(1000)); + s.send(9).unwrap(); + }); + scope.spawn(move |_| { + thread::sleep(ms(1500)); + assert_eq!(r.recv(), Ok(7)); + assert_eq!(r.recv(), Ok(8)); + assert_eq!(r.recv(), Ok(9)); + }); + }) + .unwrap(); +} + +#[test] +fn send_timeout() { + let (s, r) = bounded(0); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!( + s.send_timeout(7, ms(1000)), + Err(SendTimeoutError::Timeout(7)) + ); + assert_eq!(s.send_timeout(8, ms(1000)), Ok(())); + assert_eq!( + s.send_timeout(9, ms(1000)), + Err(SendTimeoutError::Disconnected(9)) + ); + }); + scope.spawn(move |_| { + thread::sleep(ms(1500)); + assert_eq!(r.recv(), Ok(8)); + }); + }) + .unwrap(); +} + +#[test] +fn len() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 25_000; + + let (s, r) = bounded(0); + + assert_eq!(s.len(), 0); + assert_eq!(r.len(), 0); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..COUNT { + assert_eq!(r.recv(), Ok(i)); + assert_eq!(r.len(), 0); + } + }); + + scope.spawn(|_| { + for i in 0..COUNT { + s.send(i).unwrap(); + assert_eq!(s.len(), 0); + } + }); + }) + .unwrap(); + + assert_eq!(s.len(), 0); + assert_eq!(r.len(), 0); +} + +#[test] +fn disconnect_wakes_sender() { + let (s, r) = bounded(0); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(s.send(()), Err(SendError(()))); + }); + scope.spawn(move |_| { + thread::sleep(ms(1000)); + drop(r); + }); + }) + .unwrap(); +} + +#[test] +fn disconnect_wakes_receiver() { + let (s, r) = bounded::<()>(0); + + scope(|scope| { + scope.spawn(move |_| { + assert_eq!(r.recv(), Err(RecvError)); + }); + scope.spawn(move |_| { + thread::sleep(ms(1000)); + drop(s); + }); + }) + .unwrap(); +} + +#[test] +fn spsc() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 100_000; + + let (s, r) = bounded(0); + + scope(|scope| { + scope.spawn(move |_| { + for i in 0..COUNT { + assert_eq!(r.recv(), Ok(i)); + } + assert_eq!(r.recv(), Err(RecvError)); + }); + scope.spawn(move |_| { + for i in 0..COUNT { + s.send(i).unwrap(); + } + }); + }) + .unwrap(); +} + +#[test] +fn mpmc() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 25_000; + const THREADS: usize = 4; + + let (s, r) = bounded::<usize>(0); + let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>(); + + scope(|scope| { + for _ in 0..THREADS { + scope.spawn(|_| { + for _ in 0..COUNT { + let n = r.recv().unwrap(); + v[n].fetch_add(1, Ordering::SeqCst); + } + }); + } + for _ in 0..THREADS { + scope.spawn(|_| { + for i in 0..COUNT { + s.send(i).unwrap(); + } + }); + } + }) + .unwrap(); + + for c in v { + assert_eq!(c.load(Ordering::SeqCst), THREADS); + } +} + +#[test] +fn stress_oneshot() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + for _ in 0..COUNT { + let (s, r) = bounded(1); + + scope(|scope| { + scope.spawn(|_| r.recv().unwrap()); + scope.spawn(|_| s.send(0).unwrap()); + }) + .unwrap(); + } +} + +#[test] +fn stress_iter() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 1000; + + let (request_s, request_r) = bounded(0); + let (response_s, response_r) = bounded(0); + + scope(|scope| { + scope.spawn(move |_| { + let mut count = 0; + loop { + for x in response_r.try_iter() { + count += x; + if count == COUNT { + return; + } + } + let _ = request_s.try_send(()); + } + }); + + for _ in request_r.iter() { + if response_s.send(1).is_err() { + break; + } + } + }) + .unwrap(); +} + +#[test] +fn stress_timeout_two_threads() { + const COUNT: usize = 100; + + let (s, r) = bounded(0); + + scope(|scope| { + scope.spawn(|_| { + for i in 0..COUNT { + if i % 2 == 0 { + thread::sleep(ms(50)); + } + loop { + if let Ok(()) = s.send_timeout(i, ms(10)) { + break; + } + } + } + }); + + scope.spawn(|_| { + for i in 0..COUNT { + if i % 2 == 0 { + thread::sleep(ms(50)); + } + loop { + if let Ok(x) = r.recv_timeout(ms(10)) { + assert_eq!(x, i); + break; + } + } + } + }); + }) + .unwrap(); +} + +#[test] +fn drops() { + #[cfg(miri)] + const RUNS: usize = 20; + #[cfg(not(miri))] + const RUNS: usize = 100; + #[cfg(miri)] + const STEPS: usize = 100; + #[cfg(not(miri))] + const STEPS: usize = 10_000; + + static DROPS: AtomicUsize = AtomicUsize::new(0); + + #[derive(Debug, PartialEq)] + struct DropCounter; + + impl Drop for DropCounter { + fn drop(&mut self) { + DROPS.fetch_add(1, Ordering::SeqCst); + } + } + + let mut rng = thread_rng(); + + for _ in 0..RUNS { + let steps = rng.gen_range(0..STEPS); + + DROPS.store(0, Ordering::SeqCst); + let (s, r) = bounded::<DropCounter>(0); + + scope(|scope| { + scope.spawn(|_| { + for _ in 0..steps { + r.recv().unwrap(); + } + }); + + scope.spawn(|_| { + for _ in 0..steps { + s.send(DropCounter).unwrap(); + } + }); + }) + .unwrap(); + + assert_eq!(DROPS.load(Ordering::SeqCst), steps); + drop(s); + drop(r); + assert_eq!(DROPS.load(Ordering::SeqCst), steps); + } +} + +#[test] +fn fairness() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s1, r1) = bounded::<()>(0); + let (s2, r2) = bounded::<()>(0); + + scope(|scope| { + scope.spawn(|_| { + let mut hits = [0usize; 2]; + for _ in 0..COUNT { + select! { + recv(r1) -> _ => hits[0] += 1, + recv(r2) -> _ => hits[1] += 1, + } + } + assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); + }); + + let mut hits = [0usize; 2]; + for _ in 0..COUNT { + select! { + send(s1, ()) -> _ => hits[0] += 1, + send(s2, ()) -> _ => hits[1] += 1, + } + } + assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); + }) + .unwrap(); +} + +#[test] +fn fairness_duplicates() { + #[cfg(miri)] + const COUNT: usize = 100; + #[cfg(not(miri))] + const COUNT: usize = 10_000; + + let (s, r) = bounded::<()>(0); + + scope(|scope| { + scope.spawn(|_| { + let mut hits = [0usize; 5]; + for _ in 0..COUNT { + select! { + recv(r) -> _ => hits[0] += 1, + recv(r) -> _ => hits[1] += 1, + recv(r) -> _ => hits[2] += 1, + recv(r) -> _ => hits[3] += 1, + recv(r) -> _ => hits[4] += 1, + } + } + assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); + }); + + let mut hits = [0usize; 5]; + for _ in 0..COUNT { + select! { + send(s, ()) -> _ => hits[0] += 1, + send(s, ()) -> _ => hits[1] += 1, + send(s, ()) -> _ => hits[2] += 1, + send(s, ()) -> _ => hits[3] += 1, + send(s, ()) -> _ => hits[4] += 1, + } + } + assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2)); + }) + .unwrap(); +} + +#[test] +fn recv_in_send() { + let (s, r) = bounded(0); + + scope(|scope| { + scope.spawn(|_| { + thread::sleep(ms(100)); + r.recv() + }); + + scope.spawn(|_| { + thread::sleep(ms(500)); + s.send(()).unwrap(); + }); + + select! { + send(s, r.recv().unwrap()) -> _ => {} + } + }) + .unwrap(); +} + +#[test] +fn channel_through_channel() { + #[cfg(miri)] + const COUNT: usize = 50; + #[cfg(not(miri))] + const COUNT: usize = 1000; + + type T = Box<dyn Any + Send>; + + let (s, r) = bounded::<T>(0); + + scope(|scope| { + scope.spawn(move |_| { + let mut s = s; + + for _ in 0..COUNT { + let (new_s, new_r) = bounded(0); + let new_r: T = Box::new(Some(new_r)); + + s.send(new_r).unwrap(); + s = new_s; + } + }); + + scope.spawn(move |_| { + let mut r = r; + + for _ in 0..COUNT { + r = r + .recv() + .unwrap() + .downcast_mut::<Option<Receiver<T>>>() + .unwrap() + .take() + .unwrap() + } + }); + }) + .unwrap(); +} |