diff options
Diffstat (limited to 'third_party/rust/futures-channel/tests/mpsc.rs')
-rw-r--r-- | third_party/rust/futures-channel/tests/mpsc.rs | 634 |
1 files changed, 634 insertions, 0 deletions
diff --git a/third_party/rust/futures-channel/tests/mpsc.rs b/third_party/rust/futures-channel/tests/mpsc.rs new file mode 100644 index 0000000000..444c8e10fd --- /dev/null +++ b/third_party/rust/futures-channel/tests/mpsc.rs @@ -0,0 +1,634 @@ +use futures::channel::{mpsc, oneshot}; +use futures::executor::{block_on, block_on_stream}; +use futures::future::{poll_fn, FutureExt}; +use futures::pin_mut; +use futures::sink::{Sink, SinkExt}; +use futures::stream::{Stream, StreamExt}; +use futures::task::{Context, Poll}; +use futures_test::task::{new_count_waker, noop_context}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use std::thread; + +trait AssertSend: Send {} +impl AssertSend for mpsc::Sender<i32> {} +impl AssertSend for mpsc::Receiver<i32> {} + +#[test] +fn send_recv() { + let (mut tx, rx) = mpsc::channel::<i32>(16); + + block_on(tx.send(1)).unwrap(); + drop(tx); + let v: Vec<_> = block_on(rx.collect()); + assert_eq!(v, vec![1]); +} + +#[test] +fn send_recv_no_buffer() { + // Run on a task context + block_on(poll_fn(move |cx| { + let (tx, rx) = mpsc::channel::<i32>(0); + pin_mut!(tx, rx); + + assert!(tx.as_mut().poll_flush(cx).is_ready()); + assert!(tx.as_mut().poll_ready(cx).is_ready()); + + // Send first message + assert!(tx.as_mut().start_send(1).is_ok()); + assert!(tx.as_mut().poll_ready(cx).is_pending()); + + // poll_ready said Pending, so no room in buffer, therefore new sends + // should get rejected with is_full. + assert!(tx.as_mut().start_send(0).unwrap_err().is_full()); + assert!(tx.as_mut().poll_ready(cx).is_pending()); + + // Take the value + assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(1))); + assert!(tx.as_mut().poll_ready(cx).is_ready()); + + // Send second message + assert!(tx.as_mut().poll_ready(cx).is_ready()); + assert!(tx.as_mut().start_send(2).is_ok()); + assert!(tx.as_mut().poll_ready(cx).is_pending()); + + // Take the value + assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(2))); + assert!(tx.as_mut().poll_ready(cx).is_ready()); + + Poll::Ready(()) + })); +} + +#[test] +fn send_shared_recv() { + let (mut tx1, rx) = mpsc::channel::<i32>(16); + let mut rx = block_on_stream(rx); + let mut tx2 = tx1.clone(); + + block_on(tx1.send(1)).unwrap(); + assert_eq!(rx.next(), Some(1)); + + block_on(tx2.send(2)).unwrap(); + assert_eq!(rx.next(), Some(2)); +} + +#[test] +fn send_recv_threads() { + let (mut tx, rx) = mpsc::channel::<i32>(16); + + let t = thread::spawn(move || { + block_on(tx.send(1)).unwrap(); + }); + + let v: Vec<_> = block_on(rx.take(1).collect()); + assert_eq!(v, vec![1]); + + t.join().unwrap(); +} + +#[test] +fn send_recv_threads_no_capacity() { + let (mut tx, rx) = mpsc::channel::<i32>(0); + + let t = thread::spawn(move || { + block_on(tx.send(1)).unwrap(); + block_on(tx.send(2)).unwrap(); + }); + + let v: Vec<_> = block_on(rx.collect()); + assert_eq!(v, vec![1, 2]); + + t.join().unwrap(); +} + +#[test] +fn recv_close_gets_none() { + let (mut tx, mut rx) = mpsc::channel::<i32>(10); + + // Run on a task context + block_on(poll_fn(move |cx| { + rx.close(); + + assert_eq!(rx.poll_next_unpin(cx), Poll::Ready(None)); + match tx.poll_ready(cx) { + Poll::Pending | Poll::Ready(Ok(_)) => panic!(), + Poll::Ready(Err(e)) => assert!(e.is_disconnected()), + }; + + Poll::Ready(()) + })); +} + +#[test] +fn tx_close_gets_none() { + let (_, mut rx) = mpsc::channel::<i32>(10); + + // Run on a task context + block_on(poll_fn(move |cx| { + assert_eq!(rx.poll_next_unpin(cx), Poll::Ready(None)); + Poll::Ready(()) + })); +} + +// #[test] +// fn spawn_sends_items() { +// let core = local_executor::Core::new(); +// let stream = unfold(0, |i| Some(ok::<_,u8>((i, i + 1)))); +// let rx = mpsc::spawn(stream, &core, 1); +// assert_eq!(core.run(rx.take(4).collect()).unwrap(), +// [0, 1, 2, 3]); +// } + +// #[test] +// fn spawn_kill_dead_stream() { +// use std::thread; +// use std::time::Duration; +// use futures::future::Either; +// use futures::sync::oneshot; +// +// // a stream which never returns anything (maybe a remote end isn't +// // responding), but dropping it leads to observable side effects +// // (like closing connections, releasing limited resources, ...) +// #[derive(Debug)] +// struct Dead { +// // when dropped you should get Err(oneshot::Canceled) on the +// // receiving end +// done: oneshot::Sender<()>, +// } +// impl Stream for Dead { +// type Item = (); +// type Error = (); +// +// fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { +// Ok(Poll::Pending) +// } +// } +// +// // need to implement a timeout for the test, as it would hang +// // forever right now +// let (timeout_tx, timeout_rx) = oneshot::channel(); +// thread::spawn(move || { +// thread::sleep(Duration::from_millis(1000)); +// let _ = timeout_tx.send(()); +// }); +// +// let core = local_executor::Core::new(); +// let (done_tx, done_rx) = oneshot::channel(); +// let stream = Dead{done: done_tx}; +// let rx = mpsc::spawn(stream, &core, 1); +// let res = core.run( +// Ok::<_, ()>(()) +// .into_future() +// .then(move |_| { +// // now drop the spawned stream: maybe some timeout exceeded, +// // or some connection on this end was closed by the remote +// // end. +// drop(rx); +// // and wait for the spawned stream to release its resources +// done_rx +// }) +// .select2(timeout_rx) +// ); +// match res { +// Err(Either::A((oneshot::Canceled, _))) => (), +// _ => { +// panic!("dead stream wasn't canceled"); +// }, +// } +// } + +#[test] +fn stress_shared_unbounded() { + const AMT: u32 = if cfg!(miri) { 100 } else { 10000 }; + const NTHREADS: u32 = 8; + let (tx, rx) = mpsc::unbounded::<i32>(); + + let t = thread::spawn(move || { + let result: Vec<_> = block_on(rx.collect()); + assert_eq!(result.len(), (AMT * NTHREADS) as usize); + for item in result { + assert_eq!(item, 1); + } + }); + + for _ in 0..NTHREADS { + let tx = tx.clone(); + + thread::spawn(move || { + for _ in 0..AMT { + tx.unbounded_send(1).unwrap(); + } + }); + } + + drop(tx); + + t.join().ok().unwrap(); +} + +#[test] +fn stress_shared_bounded_hard() { + const AMT: u32 = if cfg!(miri) { 100 } else { 10000 }; + const NTHREADS: u32 = 8; + let (tx, rx) = mpsc::channel::<i32>(0); + + let t = thread::spawn(move || { + let result: Vec<_> = block_on(rx.collect()); + assert_eq!(result.len(), (AMT * NTHREADS) as usize); + for item in result { + assert_eq!(item, 1); + } + }); + + for _ in 0..NTHREADS { + let mut tx = tx.clone(); + + thread::spawn(move || { + for _ in 0..AMT { + block_on(tx.send(1)).unwrap(); + } + }); + } + + drop(tx); + + t.join().unwrap(); +} + +#[allow(clippy::same_item_push)] +#[test] +fn stress_receiver_multi_task_bounded_hard() { + const AMT: usize = if cfg!(miri) { 100 } else { 10_000 }; + const NTHREADS: u32 = 2; + + let (mut tx, rx) = mpsc::channel::<usize>(0); + let rx = Arc::new(Mutex::new(Some(rx))); + let n = Arc::new(AtomicUsize::new(0)); + + let mut th = vec![]; + + for _ in 0..NTHREADS { + let rx = rx.clone(); + let n = n.clone(); + + let t = thread::spawn(move || { + let mut i = 0; + + loop { + i += 1; + let mut rx_opt = rx.lock().unwrap(); + if let Some(rx) = &mut *rx_opt { + if i % 5 == 0 { + let item = block_on(rx.next()); + + if item.is_none() { + *rx_opt = None; + break; + } + + n.fetch_add(1, Ordering::Relaxed); + } else { + // Just poll + let n = n.clone(); + match rx.poll_next_unpin(&mut noop_context()) { + Poll::Ready(Some(_)) => { + n.fetch_add(1, Ordering::Relaxed); + } + Poll::Ready(None) => { + *rx_opt = None; + break; + } + Poll::Pending => {} + } + } + } else { + break; + } + } + }); + + th.push(t); + } + + for i in 0..AMT { + block_on(tx.send(i)).unwrap(); + } + drop(tx); + + for t in th { + t.join().unwrap(); + } + + assert_eq!(AMT, n.load(Ordering::Relaxed)); +} + +/// Stress test that receiver properly receives all the messages +/// after sender dropped. +#[test] +fn stress_drop_sender() { + const ITER: usize = if cfg!(miri) { 100 } else { 10000 }; + + fn list() -> impl Stream<Item = i32> { + let (tx, rx) = mpsc::channel(1); + thread::spawn(move || { + block_on(send_one_two_three(tx)); + }); + rx + } + + for _ in 0..ITER { + let v: Vec<_> = block_on(list().collect()); + assert_eq!(v, vec![1, 2, 3]); + } +} + +async fn send_one_two_three(mut tx: mpsc::Sender<i32>) { + for i in 1..=3 { + tx.send(i).await.unwrap(); + } +} + +/// Stress test that after receiver dropped, +/// no messages are lost. +fn stress_close_receiver_iter() { + let (tx, rx) = mpsc::unbounded(); + let mut rx = block_on_stream(rx); + let (unwritten_tx, unwritten_rx) = std::sync::mpsc::channel(); + let th = thread::spawn(move || { + for i in 1.. { + if tx.unbounded_send(i).is_err() { + unwritten_tx.send(i).expect("unwritten_tx"); + return; + } + } + }); + + // Read one message to make sure thread effectively started + assert_eq!(Some(1), rx.next()); + + rx.close(); + + for i in 2.. { + match rx.next() { + Some(r) => assert!(i == r), + None => { + let unwritten = unwritten_rx.recv().expect("unwritten_rx"); + assert_eq!(unwritten, i); + th.join().unwrap(); + return; + } + } + } +} + +#[test] +fn stress_close_receiver() { + const ITER: usize = if cfg!(miri) { 50 } else { 10000 }; + + for _ in 0..ITER { + stress_close_receiver_iter(); + } +} + +async fn stress_poll_ready_sender(mut sender: mpsc::Sender<u32>, count: u32) { + for i in (1..=count).rev() { + sender.send(i).await.unwrap(); + } +} + +/// Tests that after `poll_ready` indicates capacity a channel can always send without waiting. +#[allow(clippy::same_item_push)] +#[test] +fn stress_poll_ready() { + const AMT: u32 = if cfg!(miri) { 100 } else { 1000 }; + const NTHREADS: u32 = 8; + + /// Run a stress test using the specified channel capacity. + fn stress(capacity: usize) { + let (tx, rx) = mpsc::channel(capacity); + let mut threads = Vec::new(); + for _ in 0..NTHREADS { + let sender = tx.clone(); + threads.push(thread::spawn(move || block_on(stress_poll_ready_sender(sender, AMT)))); + } + drop(tx); + + let result: Vec<_> = block_on(rx.collect()); + assert_eq!(result.len() as u32, AMT * NTHREADS); + + for thread in threads { + thread.join().unwrap(); + } + } + + stress(0); + stress(1); + stress(8); + stress(16); +} + +#[test] +fn try_send_1() { + const N: usize = if cfg!(miri) { 100 } else { 3000 }; + let (mut tx, rx) = mpsc::channel(0); + + let t = thread::spawn(move || { + for i in 0..N { + loop { + if tx.try_send(i).is_ok() { + break; + } + } + } + }); + + let result: Vec<_> = block_on(rx.collect()); + for (i, j) in result.into_iter().enumerate() { + assert_eq!(i, j); + } + + t.join().unwrap(); +} + +#[test] +fn try_send_2() { + let (mut tx, rx) = mpsc::channel(0); + let mut rx = block_on_stream(rx); + + tx.try_send("hello").unwrap(); + + let (readytx, readyrx) = oneshot::channel::<()>(); + + let th = thread::spawn(move || { + block_on(poll_fn(|cx| { + assert!(tx.poll_ready(cx).is_pending()); + Poll::Ready(()) + })); + + drop(readytx); + block_on(tx.send("goodbye")).unwrap(); + }); + + let _ = block_on(readyrx); + assert_eq!(rx.next(), Some("hello")); + assert_eq!(rx.next(), Some("goodbye")); + assert_eq!(rx.next(), None); + + th.join().unwrap(); +} + +#[test] +fn try_send_fail() { + let (mut tx, rx) = mpsc::channel(0); + let mut rx = block_on_stream(rx); + + tx.try_send("hello").unwrap(); + + // This should fail + assert!(tx.try_send("fail").is_err()); + + assert_eq!(rx.next(), Some("hello")); + + tx.try_send("goodbye").unwrap(); + drop(tx); + + assert_eq!(rx.next(), Some("goodbye")); + assert_eq!(rx.next(), None); +} + +#[test] +fn try_send_recv() { + let (mut tx, mut rx) = mpsc::channel(1); + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap_err(); // should be full + rx.try_next().unwrap(); + rx.try_next().unwrap(); + rx.try_next().unwrap_err(); // should be empty + tx.try_send("hello").unwrap(); + rx.try_next().unwrap(); + rx.try_next().unwrap_err(); // should be empty +} + +#[test] +fn same_receiver() { + let (mut txa1, _) = mpsc::channel::<i32>(1); + let txa2 = txa1.clone(); + + let (mut txb1, _) = mpsc::channel::<i32>(1); + let txb2 = txb1.clone(); + + assert!(txa1.same_receiver(&txa2)); + assert!(txb1.same_receiver(&txb2)); + assert!(!txa1.same_receiver(&txb1)); + + txa1.disconnect(); + txb1.close_channel(); + + assert!(!txa1.same_receiver(&txa2)); + assert!(txb1.same_receiver(&txb2)); +} + +#[test] +fn is_connected_to() { + let (txa, rxa) = mpsc::channel::<i32>(1); + let (txb, rxb) = mpsc::channel::<i32>(1); + + assert!(txa.is_connected_to(&rxa)); + assert!(txb.is_connected_to(&rxb)); + assert!(!txa.is_connected_to(&rxb)); + assert!(!txb.is_connected_to(&rxa)); +} + +#[test] +fn hash_receiver() { + use std::collections::hash_map::DefaultHasher; + use std::hash::Hasher; + + let mut hasher_a1 = DefaultHasher::new(); + let mut hasher_a2 = DefaultHasher::new(); + let mut hasher_b1 = DefaultHasher::new(); + let mut hasher_b2 = DefaultHasher::new(); + let (mut txa1, _) = mpsc::channel::<i32>(1); + let txa2 = txa1.clone(); + + let (mut txb1, _) = mpsc::channel::<i32>(1); + let txb2 = txb1.clone(); + + txa1.hash_receiver(&mut hasher_a1); + let hash_a1 = hasher_a1.finish(); + txa2.hash_receiver(&mut hasher_a2); + let hash_a2 = hasher_a2.finish(); + txb1.hash_receiver(&mut hasher_b1); + let hash_b1 = hasher_b1.finish(); + txb2.hash_receiver(&mut hasher_b2); + let hash_b2 = hasher_b2.finish(); + + assert_eq!(hash_a1, hash_a2); + assert_eq!(hash_b1, hash_b2); + assert!(hash_a1 != hash_b1); + + txa1.disconnect(); + txb1.close_channel(); + + let mut hasher_a1 = DefaultHasher::new(); + let mut hasher_a2 = DefaultHasher::new(); + let mut hasher_b1 = DefaultHasher::new(); + let mut hasher_b2 = DefaultHasher::new(); + + txa1.hash_receiver(&mut hasher_a1); + let hash_a1 = hasher_a1.finish(); + txa2.hash_receiver(&mut hasher_a2); + let hash_a2 = hasher_a2.finish(); + txb1.hash_receiver(&mut hasher_b1); + let hash_b1 = hasher_b1.finish(); + txb2.hash_receiver(&mut hasher_b2); + let hash_b2 = hasher_b2.finish(); + + assert!(hash_a1 != hash_a2); + assert_eq!(hash_b1, hash_b2); +} + +#[test] +fn send_backpressure() { + let (waker, counter) = new_count_waker(); + let mut cx = Context::from_waker(&waker); + + let (mut tx, mut rx) = mpsc::channel(1); + block_on(tx.send(1)).unwrap(); + + let mut task = tx.send(2); + assert_eq!(task.poll_unpin(&mut cx), Poll::Pending); + assert_eq!(counter, 0); + + let item = block_on(rx.next()).unwrap(); + assert_eq!(item, 1); + assert_eq!(counter, 1); + assert_eq!(task.poll_unpin(&mut cx), Poll::Ready(Ok(()))); + + let item = block_on(rx.next()).unwrap(); + assert_eq!(item, 2); +} + +#[test] +fn send_backpressure_multi_senders() { + let (waker, counter) = new_count_waker(); + let mut cx = Context::from_waker(&waker); + + let (mut tx1, mut rx) = mpsc::channel(1); + let mut tx2 = tx1.clone(); + block_on(tx1.send(1)).unwrap(); + + let mut task = tx2.send(2); + assert_eq!(task.poll_unpin(&mut cx), Poll::Pending); + assert_eq!(counter, 0); + + let item = block_on(rx.next()).unwrap(); + assert_eq!(item, 1); + assert_eq!(counter, 1); + assert_eq!(task.poll_unpin(&mut cx), Poll::Ready(Ok(()))); + + let item = block_on(rx.next()).unwrap(); + assert_eq!(item, 2); +} |