diff options
Diffstat (limited to 'third_party/rust/futures-channel/tests')
-rw-r--r-- | third_party/rust/futures-channel/tests/channel.rs | 66 | ||||
-rw-r--r-- | third_party/rust/futures-channel/tests/mpsc-close.rs | 299 | ||||
-rw-r--r-- | third_party/rust/futures-channel/tests/mpsc-size_hint.rs | 40 | ||||
-rw-r--r-- | third_party/rust/futures-channel/tests/mpsc.rs | 634 | ||||
-rw-r--r-- | third_party/rust/futures-channel/tests/oneshot.rs | 256 |
5 files changed, 1295 insertions, 0 deletions
diff --git a/third_party/rust/futures-channel/tests/channel.rs b/third_party/rust/futures-channel/tests/channel.rs new file mode 100644 index 0000000000..5f01a8ef4c --- /dev/null +++ b/third_party/rust/futures-channel/tests/channel.rs @@ -0,0 +1,66 @@ +use futures::channel::mpsc; +use futures::executor::block_on; +use futures::future::poll_fn; +use futures::sink::SinkExt; +use futures::stream::StreamExt; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::thread; + +#[test] +fn sequence() { + let (tx, rx) = mpsc::channel(1); + + let amt = 20; + let t = thread::spawn(move || block_on(send_sequence(amt, tx))); + let list: Vec<_> = block_on(rx.collect()); + let mut list = list.into_iter(); + for i in (1..=amt).rev() { + assert_eq!(list.next(), Some(i)); + } + assert_eq!(list.next(), None); + + t.join().unwrap(); +} + +async fn send_sequence(n: u32, mut sender: mpsc::Sender<u32>) { + for x in 0..n { + sender.send(n - x).await.unwrap(); + } +} + +#[test] +fn drop_sender() { + let (tx, mut rx) = mpsc::channel::<u32>(1); + drop(tx); + let f = poll_fn(|cx| rx.poll_next_unpin(cx)); + assert_eq!(block_on(f), None) +} + +#[test] +fn drop_rx() { + let (mut tx, rx) = mpsc::channel::<u32>(1); + block_on(tx.send(1)).unwrap(); + drop(rx); + assert!(block_on(tx.send(1)).is_err()); +} + +#[test] +fn drop_order() { + static DROPS: AtomicUsize = AtomicUsize::new(0); + let (mut tx, rx) = mpsc::channel(1); + + struct A; + + impl Drop for A { + fn drop(&mut self) { + DROPS.fetch_add(1, Ordering::SeqCst); + } + } + + block_on(tx.send(A)).unwrap(); + assert_eq!(DROPS.load(Ordering::SeqCst), 0); + drop(rx); + assert_eq!(DROPS.load(Ordering::SeqCst), 1); + assert!(block_on(tx.send(A)).is_err()); + assert_eq!(DROPS.load(Ordering::SeqCst), 2); +} diff --git a/third_party/rust/futures-channel/tests/mpsc-close.rs b/third_party/rust/futures-channel/tests/mpsc-close.rs new file mode 100644 index 0000000000..1a14067eca --- /dev/null +++ b/third_party/rust/futures-channel/tests/mpsc-close.rs @@ -0,0 +1,299 @@ +use futures::channel::mpsc; +use futures::executor::block_on; +use futures::future::Future; +use futures::sink::SinkExt; +use futures::stream::StreamExt; +use futures::task::{Context, Poll}; +use std::pin::Pin; +use std::sync::{Arc, Weak}; +use std::thread; +use std::time::{Duration, Instant}; + +#[test] +fn smoke() { + let (mut sender, receiver) = mpsc::channel(1); + + let t = thread::spawn(move || while let Ok(()) = block_on(sender.send(42)) {}); + + // `receiver` needs to be dropped for `sender` to stop sending and therefore before the join. + block_on(receiver.take(3).for_each(|_| futures::future::ready(()))); + + t.join().unwrap() +} + +#[test] +fn multiple_senders_disconnect() { + { + let (mut tx1, mut rx) = mpsc::channel(1); + let (tx2, mut tx3, mut tx4) = (tx1.clone(), tx1.clone(), tx1.clone()); + + // disconnect, dropping and Sink::poll_close should all close this sender but leave the + // channel open for other senders + tx1.disconnect(); + drop(tx2); + block_on(tx3.close()).unwrap(); + + assert!(tx1.is_closed()); + assert!(tx3.is_closed()); + assert!(!tx4.is_closed()); + + block_on(tx4.send(5)).unwrap(); + assert_eq!(block_on(rx.next()), Some(5)); + + // dropping the final sender will close the channel + drop(tx4); + assert_eq!(block_on(rx.next()), None); + } + + { + let (mut tx1, mut rx) = mpsc::unbounded(); + let (tx2, mut tx3, mut tx4) = (tx1.clone(), tx1.clone(), tx1.clone()); + + // disconnect, dropping and Sink::poll_close should all close this sender but leave the + // channel open for other senders + tx1.disconnect(); + drop(tx2); + block_on(tx3.close()).unwrap(); + + assert!(tx1.is_closed()); + assert!(tx3.is_closed()); + assert!(!tx4.is_closed()); + + block_on(tx4.send(5)).unwrap(); + assert_eq!(block_on(rx.next()), Some(5)); + + // dropping the final sender will close the channel + drop(tx4); + assert_eq!(block_on(rx.next()), None); + } +} + +#[test] +fn multiple_senders_close_channel() { + { + let (mut tx1, mut rx) = mpsc::channel(1); + let mut tx2 = tx1.clone(); + + // close_channel should shut down the whole channel + tx1.close_channel(); + + assert!(tx1.is_closed()); + assert!(tx2.is_closed()); + + let err = block_on(tx2.send(5)).unwrap_err(); + assert!(err.is_disconnected()); + + assert_eq!(block_on(rx.next()), None); + } + + { + let (tx1, mut rx) = mpsc::unbounded(); + let mut tx2 = tx1.clone(); + + // close_channel should shut down the whole channel + tx1.close_channel(); + + assert!(tx1.is_closed()); + assert!(tx2.is_closed()); + + let err = block_on(tx2.send(5)).unwrap_err(); + assert!(err.is_disconnected()); + + assert_eq!(block_on(rx.next()), None); + } +} + +#[test] +fn single_receiver_drop_closes_channel_and_drains() { + { + let ref_count = Arc::new(0); + let weak_ref = Arc::downgrade(&ref_count); + + let (sender, receiver) = mpsc::unbounded(); + sender.unbounded_send(ref_count).expect("failed to send"); + + // Verify that the sent message is still live. + assert!(weak_ref.upgrade().is_some()); + + drop(receiver); + + // The sender should know the channel is closed. + assert!(sender.is_closed()); + + // Verify that the sent message has been dropped. + assert!(weak_ref.upgrade().is_none()); + } + + { + let ref_count = Arc::new(0); + let weak_ref = Arc::downgrade(&ref_count); + + let (mut sender, receiver) = mpsc::channel(1); + sender.try_send(ref_count).expect("failed to send"); + + // Verify that the sent message is still live. + assert!(weak_ref.upgrade().is_some()); + + drop(receiver); + + // The sender should know the channel is closed. + assert!(sender.is_closed()); + + // Verify that the sent message has been dropped. + assert!(weak_ref.upgrade().is_none()); + assert!(sender.is_closed()); + } +} + +// Stress test that `try_send()`s occurring concurrently with receiver +// close/drops don't appear as successful sends. +#[cfg_attr(miri, ignore)] // Miri is too slow +#[test] +fn stress_try_send_as_receiver_closes() { + const AMT: usize = 10000; + // To provide variable timing characteristics (in the hopes of + // reproducing the collision that leads to a race), we busy-re-poll + // the test MPSC receiver a variable number of times before actually + // stopping. We vary this countdown between 1 and the following + // value. + const MAX_COUNTDOWN: usize = 20; + // When we detect that a successfully sent item is still in the + // queue after a disconnect, we spin for up to 100ms to confirm that + // it is a persistent condition and not a concurrency illusion. + const SPIN_TIMEOUT_S: u64 = 10; + const SPIN_SLEEP_MS: u64 = 10; + struct TestRx { + rx: mpsc::Receiver<Arc<()>>, + // The number of times to query `rx` before dropping it. + poll_count: usize, + } + struct TestTask { + command_rx: mpsc::Receiver<TestRx>, + test_rx: Option<mpsc::Receiver<Arc<()>>>, + countdown: usize, + } + impl TestTask { + /// Create a new TestTask + fn new() -> (TestTask, mpsc::Sender<TestRx>) { + let (command_tx, command_rx) = mpsc::channel::<TestRx>(0); + ( + TestTask { + command_rx, + test_rx: None, + countdown: 0, // 0 means no countdown is in progress. + }, + command_tx, + ) + } + } + impl Future for TestTask { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + // Poll the test channel, if one is present. + if let Some(rx) = &mut self.test_rx { + if let Poll::Ready(v) = rx.poll_next_unpin(cx) { + let _ = v.expect("test finished unexpectedly!"); + } + self.countdown -= 1; + // Busy-poll until the countdown is finished. + cx.waker().wake_by_ref(); + } + // Accept any newly submitted MPSC channels for testing. + match self.command_rx.poll_next_unpin(cx) { + Poll::Ready(Some(TestRx { rx, poll_count })) => { + self.test_rx = Some(rx); + self.countdown = poll_count; + cx.waker().wake_by_ref(); + } + Poll::Ready(None) => return Poll::Ready(()), + Poll::Pending => {} + } + if self.countdown == 0 { + // Countdown complete -- drop the Receiver. + self.test_rx = None; + } + Poll::Pending + } + } + let (f, mut cmd_tx) = TestTask::new(); + let bg = thread::spawn(move || block_on(f)); + for i in 0..AMT { + let (mut test_tx, rx) = mpsc::channel(0); + let poll_count = i % MAX_COUNTDOWN; + cmd_tx.try_send(TestRx { rx, poll_count }).unwrap(); + let mut prev_weak: Option<Weak<()>> = None; + let mut attempted_sends = 0; + let mut successful_sends = 0; + loop { + // Create a test item. + let item = Arc::new(()); + let weak = Arc::downgrade(&item); + match test_tx.try_send(item) { + Ok(_) => { + prev_weak = Some(weak); + successful_sends += 1; + } + Err(ref e) if e.is_full() => {} + Err(ref e) if e.is_disconnected() => { + // Test for evidence of the race condition. + if let Some(prev_weak) = prev_weak { + if prev_weak.upgrade().is_some() { + // The previously sent item is still allocated. + // However, there appears to be some aspect of the + // concurrency that can legitimately cause the Arc + // to be momentarily valid. Spin for up to 100ms + // waiting for the previously sent item to be + // dropped. + let t0 = Instant::now(); + let mut spins = 0; + loop { + if prev_weak.upgrade().is_none() { + break; + } + assert!( + t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S), + "item not dropped on iteration {} after \ + {} sends ({} successful). spin=({})", + i, + attempted_sends, + successful_sends, + spins + ); + spins += 1; + thread::sleep(Duration::from_millis(SPIN_SLEEP_MS)); + } + } + } + break; + } + Err(ref e) => panic!("unexpected error: {}", e), + } + attempted_sends += 1; + } + } + drop(cmd_tx); + bg.join().expect("background thread join"); +} + +#[test] +fn unbounded_try_next_after_none() { + let (tx, mut rx) = mpsc::unbounded::<String>(); + // Drop the sender, close the channel. + drop(tx); + // Receive the end of channel. + assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); + // None received, check we can call `try_next` again. + assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); +} + +#[test] +fn bounded_try_next_after_none() { + let (tx, mut rx) = mpsc::channel::<String>(17); + // Drop the sender, close the channel. + drop(tx); + // Receive the end of channel. + assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); + // None received, check we can call `try_next` again. + assert_eq!(Ok(None), rx.try_next().map_err(|_| ())); +} diff --git a/third_party/rust/futures-channel/tests/mpsc-size_hint.rs b/third_party/rust/futures-channel/tests/mpsc-size_hint.rs new file mode 100644 index 0000000000..d9cdaa31fa --- /dev/null +++ b/third_party/rust/futures-channel/tests/mpsc-size_hint.rs @@ -0,0 +1,40 @@ +use futures::channel::mpsc; +use futures::stream::Stream; + +#[test] +fn unbounded_size_hint() { + let (tx, mut rx) = mpsc::unbounded::<u32>(); + assert_eq!((0, None), rx.size_hint()); + tx.unbounded_send(1).unwrap(); + assert_eq!((1, None), rx.size_hint()); + rx.try_next().unwrap().unwrap(); + assert_eq!((0, None), rx.size_hint()); + tx.unbounded_send(2).unwrap(); + tx.unbounded_send(3).unwrap(); + assert_eq!((2, None), rx.size_hint()); + drop(tx); + assert_eq!((2, Some(2)), rx.size_hint()); + rx.try_next().unwrap().unwrap(); + assert_eq!((1, Some(1)), rx.size_hint()); + rx.try_next().unwrap().unwrap(); + assert_eq!((0, Some(0)), rx.size_hint()); +} + +#[test] +fn channel_size_hint() { + let (mut tx, mut rx) = mpsc::channel::<u32>(10); + assert_eq!((0, None), rx.size_hint()); + tx.try_send(1).unwrap(); + assert_eq!((1, None), rx.size_hint()); + rx.try_next().unwrap().unwrap(); + assert_eq!((0, None), rx.size_hint()); + tx.try_send(2).unwrap(); + tx.try_send(3).unwrap(); + assert_eq!((2, None), rx.size_hint()); + drop(tx); + assert_eq!((2, Some(2)), rx.size_hint()); + rx.try_next().unwrap().unwrap(); + assert_eq!((1, Some(1)), rx.size_hint()); + rx.try_next().unwrap().unwrap(); + assert_eq!((0, Some(0)), rx.size_hint()); +} diff --git a/third_party/rust/futures-channel/tests/mpsc.rs b/third_party/rust/futures-channel/tests/mpsc.rs new file mode 100644 index 0000000000..444c8e10fd --- /dev/null +++ b/third_party/rust/futures-channel/tests/mpsc.rs @@ -0,0 +1,634 @@ +use futures::channel::{mpsc, oneshot}; +use futures::executor::{block_on, block_on_stream}; +use futures::future::{poll_fn, FutureExt}; +use futures::pin_mut; +use futures::sink::{Sink, SinkExt}; +use futures::stream::{Stream, StreamExt}; +use futures::task::{Context, Poll}; +use futures_test::task::{new_count_waker, noop_context}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use std::thread; + +trait AssertSend: Send {} +impl AssertSend for mpsc::Sender<i32> {} +impl AssertSend for mpsc::Receiver<i32> {} + +#[test] +fn send_recv() { + let (mut tx, rx) = mpsc::channel::<i32>(16); + + block_on(tx.send(1)).unwrap(); + drop(tx); + let v: Vec<_> = block_on(rx.collect()); + assert_eq!(v, vec![1]); +} + +#[test] +fn send_recv_no_buffer() { + // Run on a task context + block_on(poll_fn(move |cx| { + let (tx, rx) = mpsc::channel::<i32>(0); + pin_mut!(tx, rx); + + assert!(tx.as_mut().poll_flush(cx).is_ready()); + assert!(tx.as_mut().poll_ready(cx).is_ready()); + + // Send first message + assert!(tx.as_mut().start_send(1).is_ok()); + assert!(tx.as_mut().poll_ready(cx).is_pending()); + + // poll_ready said Pending, so no room in buffer, therefore new sends + // should get rejected with is_full. + assert!(tx.as_mut().start_send(0).unwrap_err().is_full()); + assert!(tx.as_mut().poll_ready(cx).is_pending()); + + // Take the value + assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(1))); + assert!(tx.as_mut().poll_ready(cx).is_ready()); + + // Send second message + assert!(tx.as_mut().poll_ready(cx).is_ready()); + assert!(tx.as_mut().start_send(2).is_ok()); + assert!(tx.as_mut().poll_ready(cx).is_pending()); + + // Take the value + assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(2))); + assert!(tx.as_mut().poll_ready(cx).is_ready()); + + Poll::Ready(()) + })); +} + +#[test] +fn send_shared_recv() { + let (mut tx1, rx) = mpsc::channel::<i32>(16); + let mut rx = block_on_stream(rx); + let mut tx2 = tx1.clone(); + + block_on(tx1.send(1)).unwrap(); + assert_eq!(rx.next(), Some(1)); + + block_on(tx2.send(2)).unwrap(); + assert_eq!(rx.next(), Some(2)); +} + +#[test] +fn send_recv_threads() { + let (mut tx, rx) = mpsc::channel::<i32>(16); + + let t = thread::spawn(move || { + block_on(tx.send(1)).unwrap(); + }); + + let v: Vec<_> = block_on(rx.take(1).collect()); + assert_eq!(v, vec![1]); + + t.join().unwrap(); +} + +#[test] +fn send_recv_threads_no_capacity() { + let (mut tx, rx) = mpsc::channel::<i32>(0); + + let t = thread::spawn(move || { + block_on(tx.send(1)).unwrap(); + block_on(tx.send(2)).unwrap(); + }); + + let v: Vec<_> = block_on(rx.collect()); + assert_eq!(v, vec![1, 2]); + + t.join().unwrap(); +} + +#[test] +fn recv_close_gets_none() { + let (mut tx, mut rx) = mpsc::channel::<i32>(10); + + // Run on a task context + block_on(poll_fn(move |cx| { + rx.close(); + + assert_eq!(rx.poll_next_unpin(cx), Poll::Ready(None)); + match tx.poll_ready(cx) { + Poll::Pending | Poll::Ready(Ok(_)) => panic!(), + Poll::Ready(Err(e)) => assert!(e.is_disconnected()), + }; + + Poll::Ready(()) + })); +} + +#[test] +fn tx_close_gets_none() { + let (_, mut rx) = mpsc::channel::<i32>(10); + + // Run on a task context + block_on(poll_fn(move |cx| { + assert_eq!(rx.poll_next_unpin(cx), Poll::Ready(None)); + Poll::Ready(()) + })); +} + +// #[test] +// fn spawn_sends_items() { +// let core = local_executor::Core::new(); +// let stream = unfold(0, |i| Some(ok::<_,u8>((i, i + 1)))); +// let rx = mpsc::spawn(stream, &core, 1); +// assert_eq!(core.run(rx.take(4).collect()).unwrap(), +// [0, 1, 2, 3]); +// } + +// #[test] +// fn spawn_kill_dead_stream() { +// use std::thread; +// use std::time::Duration; +// use futures::future::Either; +// use futures::sync::oneshot; +// +// // a stream which never returns anything (maybe a remote end isn't +// // responding), but dropping it leads to observable side effects +// // (like closing connections, releasing limited resources, ...) +// #[derive(Debug)] +// struct Dead { +// // when dropped you should get Err(oneshot::Canceled) on the +// // receiving end +// done: oneshot::Sender<()>, +// } +// impl Stream for Dead { +// type Item = (); +// type Error = (); +// +// fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { +// Ok(Poll::Pending) +// } +// } +// +// // need to implement a timeout for the test, as it would hang +// // forever right now +// let (timeout_tx, timeout_rx) = oneshot::channel(); +// thread::spawn(move || { +// thread::sleep(Duration::from_millis(1000)); +// let _ = timeout_tx.send(()); +// }); +// +// let core = local_executor::Core::new(); +// let (done_tx, done_rx) = oneshot::channel(); +// let stream = Dead{done: done_tx}; +// let rx = mpsc::spawn(stream, &core, 1); +// let res = core.run( +// Ok::<_, ()>(()) +// .into_future() +// .then(move |_| { +// // now drop the spawned stream: maybe some timeout exceeded, +// // or some connection on this end was closed by the remote +// // end. +// drop(rx); +// // and wait for the spawned stream to release its resources +// done_rx +// }) +// .select2(timeout_rx) +// ); +// match res { +// Err(Either::A((oneshot::Canceled, _))) => (), +// _ => { +// panic!("dead stream wasn't canceled"); +// }, +// } +// } + +#[test] +fn stress_shared_unbounded() { + const AMT: u32 = if cfg!(miri) { 100 } else { 10000 }; + const NTHREADS: u32 = 8; + let (tx, rx) = mpsc::unbounded::<i32>(); + + let t = thread::spawn(move || { + let result: Vec<_> = block_on(rx.collect()); + assert_eq!(result.len(), (AMT * NTHREADS) as usize); + for item in result { + assert_eq!(item, 1); + } + }); + + for _ in 0..NTHREADS { + let tx = tx.clone(); + + thread::spawn(move || { + for _ in 0..AMT { + tx.unbounded_send(1).unwrap(); + } + }); + } + + drop(tx); + + t.join().ok().unwrap(); +} + +#[test] +fn stress_shared_bounded_hard() { + const AMT: u32 = if cfg!(miri) { 100 } else { 10000 }; + const NTHREADS: u32 = 8; + let (tx, rx) = mpsc::channel::<i32>(0); + + let t = thread::spawn(move || { + let result: Vec<_> = block_on(rx.collect()); + assert_eq!(result.len(), (AMT * NTHREADS) as usize); + for item in result { + assert_eq!(item, 1); + } + }); + + for _ in 0..NTHREADS { + let mut tx = tx.clone(); + + thread::spawn(move || { + for _ in 0..AMT { + block_on(tx.send(1)).unwrap(); + } + }); + } + + drop(tx); + + t.join().unwrap(); +} + +#[allow(clippy::same_item_push)] +#[test] +fn stress_receiver_multi_task_bounded_hard() { + const AMT: usize = if cfg!(miri) { 100 } else { 10_000 }; + const NTHREADS: u32 = 2; + + let (mut tx, rx) = mpsc::channel::<usize>(0); + let rx = Arc::new(Mutex::new(Some(rx))); + let n = Arc::new(AtomicUsize::new(0)); + + let mut th = vec![]; + + for _ in 0..NTHREADS { + let rx = rx.clone(); + let n = n.clone(); + + let t = thread::spawn(move || { + let mut i = 0; + + loop { + i += 1; + let mut rx_opt = rx.lock().unwrap(); + if let Some(rx) = &mut *rx_opt { + if i % 5 == 0 { + let item = block_on(rx.next()); + + if item.is_none() { + *rx_opt = None; + break; + } + + n.fetch_add(1, Ordering::Relaxed); + } else { + // Just poll + let n = n.clone(); + match rx.poll_next_unpin(&mut noop_context()) { + Poll::Ready(Some(_)) => { + n.fetch_add(1, Ordering::Relaxed); + } + Poll::Ready(None) => { + *rx_opt = None; + break; + } + Poll::Pending => {} + } + } + } else { + break; + } + } + }); + + th.push(t); + } + + for i in 0..AMT { + block_on(tx.send(i)).unwrap(); + } + drop(tx); + + for t in th { + t.join().unwrap(); + } + + assert_eq!(AMT, n.load(Ordering::Relaxed)); +} + +/// Stress test that receiver properly receives all the messages +/// after sender dropped. +#[test] +fn stress_drop_sender() { + const ITER: usize = if cfg!(miri) { 100 } else { 10000 }; + + fn list() -> impl Stream<Item = i32> { + let (tx, rx) = mpsc::channel(1); + thread::spawn(move || { + block_on(send_one_two_three(tx)); + }); + rx + } + + for _ in 0..ITER { + let v: Vec<_> = block_on(list().collect()); + assert_eq!(v, vec![1, 2, 3]); + } +} + +async fn send_one_two_three(mut tx: mpsc::Sender<i32>) { + for i in 1..=3 { + tx.send(i).await.unwrap(); + } +} + +/// Stress test that after receiver dropped, +/// no messages are lost. +fn stress_close_receiver_iter() { + let (tx, rx) = mpsc::unbounded(); + let mut rx = block_on_stream(rx); + let (unwritten_tx, unwritten_rx) = std::sync::mpsc::channel(); + let th = thread::spawn(move || { + for i in 1.. { + if tx.unbounded_send(i).is_err() { + unwritten_tx.send(i).expect("unwritten_tx"); + return; + } + } + }); + + // Read one message to make sure thread effectively started + assert_eq!(Some(1), rx.next()); + + rx.close(); + + for i in 2.. { + match rx.next() { + Some(r) => assert!(i == r), + None => { + let unwritten = unwritten_rx.recv().expect("unwritten_rx"); + assert_eq!(unwritten, i); + th.join().unwrap(); + return; + } + } + } +} + +#[test] +fn stress_close_receiver() { + const ITER: usize = if cfg!(miri) { 50 } else { 10000 }; + + for _ in 0..ITER { + stress_close_receiver_iter(); + } +} + +async fn stress_poll_ready_sender(mut sender: mpsc::Sender<u32>, count: u32) { + for i in (1..=count).rev() { + sender.send(i).await.unwrap(); + } +} + +/// Tests that after `poll_ready` indicates capacity a channel can always send without waiting. +#[allow(clippy::same_item_push)] +#[test] +fn stress_poll_ready() { + const AMT: u32 = if cfg!(miri) { 100 } else { 1000 }; + const NTHREADS: u32 = 8; + + /// Run a stress test using the specified channel capacity. + fn stress(capacity: usize) { + let (tx, rx) = mpsc::channel(capacity); + let mut threads = Vec::new(); + for _ in 0..NTHREADS { + let sender = tx.clone(); + threads.push(thread::spawn(move || block_on(stress_poll_ready_sender(sender, AMT)))); + } + drop(tx); + + let result: Vec<_> = block_on(rx.collect()); + assert_eq!(result.len() as u32, AMT * NTHREADS); + + for thread in threads { + thread.join().unwrap(); + } + } + + stress(0); + stress(1); + stress(8); + stress(16); +} + +#[test] +fn try_send_1() { + const N: usize = if cfg!(miri) { 100 } else { 3000 }; + let (mut tx, rx) = mpsc::channel(0); + + let t = thread::spawn(move || { + for i in 0..N { + loop { + if tx.try_send(i).is_ok() { + break; + } + } + } + }); + + let result: Vec<_> = block_on(rx.collect()); + for (i, j) in result.into_iter().enumerate() { + assert_eq!(i, j); + } + + t.join().unwrap(); +} + +#[test] +fn try_send_2() { + let (mut tx, rx) = mpsc::channel(0); + let mut rx = block_on_stream(rx); + + tx.try_send("hello").unwrap(); + + let (readytx, readyrx) = oneshot::channel::<()>(); + + let th = thread::spawn(move || { + block_on(poll_fn(|cx| { + assert!(tx.poll_ready(cx).is_pending()); + Poll::Ready(()) + })); + + drop(readytx); + block_on(tx.send("goodbye")).unwrap(); + }); + + let _ = block_on(readyrx); + assert_eq!(rx.next(), Some("hello")); + assert_eq!(rx.next(), Some("goodbye")); + assert_eq!(rx.next(), None); + + th.join().unwrap(); +} + +#[test] +fn try_send_fail() { + let (mut tx, rx) = mpsc::channel(0); + let mut rx = block_on_stream(rx); + + tx.try_send("hello").unwrap(); + + // This should fail + assert!(tx.try_send("fail").is_err()); + + assert_eq!(rx.next(), Some("hello")); + + tx.try_send("goodbye").unwrap(); + drop(tx); + + assert_eq!(rx.next(), Some("goodbye")); + assert_eq!(rx.next(), None); +} + +#[test] +fn try_send_recv() { + let (mut tx, mut rx) = mpsc::channel(1); + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap_err(); // should be full + rx.try_next().unwrap(); + rx.try_next().unwrap(); + rx.try_next().unwrap_err(); // should be empty + tx.try_send("hello").unwrap(); + rx.try_next().unwrap(); + rx.try_next().unwrap_err(); // should be empty +} + +#[test] +fn same_receiver() { + let (mut txa1, _) = mpsc::channel::<i32>(1); + let txa2 = txa1.clone(); + + let (mut txb1, _) = mpsc::channel::<i32>(1); + let txb2 = txb1.clone(); + + assert!(txa1.same_receiver(&txa2)); + assert!(txb1.same_receiver(&txb2)); + assert!(!txa1.same_receiver(&txb1)); + + txa1.disconnect(); + txb1.close_channel(); + + assert!(!txa1.same_receiver(&txa2)); + assert!(txb1.same_receiver(&txb2)); +} + +#[test] +fn is_connected_to() { + let (txa, rxa) = mpsc::channel::<i32>(1); + let (txb, rxb) = mpsc::channel::<i32>(1); + + assert!(txa.is_connected_to(&rxa)); + assert!(txb.is_connected_to(&rxb)); + assert!(!txa.is_connected_to(&rxb)); + assert!(!txb.is_connected_to(&rxa)); +} + +#[test] +fn hash_receiver() { + use std::collections::hash_map::DefaultHasher; + use std::hash::Hasher; + + let mut hasher_a1 = DefaultHasher::new(); + let mut hasher_a2 = DefaultHasher::new(); + let mut hasher_b1 = DefaultHasher::new(); + let mut hasher_b2 = DefaultHasher::new(); + let (mut txa1, _) = mpsc::channel::<i32>(1); + let txa2 = txa1.clone(); + + let (mut txb1, _) = mpsc::channel::<i32>(1); + let txb2 = txb1.clone(); + + txa1.hash_receiver(&mut hasher_a1); + let hash_a1 = hasher_a1.finish(); + txa2.hash_receiver(&mut hasher_a2); + let hash_a2 = hasher_a2.finish(); + txb1.hash_receiver(&mut hasher_b1); + let hash_b1 = hasher_b1.finish(); + txb2.hash_receiver(&mut hasher_b2); + let hash_b2 = hasher_b2.finish(); + + assert_eq!(hash_a1, hash_a2); + assert_eq!(hash_b1, hash_b2); + assert!(hash_a1 != hash_b1); + + txa1.disconnect(); + txb1.close_channel(); + + let mut hasher_a1 = DefaultHasher::new(); + let mut hasher_a2 = DefaultHasher::new(); + let mut hasher_b1 = DefaultHasher::new(); + let mut hasher_b2 = DefaultHasher::new(); + + txa1.hash_receiver(&mut hasher_a1); + let hash_a1 = hasher_a1.finish(); + txa2.hash_receiver(&mut hasher_a2); + let hash_a2 = hasher_a2.finish(); + txb1.hash_receiver(&mut hasher_b1); + let hash_b1 = hasher_b1.finish(); + txb2.hash_receiver(&mut hasher_b2); + let hash_b2 = hasher_b2.finish(); + + assert!(hash_a1 != hash_a2); + assert_eq!(hash_b1, hash_b2); +} + +#[test] +fn send_backpressure() { + let (waker, counter) = new_count_waker(); + let mut cx = Context::from_waker(&waker); + + let (mut tx, mut rx) = mpsc::channel(1); + block_on(tx.send(1)).unwrap(); + + let mut task = tx.send(2); + assert_eq!(task.poll_unpin(&mut cx), Poll::Pending); + assert_eq!(counter, 0); + + let item = block_on(rx.next()).unwrap(); + assert_eq!(item, 1); + assert_eq!(counter, 1); + assert_eq!(task.poll_unpin(&mut cx), Poll::Ready(Ok(()))); + + let item = block_on(rx.next()).unwrap(); + assert_eq!(item, 2); +} + +#[test] +fn send_backpressure_multi_senders() { + let (waker, counter) = new_count_waker(); + let mut cx = Context::from_waker(&waker); + + let (mut tx1, mut rx) = mpsc::channel(1); + let mut tx2 = tx1.clone(); + block_on(tx1.send(1)).unwrap(); + + let mut task = tx2.send(2); + assert_eq!(task.poll_unpin(&mut cx), Poll::Pending); + assert_eq!(counter, 0); + + let item = block_on(rx.next()).unwrap(); + assert_eq!(item, 1); + assert_eq!(counter, 1); + assert_eq!(task.poll_unpin(&mut cx), Poll::Ready(Ok(()))); + + let item = block_on(rx.next()).unwrap(); + assert_eq!(item, 2); +} diff --git a/third_party/rust/futures-channel/tests/oneshot.rs b/third_party/rust/futures-channel/tests/oneshot.rs new file mode 100644 index 0000000000..6b48376dc0 --- /dev/null +++ b/third_party/rust/futures-channel/tests/oneshot.rs @@ -0,0 +1,256 @@ +use futures::channel::oneshot::{self, Sender}; +use futures::executor::block_on; +use futures::future::{poll_fn, FutureExt}; +use futures::task::{Context, Poll}; +use futures_test::task::panic_waker_ref; +use std::sync::mpsc; +use std::thread; + +#[test] +fn smoke_poll() { + let (mut tx, rx) = oneshot::channel::<u32>(); + let mut rx = Some(rx); + let f = poll_fn(|cx| { + assert!(tx.poll_canceled(cx).is_pending()); + assert!(tx.poll_canceled(cx).is_pending()); + drop(rx.take()); + assert!(tx.poll_canceled(cx).is_ready()); + assert!(tx.poll_canceled(cx).is_ready()); + Poll::Ready(()) + }); + + block_on(f); +} + +#[test] +fn cancel_notifies() { + let (mut tx, rx) = oneshot::channel::<u32>(); + + let t = thread::spawn(move || { + block_on(tx.cancellation()); + }); + drop(rx); + t.join().unwrap(); +} + +#[test] +fn cancel_lots() { + const N: usize = if cfg!(miri) { 100 } else { 20000 }; + + let (tx, rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>(); + let t = thread::spawn(move || { + for (mut tx, tx2) in rx { + block_on(tx.cancellation()); + tx2.send(()).unwrap(); + } + }); + + for _ in 0..N { + let (otx, orx) = oneshot::channel::<u32>(); + let (tx2, rx2) = mpsc::channel(); + tx.send((otx, tx2)).unwrap(); + drop(orx); + rx2.recv().unwrap(); + } + drop(tx); + + t.join().unwrap(); +} + +#[test] +fn cancel_after_sender_drop_doesnt_notify() { + let (mut tx, rx) = oneshot::channel::<u32>(); + let mut cx = Context::from_waker(panic_waker_ref()); + assert_eq!(tx.poll_canceled(&mut cx), Poll::Pending); + drop(tx); + drop(rx); +} + +#[test] +fn close() { + let (mut tx, mut rx) = oneshot::channel::<u32>(); + rx.close(); + block_on(poll_fn(|cx| { + match rx.poll_unpin(cx) { + Poll::Ready(Err(_)) => {} + _ => panic!(), + }; + assert!(tx.poll_canceled(cx).is_ready()); + Poll::Ready(()) + })); +} + +#[test] +fn close_wakes() { + let (mut tx, mut rx) = oneshot::channel::<u32>(); + let (tx2, rx2) = mpsc::channel(); + let t = thread::spawn(move || { + rx.close(); + rx2.recv().unwrap(); + }); + block_on(tx.cancellation()); + tx2.send(()).unwrap(); + t.join().unwrap(); +} + +#[test] +fn is_canceled() { + let (tx, rx) = oneshot::channel::<u32>(); + assert!(!tx.is_canceled()); + drop(rx); + assert!(tx.is_canceled()); +} + +#[test] +fn cancel_sends() { + const N: usize = if cfg!(miri) { 100 } else { 20000 }; + + let (tx, rx) = mpsc::channel::<Sender<_>>(); + let t = thread::spawn(move || { + for otx in rx { + let _ = otx.send(42); + } + }); + + for _ in 0..N { + let (otx, mut orx) = oneshot::channel::<u32>(); + tx.send(otx).unwrap(); + + orx.close(); + let _ = block_on(orx); + } + + drop(tx); + t.join().unwrap(); +} + +// #[test] +// fn spawn_sends_items() { +// let core = local_executor::Core::new(); +// let future = ok::<_, ()>(1); +// let rx = spawn(future, &core); +// assert_eq!(core.run(rx).unwrap(), 1); +// } +// +// #[test] +// fn spawn_kill_dead_stream() { +// use std::thread; +// use std::time::Duration; +// use futures::future::Either; +// use futures::sync::oneshot; +// +// // a future which never returns anything (forever accepting incoming +// // connections), but dropping it leads to observable side effects +// // (like closing listening sockets, releasing limited resources, +// // ...) +// #[derive(Debug)] +// struct Dead { +// // when dropped you should get Err(oneshot::Canceled) on the +// // receiving end +// done: oneshot::Sender<()>, +// } +// impl Future for Dead { +// type Item = (); +// type Error = (); +// +// fn poll(&mut self) -> Poll<Self::Item, Self::Error> { +// Ok(Poll::Pending) +// } +// } +// +// // need to implement a timeout for the test, as it would hang +// // forever right now +// let (timeout_tx, timeout_rx) = oneshot::channel(); +// thread::spawn(move || { +// thread::sleep(Duration::from_millis(1000)); +// let _ = timeout_tx.send(()); +// }); +// +// let core = local_executor::Core::new(); +// let (done_tx, done_rx) = oneshot::channel(); +// let future = Dead{done: done_tx}; +// let rx = spawn(future, &core); +// let res = core.run( +// Ok::<_, ()>(()) +// .into_future() +// .then(move |_| { +// // now drop the spawned future: maybe some timeout exceeded, +// // or some connection on this end was closed by the remote +// // end. +// drop(rx); +// // and wait for the spawned future to release its resources +// done_rx +// }) +// .select2(timeout_rx) +// ); +// match res { +// Err(Either::A((oneshot::Canceled, _))) => (), +// Ok(Either::B(((), _))) => { +// panic!("dead future wasn't canceled (timeout)"); +// }, +// _ => { +// panic!("dead future wasn't canceled (unexpected result)"); +// }, +// } +// } +// +// #[test] +// fn spawn_dont_kill_forgot_dead_stream() { +// use std::thread; +// use std::time::Duration; +// use futures::future::Either; +// use futures::sync::oneshot; +// +// // a future which never returns anything (forever accepting incoming +// // connections), but dropping it leads to observable side effects +// // (like closing listening sockets, releasing limited resources, +// // ...) +// #[derive(Debug)] +// struct Dead { +// // when dropped you should get Err(oneshot::Canceled) on the +// // receiving end +// done: oneshot::Sender<()>, +// } +// impl Future for Dead { +// type Item = (); +// type Error = (); +// +// fn poll(&mut self) -> Poll<Self::Item, Self::Error> { +// Ok(Poll::Pending) +// } +// } +// +// // need to implement a timeout for the test, as it would hang +// // forever right now +// let (timeout_tx, timeout_rx) = oneshot::channel(); +// thread::spawn(move || { +// thread::sleep(Duration::from_millis(1000)); +// let _ = timeout_tx.send(()); +// }); +// +// let core = local_executor::Core::new(); +// let (done_tx, done_rx) = oneshot::channel(); +// let future = Dead{done: done_tx}; +// let rx = spawn(future, &core); +// let res = core.run( +// Ok::<_, ()>(()) +// .into_future() +// .then(move |_| { +// // forget the spawned future: should keep running, i.e. hit +// // the timeout below. +// rx.forget(); +// // and wait for the spawned future to release its resources +// done_rx +// }) +// .select2(timeout_rx) +// ); +// match res { +// Err(Either::A((oneshot::Canceled, _))) => { +// panic!("forgotten dead future was canceled"); +// }, +// Ok(Either::B(((), _))) => (), // reached timeout +// _ => { +// panic!("forgotten dead future was canceled (unexpected result)"); +// }, +// } +// } |