diff options
Diffstat (limited to 'tests/ui/threads-sendsync/mpsc_stress.rs')
-rw-r--r-- | tests/ui/threads-sendsync/mpsc_stress.rs | 200 |
1 files changed, 200 insertions, 0 deletions
diff --git a/tests/ui/threads-sendsync/mpsc_stress.rs b/tests/ui/threads-sendsync/mpsc_stress.rs new file mode 100644 index 000000000..c2e1912de --- /dev/null +++ b/tests/ui/threads-sendsync/mpsc_stress.rs @@ -0,0 +1,200 @@ +// run-pass +// compile-flags:--test +// ignore-emscripten + +use std::sync::mpsc::channel; +use std::sync::mpsc::TryRecvError; +use std::sync::mpsc::RecvError; +use std::sync::mpsc::RecvTimeoutError; +use std::sync::Arc; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; + +use std::thread; +use std::time::Duration; + + +/// Simple thread synchronization utility +struct Barrier { + // Not using mutex/condvar for precision + shared: Arc<AtomicUsize>, + count: usize, +} + +impl Barrier { + fn new(count: usize) -> Vec<Barrier> { + let shared = Arc::new(AtomicUsize::new(0)); + (0..count).map(|_| Barrier { shared: shared.clone(), count: count }).collect() + } + + fn new2() -> (Barrier, Barrier) { + let mut v = Barrier::new(2); + (v.pop().unwrap(), v.pop().unwrap()) + } + + /// Returns when `count` threads enter `wait` + fn wait(self) { + self.shared.fetch_add(1, Ordering::SeqCst); + while self.shared.load(Ordering::SeqCst) != self.count { + #[cfg(target_env = "sgx")] + thread::yield_now(); + } + } +} + + +fn shared_close_sender_does_not_lose_messages_iter() { + let (tb, rb) = Barrier::new2(); + + let (tx, rx) = channel(); + let _ = tx.clone(); // convert to shared + + thread::spawn(move || { + tb.wait(); + thread::sleep(Duration::from_micros(1)); + tx.send(17).expect("send"); + drop(tx); + }); + + let i = rx.into_iter(); + rb.wait(); + // Make sure it doesn't return disconnected before returning an element + assert_eq!(vec![17], i.collect::<Vec<_>>()); +} + +#[test] +fn shared_close_sender_does_not_lose_messages() { + with_minimum_timer_resolution(|| { + for _ in 0..10000 { + shared_close_sender_does_not_lose_messages_iter(); + } + }); +} + + +// https://github.com/rust-lang/rust/issues/39364 +fn concurrent_recv_timeout_and_upgrade_iter() { + // 1 us + let sleep = Duration::new(0, 1_000); + + let (a, b) = Barrier::new2(); + let (tx, rx) = channel(); + let th = thread::spawn(move || { + a.wait(); + loop { + match rx.recv_timeout(sleep) { + Ok(_) => { + break; + }, + Err(_) => {}, + } + } + }); + b.wait(); + thread::sleep(sleep); + tx.clone().send(()).expect("send"); + th.join().unwrap(); +} + +#[test] +fn concurrent_recv_timeout_and_upgrade() { + with_minimum_timer_resolution(|| { + for _ in 0..10000 { + concurrent_recv_timeout_and_upgrade_iter(); + } + }); +} + + +fn concurrent_writes_iter() { + const THREADS: usize = 4; + const PER_THR: usize = 100; + + let mut bs = Barrier::new(THREADS + 1); + let (tx, rx) = channel(); + + let mut threads = Vec::new(); + for j in 0..THREADS { + let tx = tx.clone(); + let b = bs.pop().unwrap(); + threads.push(thread::spawn(move || { + b.wait(); + for i in 0..PER_THR { + tx.send(j * 1000 + i).expect("send"); + } + })); + } + + let b = bs.pop().unwrap(); + b.wait(); + + let mut v: Vec<_> = rx.iter().take(THREADS * PER_THR).collect(); + v.sort(); + + for j in 0..THREADS { + for i in 0..PER_THR { + assert_eq!(j * 1000 + i, v[j * PER_THR + i]); + } + } + + for t in threads { + t.join().unwrap(); + } + + let one_us = Duration::new(0, 1000); + + assert_eq!(TryRecvError::Empty, rx.try_recv().unwrap_err()); + assert_eq!(RecvTimeoutError::Timeout, rx.recv_timeout(one_us).unwrap_err()); + + drop(tx); + + assert_eq!(RecvError, rx.recv().unwrap_err()); + assert_eq!(RecvTimeoutError::Disconnected, rx.recv_timeout(one_us).unwrap_err()); + assert_eq!(TryRecvError::Disconnected, rx.try_recv().unwrap_err()); +} + +#[test] +fn concurrent_writes() { + with_minimum_timer_resolution(|| { + for _ in 0..100 { + concurrent_writes_iter(); + } + }); +} + +#[cfg(windows)] +pub mod timeapi { + #![allow(non_snake_case)] + use std::ffi::c_uint; + + pub const TIMERR_NOERROR: c_uint = 0; + + #[link(name = "winmm")] + extern "system" { + pub fn timeBeginPeriod(uPeriod: c_uint) -> c_uint; + pub fn timeEndPeriod(uPeriod: c_uint) -> c_uint; + } +} + +/// Window's minimum sleep time can be as much as 16ms. +// This function evaluates the closure with this resolution +// set as low as possible. +/// +/// This takes the above test's duration from 10000*16/1000/60=2.67 minutes to ~16 seconds. +fn with_minimum_timer_resolution(f: impl Fn()) { + #[cfg(windows)] + unsafe { + let ret = timeapi::timeBeginPeriod(1); + assert_eq!(ret, timeapi::TIMERR_NOERROR); + + f(); + + let ret = timeapi::timeEndPeriod(1); + assert_eq!(ret, timeapi::TIMERR_NOERROR); + } + + #[cfg(not(windows))] + { + f(); + } +} |