Diffstat (limited to 'src/test/ui/threads-sendsync/mpsc_stress.rs')
-rw-r--r--  src/test/ui/threads-sendsync/mpsc_stress.rs  165
1 file changed, 165 insertions, 0 deletions
diff --git a/src/test/ui/threads-sendsync/mpsc_stress.rs b/src/test/ui/threads-sendsync/mpsc_stress.rs
new file mode 100644
index 000000000..a889542fe
--- /dev/null
+++ b/src/test/ui/threads-sendsync/mpsc_stress.rs
@@ -0,0 +1,165 @@
+// run-pass
+// compile-flags:--test
+// ignore-emscripten
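+
+// Stress tests for std::sync::mpsc channels: the upgrade to the "shared"
+// flavor triggered by Sender::clone, and races between concurrent senders
+// and receivers.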
+
+use std::sync::mpsc::channel;
+use std::sync::mpsc::TryRecvError;
+use std::sync::mpsc::RecvError;
+use std::sync::mpsc::RecvTimeoutError;
+use std::sync::Arc;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering;
+
+use std::thread;
+use std::time::Duration;
+
+
+/// Simple thread synchronization utility
+struct Barrier {
+    // Not using mutex/condvar for precision
+    shared: Arc<AtomicUsize>,
+    count: usize,
+}
+
+impl Barrier {
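+    /// Creates `count` connected handles; each participating thread takes one.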
+    fn new(count: usize) -> Vec<Barrier> {
+        let shared = Arc::new(AtomicUsize::new(0));
+        (0..count).map(|_| Barrier { shared: shared.clone(), count }).collect()
+    }
+
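+    /// Convenience constructor for the two-thread case.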
+    fn new2() -> (Barrier, Barrier) {
+        let mut v = Barrier::new(2);
+        (v.pop().unwrap(), v.pop().unwrap())
+    }
+
+    /// Returns when `count` threads enter `wait`
+    fn wait(self) {
+        self.shared.fetch_add(1, Ordering::SeqCst);
+        while self.shared.load(Ordering::SeqCst) != self.count {
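+            // Spin for low wakeup latency; on the SGX target, yield so the
+            // other threads still get scheduled while we spin.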
+            #[cfg(target_env = "sgx")]
+            thread::yield_now();
+        }
+    }
+}
+
+
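+// Cloning a Sender upgrades the channel to the "shared" flavor. Check that a
+// message sent just before the last Sender is dropped is still delivered,
+// rather than the receiver observing the disconnect first.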
+fn shared_close_sender_does_not_lose_messages_iter() {
+    let (tb, rb) = Barrier::new2();
+
+    let (tx, rx) = channel();
+    let _ = tx.clone(); // convert to shared
+
+    thread::spawn(move || {
+        tb.wait();
+        thread::sleep(Duration::from_micros(1));
+        tx.send(17).expect("send");
+        drop(tx);
+    });
+
+    let i = rx.into_iter();
+    rb.wait();
+    // Make sure it doesn't return disconnected before returning an element
+    assert_eq!(vec![17], i.collect::<Vec<_>>());
+}
+
+#[test]
+fn shared_close_sender_does_not_lose_messages() {
+    for _ in 0..10000 {
+        shared_close_sender_does_not_lose_messages_iter();
+    }
+}
+
+
+// https://github.com/rust-lang/rust/issues/39364
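+//
+// Races a receiver inside recv_timeout against an upgrade of the channel
+// (triggered by tx.clone() below); at the time of writing this tripped an
+// assertion in libstd/sync/mpsc/shared.rs (see the disabled test below).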
+fn concurrent_recv_timeout_and_upgrade_iter() {
+    // 1 us
+    let sleep = Duration::new(0, 1_000);
+
+    let (a, b) = Barrier::new2();
+    let (tx, rx) = channel();
+    let th = thread::spawn(move || {
+        a.wait();
+        loop {
+            match rx.recv_timeout(sleep) {
+                Ok(_) => {
+                    break;
+                },
+                Err(_) => {},
+            }
+        }
+    });
+    b.wait();
+    thread::sleep(sleep);
+    tx.clone().send(()).expect("send");
+    th.join().unwrap();
+}
+
+#[test]
+fn concurrent_recv_timeout_and_upgrade() {
+    // FIXME: fix and enable
+    if true { return }
+
+    // at the moment of writing this test fails like this:
+    // thread '<unnamed>' panicked at 'assertion failed: `(left == right)`
+    //   left: `4561387584`,
+    //   right: `0`', libstd/sync/mpsc/shared.rs:253:13
+
+    for _ in 0..10000 {
+        concurrent_recv_timeout_and_upgrade_iter();
+    }
+}
+
+
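+// Several producer threads send disjoint ranges of values through a single
+// shared channel. The receiver must see every message exactly once, and the
+// channel must report disconnection only after all senders are dropped.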
+fn concurrent_writes_iter() {
+    const THREADS: usize = 4;
+    const PER_THR: usize = 100;
+
+    let mut bs = Barrier::new(THREADS + 1);
+    let (tx, rx) = channel();
+
+    let mut threads = Vec::new();
+    for j in 0..THREADS {
+        let tx = tx.clone();
+        let b = bs.pop().unwrap();
+        threads.push(thread::spawn(move || {
+            b.wait();
+            for i in 0..PER_THR {
+                tx.send(j * 1000 + i).expect("send");
+            }
+        }));
+    }
+
+    let b = bs.pop().unwrap();
+    b.wait();
+
+    let mut v: Vec<_> = rx.iter().take(THREADS * PER_THR).collect();
+    v.sort();
+
+    for j in 0..THREADS {
+        for i in 0..PER_THR {
+            assert_eq!(j * 1000 + i, v[j * PER_THR + i]);
+        }
+    }
+
+    for t in threads {
+        t.join().unwrap();
+    }
+
+    let one_us = Duration::new(0, 1000);
+
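+    // tx is still alive here, so the channel is merely empty, not disconnected.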
+    assert_eq!(TryRecvError::Empty, rx.try_recv().unwrap_err());
+    assert_eq!(RecvTimeoutError::Timeout, rx.recv_timeout(one_us).unwrap_err());
+
+    drop(tx);
+
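+    // All senders are gone, so every receive flavor now reports disconnection.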
+    assert_eq!(RecvError, rx.recv().unwrap_err());
+    assert_eq!(RecvTimeoutError::Disconnected, rx.recv_timeout(one_us).unwrap_err());
+    assert_eq!(TryRecvError::Disconnected, rx.try_recv().unwrap_err());
+}
+
+#[test]
+fn concurrent_writes() {
+    for _ in 0..100 {
+        concurrent_writes_iter();
+    }
+}