summaryrefslogtreecommitdiffstats
path: root/tests/ui/threads-sendsync/mpsc_stress.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tests/ui/threads-sendsync/mpsc_stress.rs')
-rw-r--r--tests/ui/threads-sendsync/mpsc_stress.rs200
1 files changed, 200 insertions, 0 deletions
diff --git a/tests/ui/threads-sendsync/mpsc_stress.rs b/tests/ui/threads-sendsync/mpsc_stress.rs
new file mode 100644
index 000000000..c2e1912de
--- /dev/null
+++ b/tests/ui/threads-sendsync/mpsc_stress.rs
@@ -0,0 +1,200 @@
+// run-pass
+// compile-flags:--test
+// ignore-emscripten
+
+use std::sync::mpsc::channel;
+use std::sync::mpsc::TryRecvError;
+use std::sync::mpsc::RecvError;
+use std::sync::mpsc::RecvTimeoutError;
+use std::sync::Arc;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering;
+
+use std::thread;
+use std::time::Duration;
+
+
+/// Simple thread synchronization utility
+struct Barrier {
+ // Not using mutex/condvar for precision
+ shared: Arc<AtomicUsize>,
+ count: usize,
+}
+
+impl Barrier {
+ fn new(count: usize) -> Vec<Barrier> {
+ let shared = Arc::new(AtomicUsize::new(0));
+ (0..count).map(|_| Barrier { shared: shared.clone(), count: count }).collect()
+ }
+
+ fn new2() -> (Barrier, Barrier) {
+ let mut v = Barrier::new(2);
+ (v.pop().unwrap(), v.pop().unwrap())
+ }
+
+ /// Returns when `count` threads enter `wait`
+ fn wait(self) {
+ self.shared.fetch_add(1, Ordering::SeqCst);
+ while self.shared.load(Ordering::SeqCst) != self.count {
+ #[cfg(target_env = "sgx")]
+ thread::yield_now();
+ }
+ }
+}
+
+
+fn shared_close_sender_does_not_lose_messages_iter() {
+ let (tb, rb) = Barrier::new2();
+
+ let (tx, rx) = channel();
+ let _ = tx.clone(); // convert to shared
+
+ thread::spawn(move || {
+ tb.wait();
+ thread::sleep(Duration::from_micros(1));
+ tx.send(17).expect("send");
+ drop(tx);
+ });
+
+ let i = rx.into_iter();
+ rb.wait();
+ // Make sure it doesn't return disconnected before returning an element
+ assert_eq!(vec![17], i.collect::<Vec<_>>());
+}
+
+#[test]
+fn shared_close_sender_does_not_lose_messages() {
+ with_minimum_timer_resolution(|| {
+ for _ in 0..10000 {
+ shared_close_sender_does_not_lose_messages_iter();
+ }
+ });
+}
+
+
+// https://github.com/rust-lang/rust/issues/39364
+fn concurrent_recv_timeout_and_upgrade_iter() {
+ // 1 us
+ let sleep = Duration::new(0, 1_000);
+
+ let (a, b) = Barrier::new2();
+ let (tx, rx) = channel();
+ let th = thread::spawn(move || {
+ a.wait();
+ loop {
+ match rx.recv_timeout(sleep) {
+ Ok(_) => {
+ break;
+ },
+ Err(_) => {},
+ }
+ }
+ });
+ b.wait();
+ thread::sleep(sleep);
+ tx.clone().send(()).expect("send");
+ th.join().unwrap();
+}
+
+#[test]
+fn concurrent_recv_timeout_and_upgrade() {
+ with_minimum_timer_resolution(|| {
+ for _ in 0..10000 {
+ concurrent_recv_timeout_and_upgrade_iter();
+ }
+ });
+}
+
+
+fn concurrent_writes_iter() {
+ const THREADS: usize = 4;
+ const PER_THR: usize = 100;
+
+ let mut bs = Barrier::new(THREADS + 1);
+ let (tx, rx) = channel();
+
+ let mut threads = Vec::new();
+ for j in 0..THREADS {
+ let tx = tx.clone();
+ let b = bs.pop().unwrap();
+ threads.push(thread::spawn(move || {
+ b.wait();
+ for i in 0..PER_THR {
+ tx.send(j * 1000 + i).expect("send");
+ }
+ }));
+ }
+
+ let b = bs.pop().unwrap();
+ b.wait();
+
+ let mut v: Vec<_> = rx.iter().take(THREADS * PER_THR).collect();
+ v.sort();
+
+ for j in 0..THREADS {
+ for i in 0..PER_THR {
+ assert_eq!(j * 1000 + i, v[j * PER_THR + i]);
+ }
+ }
+
+ for t in threads {
+ t.join().unwrap();
+ }
+
+ let one_us = Duration::new(0, 1000);
+
+ assert_eq!(TryRecvError::Empty, rx.try_recv().unwrap_err());
+ assert_eq!(RecvTimeoutError::Timeout, rx.recv_timeout(one_us).unwrap_err());
+
+ drop(tx);
+
+ assert_eq!(RecvError, rx.recv().unwrap_err());
+ assert_eq!(RecvTimeoutError::Disconnected, rx.recv_timeout(one_us).unwrap_err());
+ assert_eq!(TryRecvError::Disconnected, rx.try_recv().unwrap_err());
+}
+
+#[test]
+fn concurrent_writes() {
+ with_minimum_timer_resolution(|| {
+ for _ in 0..100 {
+ concurrent_writes_iter();
+ }
+ });
+}
+
+#[cfg(windows)]
+pub mod timeapi {
+ #![allow(non_snake_case)]
+ use std::ffi::c_uint;
+
+ pub const TIMERR_NOERROR: c_uint = 0;
+
+ #[link(name = "winmm")]
+ extern "system" {
+ pub fn timeBeginPeriod(uPeriod: c_uint) -> c_uint;
+ pub fn timeEndPeriod(uPeriod: c_uint) -> c_uint;
+ }
+}
+
+/// Window's minimum sleep time can be as much as 16ms.
+// This function evaluates the closure with this resolution
+// set as low as possible.
+///
+/// This takes the above test's duration from 10000*16/1000/60=2.67 minutes to ~16 seconds.
+fn with_minimum_timer_resolution(f: impl Fn()) {
+ #[cfg(windows)]
+ unsafe {
+ let ret = timeapi::timeBeginPeriod(1);
+ assert_eq!(ret, timeapi::TIMERR_NOERROR);
+
+ f();
+
+ let ret = timeapi::timeEndPeriod(1);
+ assert_eq!(ret, timeapi::TIMERR_NOERROR);
+ }
+
+ #[cfg(not(windows))]
+ {
+ f();
+ }
+}