summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-channel/tests/mpsc-close.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/futures-channel/tests/mpsc-close.rs')
-rw-r--r--third_party/rust/futures-channel/tests/mpsc-close.rs299
1 files changed, 299 insertions, 0 deletions
diff --git a/third_party/rust/futures-channel/tests/mpsc-close.rs b/third_party/rust/futures-channel/tests/mpsc-close.rs
new file mode 100644
index 0000000000..1a14067eca
--- /dev/null
+++ b/third_party/rust/futures-channel/tests/mpsc-close.rs
@@ -0,0 +1,299 @@
+use futures::channel::mpsc;
+use futures::executor::block_on;
+use futures::future::Future;
+use futures::sink::SinkExt;
+use futures::stream::StreamExt;
+use futures::task::{Context, Poll};
+use std::pin::Pin;
+use std::sync::{Arc, Weak};
+use std::thread;
+use std::time::{Duration, Instant};
+
+#[test]
+fn smoke() {
+ let (mut sender, receiver) = mpsc::channel(1);
+
+ let t = thread::spawn(move || while let Ok(()) = block_on(sender.send(42)) {});
+
+ // `receiver` needs to be dropped for `sender` to stop sending and therefore before the join.
+ block_on(receiver.take(3).for_each(|_| futures::future::ready(())));
+
+ t.join().unwrap()
+}
+
+#[test]
+fn multiple_senders_disconnect() {
+ {
+ let (mut tx1, mut rx) = mpsc::channel(1);
+ let (tx2, mut tx3, mut tx4) = (tx1.clone(), tx1.clone(), tx1.clone());
+
+ // disconnect, dropping and Sink::poll_close should all close this sender but leave the
+ // channel open for other senders
+ tx1.disconnect();
+ drop(tx2);
+ block_on(tx3.close()).unwrap();
+
+ assert!(tx1.is_closed());
+ assert!(tx3.is_closed());
+ assert!(!tx4.is_closed());
+
+ block_on(tx4.send(5)).unwrap();
+ assert_eq!(block_on(rx.next()), Some(5));
+
+ // dropping the final sender will close the channel
+ drop(tx4);
+ assert_eq!(block_on(rx.next()), None);
+ }
+
+ {
+ let (mut tx1, mut rx) = mpsc::unbounded();
+ let (tx2, mut tx3, mut tx4) = (tx1.clone(), tx1.clone(), tx1.clone());
+
+ // disconnect, dropping and Sink::poll_close should all close this sender but leave the
+ // channel open for other senders
+ tx1.disconnect();
+ drop(tx2);
+ block_on(tx3.close()).unwrap();
+
+ assert!(tx1.is_closed());
+ assert!(tx3.is_closed());
+ assert!(!tx4.is_closed());
+
+ block_on(tx4.send(5)).unwrap();
+ assert_eq!(block_on(rx.next()), Some(5));
+
+ // dropping the final sender will close the channel
+ drop(tx4);
+ assert_eq!(block_on(rx.next()), None);
+ }
+}
+
+#[test]
+fn multiple_senders_close_channel() {
+ {
+ let (mut tx1, mut rx) = mpsc::channel(1);
+ let mut tx2 = tx1.clone();
+
+ // close_channel should shut down the whole channel
+ tx1.close_channel();
+
+ assert!(tx1.is_closed());
+ assert!(tx2.is_closed());
+
+ let err = block_on(tx2.send(5)).unwrap_err();
+ assert!(err.is_disconnected());
+
+ assert_eq!(block_on(rx.next()), None);
+ }
+
+ {
+ let (tx1, mut rx) = mpsc::unbounded();
+ let mut tx2 = tx1.clone();
+
+ // close_channel should shut down the whole channel
+ tx1.close_channel();
+
+ assert!(tx1.is_closed());
+ assert!(tx2.is_closed());
+
+ let err = block_on(tx2.send(5)).unwrap_err();
+ assert!(err.is_disconnected());
+
+ assert_eq!(block_on(rx.next()), None);
+ }
+}
+
+#[test]
+fn single_receiver_drop_closes_channel_and_drains() {
+ {
+ let ref_count = Arc::new(0);
+ let weak_ref = Arc::downgrade(&ref_count);
+
+ let (sender, receiver) = mpsc::unbounded();
+ sender.unbounded_send(ref_count).expect("failed to send");
+
+ // Verify that the sent message is still live.
+ assert!(weak_ref.upgrade().is_some());
+
+ drop(receiver);
+
+ // The sender should know the channel is closed.
+ assert!(sender.is_closed());
+
+ // Verify that the sent message has been dropped.
+ assert!(weak_ref.upgrade().is_none());
+ }
+
+ {
+ let ref_count = Arc::new(0);
+ let weak_ref = Arc::downgrade(&ref_count);
+
+ let (mut sender, receiver) = mpsc::channel(1);
+ sender.try_send(ref_count).expect("failed to send");
+
+ // Verify that the sent message is still live.
+ assert!(weak_ref.upgrade().is_some());
+
+ drop(receiver);
+
+ // The sender should know the channel is closed.
+ assert!(sender.is_closed());
+
+ // Verify that the sent message has been dropped.
+ assert!(weak_ref.upgrade().is_none());
+ assert!(sender.is_closed());
+ }
+}
+
+// Stress test that `try_send()`s occurring concurrently with receiver
+// close/drops don't appear as successful sends.
+#[cfg_attr(miri, ignore)] // Miri is too slow
+#[test]
+fn stress_try_send_as_receiver_closes() {
+ const AMT: usize = 10000;
+ // To provide variable timing characteristics (in the hopes of
+ // reproducing the collision that leads to a race), we busy-re-poll
+ // the test MPSC receiver a variable number of times before actually
+ // stopping. We vary this countdown between 1 and the following
+ // value.
+ const MAX_COUNTDOWN: usize = 20;
+ // When we detect that a successfully sent item is still in the
+ // queue after a disconnect, we spin for up to 100ms to confirm that
+ // it is a persistent condition and not a concurrency illusion.
+ const SPIN_TIMEOUT_S: u64 = 10;
+ const SPIN_SLEEP_MS: u64 = 10;
+ struct TestRx {
+ rx: mpsc::Receiver<Arc<()>>,
+ // The number of times to query `rx` before dropping it.
+ poll_count: usize,
+ }
+ struct TestTask {
+ command_rx: mpsc::Receiver<TestRx>,
+ test_rx: Option<mpsc::Receiver<Arc<()>>>,
+ countdown: usize,
+ }
+ impl TestTask {
+ /// Create a new TestTask
+ fn new() -> (TestTask, mpsc::Sender<TestRx>) {
+ let (command_tx, command_rx) = mpsc::channel::<TestRx>(0);
+ (
+ TestTask {
+ command_rx,
+ test_rx: None,
+ countdown: 0, // 0 means no countdown is in progress.
+ },
+ command_tx,
+ )
+ }
+ }
+ impl Future for TestTask {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ // Poll the test channel, if one is present.
+ if let Some(rx) = &mut self.test_rx {
+ if let Poll::Ready(v) = rx.poll_next_unpin(cx) {
+ let _ = v.expect("test finished unexpectedly!");
+ }
+ self.countdown -= 1;
+ // Busy-poll until the countdown is finished.
+ cx.waker().wake_by_ref();
+ }
+ // Accept any newly submitted MPSC channels for testing.
+ match self.command_rx.poll_next_unpin(cx) {
+ Poll::Ready(Some(TestRx { rx, poll_count })) => {
+ self.test_rx = Some(rx);
+ self.countdown = poll_count;
+ cx.waker().wake_by_ref();
+ }
+ Poll::Ready(None) => return Poll::Ready(()),
+ Poll::Pending => {}
+ }
+ if self.countdown == 0 {
+ // Countdown complete -- drop the Receiver.
+ self.test_rx = None;
+ }
+ Poll::Pending
+ }
+ }
+ let (f, mut cmd_tx) = TestTask::new();
+ let bg = thread::spawn(move || block_on(f));
+ for i in 0..AMT {
+ let (mut test_tx, rx) = mpsc::channel(0);
+ let poll_count = i % MAX_COUNTDOWN;
+ cmd_tx.try_send(TestRx { rx, poll_count }).unwrap();
+ let mut prev_weak: Option<Weak<()>> = None;
+ let mut attempted_sends = 0;
+ let mut successful_sends = 0;
+ loop {
+ // Create a test item.
+ let item = Arc::new(());
+ let weak = Arc::downgrade(&item);
+ match test_tx.try_send(item) {
+ Ok(_) => {
+ prev_weak = Some(weak);
+ successful_sends += 1;
+ }
+ Err(ref e) if e.is_full() => {}
+ Err(ref e) if e.is_disconnected() => {
+ // Test for evidence of the race condition.
+ if let Some(prev_weak) = prev_weak {
+ if prev_weak.upgrade().is_some() {
+ // The previously sent item is still allocated.
+ // However, there appears to be some aspect of the
+ // concurrency that can legitimately cause the Arc
+ // to be momentarily valid. Spin for up to 100ms
+ // waiting for the previously sent item to be
+ // dropped.
+ let t0 = Instant::now();
+ let mut spins = 0;
+ loop {
+ if prev_weak.upgrade().is_none() {
+ break;
+ }
+ assert!(
+ t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S),
+ "item not dropped on iteration {} after \
+ {} sends ({} successful). spin=({})",
+ i,
+ attempted_sends,
+ successful_sends,
+ spins
+ );
+ spins += 1;
+ thread::sleep(Duration::from_millis(SPIN_SLEEP_MS));
+ }
+ }
+ }
+ break;
+ }
+ Err(ref e) => panic!("unexpected error: {}", e),
+ }
+ attempted_sends += 1;
+ }
+ }
+ drop(cmd_tx);
+ bg.join().expect("background thread join");
+}
+
+#[test]
+fn unbounded_try_next_after_none() {
+ let (tx, mut rx) = mpsc::unbounded::<String>();
+ // Drop the sender, close the channel.
+ drop(tx);
+ // Receive the end of channel.
+ assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
+ // None received, check we can call `try_next` again.
+ assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
+}
+
+#[test]
+fn bounded_try_next_after_none() {
+ let (tx, mut rx) = mpsc::channel::<String>(17);
+ // Drop the sender, close the channel.
+ drop(tx);
+ // Receive the end of channel.
+ assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
+ // None received, check we can call `try_next` again.
+ assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
+}