use std::sync::atomic::Ordering::SeqCst; use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::{Arc, Mutex}; use crossbeam_deque::Steal::{Empty, Success}; use crossbeam_deque::{Injector, Worker}; use crossbeam_utils::thread::scope; use rand::Rng; #[test] fn smoke() { let q = Injector::new(); assert_eq!(q.steal(), Empty); q.push(1); q.push(2); assert_eq!(q.steal(), Success(1)); assert_eq!(q.steal(), Success(2)); assert_eq!(q.steal(), Empty); q.push(3); assert_eq!(q.steal(), Success(3)); assert_eq!(q.steal(), Empty); } #[test] fn is_empty() { let q = Injector::new(); assert!(q.is_empty()); q.push(1); assert!(!q.is_empty()); q.push(2); assert!(!q.is_empty()); let _ = q.steal(); assert!(!q.is_empty()); let _ = q.steal(); assert!(q.is_empty()); q.push(3); assert!(!q.is_empty()); let _ = q.steal(); assert!(q.is_empty()); } #[test] fn spsc() { #[cfg(miri)] const COUNT: usize = 500; #[cfg(not(miri))] const COUNT: usize = 100_000; let q = Injector::new(); scope(|scope| { scope.spawn(|_| { for i in 0..COUNT { loop { if let Success(v) = q.steal() { assert_eq!(i, v); break; } #[cfg(miri)] std::hint::spin_loop(); } } assert_eq!(q.steal(), Empty); }); for i in 0..COUNT { q.push(i); } }) .unwrap(); } #[test] fn mpmc() { #[cfg(miri)] const COUNT: usize = 500; #[cfg(not(miri))] const COUNT: usize = 25_000; const THREADS: usize = 4; let q = Injector::new(); let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::>(); scope(|scope| { for _ in 0..THREADS { scope.spawn(|_| { for i in 0..COUNT { q.push(i); } }); } for _ in 0..THREADS { scope.spawn(|_| { for _ in 0..COUNT { loop { if let Success(n) = q.steal() { v[n].fetch_add(1, SeqCst); break; } #[cfg(miri)] std::hint::spin_loop(); } } }); } }) .unwrap(); for c in v { assert_eq!(c.load(SeqCst), THREADS); } } #[test] fn stampede() { const THREADS: usize = 8; #[cfg(miri)] const COUNT: usize = 500; #[cfg(not(miri))] const COUNT: usize = 50_000; let q = Injector::new(); for i in 0..COUNT { q.push(Box::new(i + 1)); } let remaining = Arc::new(AtomicUsize::new(COUNT)); scope(|scope| { for _ in 0..THREADS { let remaining = remaining.clone(); let q = &q; scope.spawn(move |_| { let mut last = 0; while remaining.load(SeqCst) > 0 { if let Success(x) = q.steal() { assert!(last < *x); last = *x; remaining.fetch_sub(1, SeqCst); } } }); } let mut last = 0; while remaining.load(SeqCst) > 0 { if let Success(x) = q.steal() { assert!(last < *x); last = *x; remaining.fetch_sub(1, SeqCst); } } }) .unwrap(); } #[test] fn stress() { const THREADS: usize = 8; #[cfg(miri)] const COUNT: usize = 500; #[cfg(not(miri))] const COUNT: usize = 50_000; let q = Injector::new(); let done = Arc::new(AtomicBool::new(false)); let hits = Arc::new(AtomicUsize::new(0)); scope(|scope| { for _ in 0..THREADS { let done = done.clone(); let hits = hits.clone(); let q = &q; scope.spawn(move |_| { let w2 = Worker::new_fifo(); while !done.load(SeqCst) { if let Success(_) = q.steal() { hits.fetch_add(1, SeqCst); } let _ = q.steal_batch(&w2); if let Success(_) = q.steal_batch_and_pop(&w2) { hits.fetch_add(1, SeqCst); } while w2.pop().is_some() { hits.fetch_add(1, SeqCst); } } }); } let mut rng = rand::thread_rng(); let mut expected = 0; while expected < COUNT { if rng.gen_range(0..3) == 0 { while let Success(_) = q.steal() { hits.fetch_add(1, SeqCst); } } else { q.push(expected); expected += 1; } } while hits.load(SeqCst) < COUNT { while let Success(_) = q.steal() { hits.fetch_add(1, SeqCst); } } done.store(true, SeqCst); }) .unwrap(); } #[cfg_attr(miri, ignore)] // Miri is too slow #[test] fn no_starvation() { const THREADS: usize = 8; const COUNT: usize = 50_000; let q = Injector::new(); let done = Arc::new(AtomicBool::new(false)); let mut all_hits = Vec::new(); scope(|scope| { for _ in 0..THREADS { let done = done.clone(); let hits = Arc::new(AtomicUsize::new(0)); all_hits.push(hits.clone()); let q = &q; scope.spawn(move |_| { let w2 = Worker::new_fifo(); while !done.load(SeqCst) { if let Success(_) = q.steal() { hits.fetch_add(1, SeqCst); } let _ = q.steal_batch(&w2); if let Success(_) = q.steal_batch_and_pop(&w2) { hits.fetch_add(1, SeqCst); } while w2.pop().is_some() { hits.fetch_add(1, SeqCst); } } }); } let mut rng = rand::thread_rng(); let mut my_hits = 0; loop { for i in 0..rng.gen_range(0..COUNT) { if rng.gen_range(0..3) == 0 && my_hits == 0 { while let Success(_) = q.steal() { my_hits += 1; } } else { q.push(i); } } if my_hits > 0 && all_hits.iter().all(|h| h.load(SeqCst) > 0) { break; } } done.store(true, SeqCst); }) .unwrap(); } #[test] fn destructors() { #[cfg(miri)] const THREADS: usize = 2; #[cfg(not(miri))] const THREADS: usize = 8; #[cfg(miri)] const COUNT: usize = 500; #[cfg(not(miri))] const COUNT: usize = 50_000; #[cfg(miri)] const STEPS: usize = 100; #[cfg(not(miri))] const STEPS: usize = 1000; struct Elem(usize, Arc>>); impl Drop for Elem { fn drop(&mut self) { self.1.lock().unwrap().push(self.0); } } let q = Injector::new(); let dropped = Arc::new(Mutex::new(Vec::new())); let remaining = Arc::new(AtomicUsize::new(COUNT)); for i in 0..COUNT { q.push(Elem(i, dropped.clone())); } scope(|scope| { for _ in 0..THREADS { let remaining = remaining.clone(); let q = &q; scope.spawn(move |_| { let w2 = Worker::new_fifo(); let mut cnt = 0; while cnt < STEPS { if let Success(_) = q.steal() { cnt += 1; remaining.fetch_sub(1, SeqCst); } let _ = q.steal_batch(&w2); if let Success(_) = q.steal_batch_and_pop(&w2) { cnt += 1; remaining.fetch_sub(1, SeqCst); } while w2.pop().is_some() { cnt += 1; remaining.fetch_sub(1, SeqCst); } } }); } for _ in 0..STEPS { if let Success(_) = q.steal() { remaining.fetch_sub(1, SeqCst); } } }) .unwrap(); let rem = remaining.load(SeqCst); assert!(rem > 0); { let mut v = dropped.lock().unwrap(); assert_eq!(v.len(), COUNT - rem); v.clear(); } drop(q); { let mut v = dropped.lock().unwrap(); assert_eq!(v.len(), rem); v.sort_unstable(); for pair in v.windows(2) { assert_eq!(pair[0] + 1, pair[1]); } } }