summaryrefslogtreecommitdiffstats
path: root/third_party/rust/futures-channel/tests
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/futures-channel/tests')
-rw-r--r--third_party/rust/futures-channel/tests/channel.rs66
-rw-r--r--third_party/rust/futures-channel/tests/mpsc-close.rs299
-rw-r--r--third_party/rust/futures-channel/tests/mpsc-size_hint.rs40
-rw-r--r--third_party/rust/futures-channel/tests/mpsc.rs634
-rw-r--r--third_party/rust/futures-channel/tests/oneshot.rs256
5 files changed, 1295 insertions, 0 deletions
diff --git a/third_party/rust/futures-channel/tests/channel.rs b/third_party/rust/futures-channel/tests/channel.rs
new file mode 100644
index 0000000000..5f01a8ef4c
--- /dev/null
+++ b/third_party/rust/futures-channel/tests/channel.rs
@@ -0,0 +1,66 @@
+use futures::channel::mpsc;
+use futures::executor::block_on;
+use futures::future::poll_fn;
+use futures::sink::SinkExt;
+use futures::stream::StreamExt;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::thread;
+
+#[test]
+fn sequence() {
+ let (tx, rx) = mpsc::channel(1);
+
+ let amt = 20;
+ let t = thread::spawn(move || block_on(send_sequence(amt, tx)));
+ let list: Vec<_> = block_on(rx.collect());
+ let mut list = list.into_iter();
+ for i in (1..=amt).rev() {
+ assert_eq!(list.next(), Some(i));
+ }
+ assert_eq!(list.next(), None);
+
+ t.join().unwrap();
+}
+
+async fn send_sequence(n: u32, mut sender: mpsc::Sender<u32>) {
+ for x in 0..n {
+ sender.send(n - x).await.unwrap();
+ }
+}
+
+#[test]
+fn drop_sender() {
+ let (tx, mut rx) = mpsc::channel::<u32>(1);
+ drop(tx);
+ let f = poll_fn(|cx| rx.poll_next_unpin(cx));
+ assert_eq!(block_on(f), None)
+}
+
+#[test]
+fn drop_rx() {
+ let (mut tx, rx) = mpsc::channel::<u32>(1);
+ block_on(tx.send(1)).unwrap();
+ drop(rx);
+ assert!(block_on(tx.send(1)).is_err());
+}
+
+#[test]
+fn drop_order() {
+ static DROPS: AtomicUsize = AtomicUsize::new(0);
+ let (mut tx, rx) = mpsc::channel(1);
+
+ struct A;
+
+ impl Drop for A {
+ fn drop(&mut self) {
+ DROPS.fetch_add(1, Ordering::SeqCst);
+ }
+ }
+
+ block_on(tx.send(A)).unwrap();
+ assert_eq!(DROPS.load(Ordering::SeqCst), 0);
+ drop(rx);
+ assert_eq!(DROPS.load(Ordering::SeqCst), 1);
+ assert!(block_on(tx.send(A)).is_err());
+ assert_eq!(DROPS.load(Ordering::SeqCst), 2);
+}
diff --git a/third_party/rust/futures-channel/tests/mpsc-close.rs b/third_party/rust/futures-channel/tests/mpsc-close.rs
new file mode 100644
index 0000000000..1a14067eca
--- /dev/null
+++ b/third_party/rust/futures-channel/tests/mpsc-close.rs
@@ -0,0 +1,299 @@
+use futures::channel::mpsc;
+use futures::executor::block_on;
+use futures::future::Future;
+use futures::sink::SinkExt;
+use futures::stream::StreamExt;
+use futures::task::{Context, Poll};
+use std::pin::Pin;
+use std::sync::{Arc, Weak};
+use std::thread;
+use std::time::{Duration, Instant};
+
+#[test]
+fn smoke() {
+ let (mut sender, receiver) = mpsc::channel(1);
+
+ let t = thread::spawn(move || while let Ok(()) = block_on(sender.send(42)) {});
+
+ // `receiver` needs to be dropped for `sender` to stop sending and therefore before the join.
+ block_on(receiver.take(3).for_each(|_| futures::future::ready(())));
+
+ t.join().unwrap()
+}
+
+#[test]
+fn multiple_senders_disconnect() {
+ {
+ let (mut tx1, mut rx) = mpsc::channel(1);
+ let (tx2, mut tx3, mut tx4) = (tx1.clone(), tx1.clone(), tx1.clone());
+
+ // disconnect, dropping and Sink::poll_close should all close this sender but leave the
+ // channel open for other senders
+ tx1.disconnect();
+ drop(tx2);
+ block_on(tx3.close()).unwrap();
+
+ assert!(tx1.is_closed());
+ assert!(tx3.is_closed());
+ assert!(!tx4.is_closed());
+
+ block_on(tx4.send(5)).unwrap();
+ assert_eq!(block_on(rx.next()), Some(5));
+
+ // dropping the final sender will close the channel
+ drop(tx4);
+ assert_eq!(block_on(rx.next()), None);
+ }
+
+ {
+ let (mut tx1, mut rx) = mpsc::unbounded();
+ let (tx2, mut tx3, mut tx4) = (tx1.clone(), tx1.clone(), tx1.clone());
+
+ // disconnect, dropping and Sink::poll_close should all close this sender but leave the
+ // channel open for other senders
+ tx1.disconnect();
+ drop(tx2);
+ block_on(tx3.close()).unwrap();
+
+ assert!(tx1.is_closed());
+ assert!(tx3.is_closed());
+ assert!(!tx4.is_closed());
+
+ block_on(tx4.send(5)).unwrap();
+ assert_eq!(block_on(rx.next()), Some(5));
+
+ // dropping the final sender will close the channel
+ drop(tx4);
+ assert_eq!(block_on(rx.next()), None);
+ }
+}
+
+#[test]
+fn multiple_senders_close_channel() {
+ {
+ let (mut tx1, mut rx) = mpsc::channel(1);
+ let mut tx2 = tx1.clone();
+
+ // close_channel should shut down the whole channel
+ tx1.close_channel();
+
+ assert!(tx1.is_closed());
+ assert!(tx2.is_closed());
+
+ let err = block_on(tx2.send(5)).unwrap_err();
+ assert!(err.is_disconnected());
+
+ assert_eq!(block_on(rx.next()), None);
+ }
+
+ {
+ let (tx1, mut rx) = mpsc::unbounded();
+ let mut tx2 = tx1.clone();
+
+ // close_channel should shut down the whole channel
+ tx1.close_channel();
+
+ assert!(tx1.is_closed());
+ assert!(tx2.is_closed());
+
+ let err = block_on(tx2.send(5)).unwrap_err();
+ assert!(err.is_disconnected());
+
+ assert_eq!(block_on(rx.next()), None);
+ }
+}
+
+#[test]
+fn single_receiver_drop_closes_channel_and_drains() {
+ {
+ let ref_count = Arc::new(0);
+ let weak_ref = Arc::downgrade(&ref_count);
+
+ let (sender, receiver) = mpsc::unbounded();
+ sender.unbounded_send(ref_count).expect("failed to send");
+
+ // Verify that the sent message is still live.
+ assert!(weak_ref.upgrade().is_some());
+
+ drop(receiver);
+
+ // The sender should know the channel is closed.
+ assert!(sender.is_closed());
+
+ // Verify that the sent message has been dropped.
+ assert!(weak_ref.upgrade().is_none());
+ }
+
+ {
+ let ref_count = Arc::new(0);
+ let weak_ref = Arc::downgrade(&ref_count);
+
+ let (mut sender, receiver) = mpsc::channel(1);
+ sender.try_send(ref_count).expect("failed to send");
+
+ // Verify that the sent message is still live.
+ assert!(weak_ref.upgrade().is_some());
+
+ drop(receiver);
+
+ // The sender should know the channel is closed.
+ assert!(sender.is_closed());
+
+ // Verify that the sent message has been dropped.
+ assert!(weak_ref.upgrade().is_none());
+ assert!(sender.is_closed());
+ }
+}
+
+// Stress test that `try_send()`s occurring concurrently with receiver
+// close/drops don't appear as successful sends.
+#[cfg_attr(miri, ignore)] // Miri is too slow
+#[test]
+fn stress_try_send_as_receiver_closes() {
+ const AMT: usize = 10000;
+ // To provide variable timing characteristics (in the hopes of
+ // reproducing the collision that leads to a race), we busy-re-poll
+ // the test MPSC receiver a variable number of times before actually
+ // stopping. We vary this countdown between 1 and the following
+ // value.
+ const MAX_COUNTDOWN: usize = 20;
+ // When we detect that a successfully sent item is still in the
+ // queue after a disconnect, we spin for up to 100ms to confirm that
+ // it is a persistent condition and not a concurrency illusion.
+ const SPIN_TIMEOUT_S: u64 = 10;
+ const SPIN_SLEEP_MS: u64 = 10;
+ struct TestRx {
+ rx: mpsc::Receiver<Arc<()>>,
+ // The number of times to query `rx` before dropping it.
+ poll_count: usize,
+ }
+ struct TestTask {
+ command_rx: mpsc::Receiver<TestRx>,
+ test_rx: Option<mpsc::Receiver<Arc<()>>>,
+ countdown: usize,
+ }
+ impl TestTask {
+ /// Create a new TestTask
+ fn new() -> (TestTask, mpsc::Sender<TestRx>) {
+ let (command_tx, command_rx) = mpsc::channel::<TestRx>(0);
+ (
+ TestTask {
+ command_rx,
+ test_rx: None,
+ countdown: 0, // 0 means no countdown is in progress.
+ },
+ command_tx,
+ )
+ }
+ }
+ impl Future for TestTask {
+ type Output = ();
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ // Poll the test channel, if one is present.
+ if let Some(rx) = &mut self.test_rx {
+ if let Poll::Ready(v) = rx.poll_next_unpin(cx) {
+ let _ = v.expect("test finished unexpectedly!");
+ }
+ self.countdown -= 1;
+ // Busy-poll until the countdown is finished.
+ cx.waker().wake_by_ref();
+ }
+ // Accept any newly submitted MPSC channels for testing.
+ match self.command_rx.poll_next_unpin(cx) {
+ Poll::Ready(Some(TestRx { rx, poll_count })) => {
+ self.test_rx = Some(rx);
+ self.countdown = poll_count;
+ cx.waker().wake_by_ref();
+ }
+ Poll::Ready(None) => return Poll::Ready(()),
+ Poll::Pending => {}
+ }
+ if self.countdown == 0 {
+ // Countdown complete -- drop the Receiver.
+ self.test_rx = None;
+ }
+ Poll::Pending
+ }
+ }
+ let (f, mut cmd_tx) = TestTask::new();
+ let bg = thread::spawn(move || block_on(f));
+ for i in 0..AMT {
+ let (mut test_tx, rx) = mpsc::channel(0);
+ let poll_count = i % MAX_COUNTDOWN;
+ cmd_tx.try_send(TestRx { rx, poll_count }).unwrap();
+ let mut prev_weak: Option<Weak<()>> = None;
+ let mut attempted_sends = 0;
+ let mut successful_sends = 0;
+ loop {
+ // Create a test item.
+ let item = Arc::new(());
+ let weak = Arc::downgrade(&item);
+ match test_tx.try_send(item) {
+ Ok(_) => {
+ prev_weak = Some(weak);
+ successful_sends += 1;
+ }
+ Err(ref e) if e.is_full() => {}
+ Err(ref e) if e.is_disconnected() => {
+ // Test for evidence of the race condition.
+ if let Some(prev_weak) = prev_weak {
+ if prev_weak.upgrade().is_some() {
+ // The previously sent item is still allocated.
+ // However, there appears to be some aspect of the
+ // concurrency that can legitimately cause the Arc
+ // to be momentarily valid. Spin for up to 100ms
+ // waiting for the previously sent item to be
+ // dropped.
+ let t0 = Instant::now();
+ let mut spins = 0;
+ loop {
+ if prev_weak.upgrade().is_none() {
+ break;
+ }
+ assert!(
+ t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S),
+ "item not dropped on iteration {} after \
+ {} sends ({} successful). spin=({})",
+ i,
+ attempted_sends,
+ successful_sends,
+ spins
+ );
+ spins += 1;
+ thread::sleep(Duration::from_millis(SPIN_SLEEP_MS));
+ }
+ }
+ }
+ break;
+ }
+ Err(ref e) => panic!("unexpected error: {}", e),
+ }
+ attempted_sends += 1;
+ }
+ }
+ drop(cmd_tx);
+ bg.join().expect("background thread join");
+}
+
+#[test]
+fn unbounded_try_next_after_none() {
+ let (tx, mut rx) = mpsc::unbounded::<String>();
+ // Drop the sender, close the channel.
+ drop(tx);
+ // Receive the end of channel.
+ assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
+ // None received, check we can call `try_next` again.
+ assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
+}
+
+#[test]
+fn bounded_try_next_after_none() {
+ let (tx, mut rx) = mpsc::channel::<String>(17);
+ // Drop the sender, close the channel.
+ drop(tx);
+ // Receive the end of channel.
+ assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
+ // None received, check we can call `try_next` again.
+ assert_eq!(Ok(None), rx.try_next().map_err(|_| ()));
+}
diff --git a/third_party/rust/futures-channel/tests/mpsc-size_hint.rs b/third_party/rust/futures-channel/tests/mpsc-size_hint.rs
new file mode 100644
index 0000000000..d9cdaa31fa
--- /dev/null
+++ b/third_party/rust/futures-channel/tests/mpsc-size_hint.rs
@@ -0,0 +1,40 @@
+use futures::channel::mpsc;
+use futures::stream::Stream;
+
+#[test]
+fn unbounded_size_hint() {
+ let (tx, mut rx) = mpsc::unbounded::<u32>();
+ assert_eq!((0, None), rx.size_hint());
+ tx.unbounded_send(1).unwrap();
+ assert_eq!((1, None), rx.size_hint());
+ rx.try_next().unwrap().unwrap();
+ assert_eq!((0, None), rx.size_hint());
+ tx.unbounded_send(2).unwrap();
+ tx.unbounded_send(3).unwrap();
+ assert_eq!((2, None), rx.size_hint());
+ drop(tx);
+ assert_eq!((2, Some(2)), rx.size_hint());
+ rx.try_next().unwrap().unwrap();
+ assert_eq!((1, Some(1)), rx.size_hint());
+ rx.try_next().unwrap().unwrap();
+ assert_eq!((0, Some(0)), rx.size_hint());
+}
+
+#[test]
+fn channel_size_hint() {
+ let (mut tx, mut rx) = mpsc::channel::<u32>(10);
+ assert_eq!((0, None), rx.size_hint());
+ tx.try_send(1).unwrap();
+ assert_eq!((1, None), rx.size_hint());
+ rx.try_next().unwrap().unwrap();
+ assert_eq!((0, None), rx.size_hint());
+ tx.try_send(2).unwrap();
+ tx.try_send(3).unwrap();
+ assert_eq!((2, None), rx.size_hint());
+ drop(tx);
+ assert_eq!((2, Some(2)), rx.size_hint());
+ rx.try_next().unwrap().unwrap();
+ assert_eq!((1, Some(1)), rx.size_hint());
+ rx.try_next().unwrap().unwrap();
+ assert_eq!((0, Some(0)), rx.size_hint());
+}
diff --git a/third_party/rust/futures-channel/tests/mpsc.rs b/third_party/rust/futures-channel/tests/mpsc.rs
new file mode 100644
index 0000000000..444c8e10fd
--- /dev/null
+++ b/third_party/rust/futures-channel/tests/mpsc.rs
@@ -0,0 +1,634 @@
+use futures::channel::{mpsc, oneshot};
+use futures::executor::{block_on, block_on_stream};
+use futures::future::{poll_fn, FutureExt};
+use futures::pin_mut;
+use futures::sink::{Sink, SinkExt};
+use futures::stream::{Stream, StreamExt};
+use futures::task::{Context, Poll};
+use futures_test::task::{new_count_waker, noop_context};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex};
+use std::thread;
+
+trait AssertSend: Send {}
+impl AssertSend for mpsc::Sender<i32> {}
+impl AssertSend for mpsc::Receiver<i32> {}
+
+#[test]
+fn send_recv() {
+ let (mut tx, rx) = mpsc::channel::<i32>(16);
+
+ block_on(tx.send(1)).unwrap();
+ drop(tx);
+ let v: Vec<_> = block_on(rx.collect());
+ assert_eq!(v, vec![1]);
+}
+
+#[test]
+fn send_recv_no_buffer() {
+ // Run on a task context
+ block_on(poll_fn(move |cx| {
+ let (tx, rx) = mpsc::channel::<i32>(0);
+ pin_mut!(tx, rx);
+
+ assert!(tx.as_mut().poll_flush(cx).is_ready());
+ assert!(tx.as_mut().poll_ready(cx).is_ready());
+
+ // Send first message
+ assert!(tx.as_mut().start_send(1).is_ok());
+ assert!(tx.as_mut().poll_ready(cx).is_pending());
+
+ // poll_ready said Pending, so no room in buffer, therefore new sends
+ // should get rejected with is_full.
+ assert!(tx.as_mut().start_send(0).unwrap_err().is_full());
+ assert!(tx.as_mut().poll_ready(cx).is_pending());
+
+ // Take the value
+ assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(1)));
+ assert!(tx.as_mut().poll_ready(cx).is_ready());
+
+ // Send second message
+ assert!(tx.as_mut().poll_ready(cx).is_ready());
+ assert!(tx.as_mut().start_send(2).is_ok());
+ assert!(tx.as_mut().poll_ready(cx).is_pending());
+
+ // Take the value
+ assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(2)));
+ assert!(tx.as_mut().poll_ready(cx).is_ready());
+
+ Poll::Ready(())
+ }));
+}
+
+#[test]
+fn send_shared_recv() {
+ let (mut tx1, rx) = mpsc::channel::<i32>(16);
+ let mut rx = block_on_stream(rx);
+ let mut tx2 = tx1.clone();
+
+ block_on(tx1.send(1)).unwrap();
+ assert_eq!(rx.next(), Some(1));
+
+ block_on(tx2.send(2)).unwrap();
+ assert_eq!(rx.next(), Some(2));
+}
+
+#[test]
+fn send_recv_threads() {
+ let (mut tx, rx) = mpsc::channel::<i32>(16);
+
+ let t = thread::spawn(move || {
+ block_on(tx.send(1)).unwrap();
+ });
+
+ let v: Vec<_> = block_on(rx.take(1).collect());
+ assert_eq!(v, vec![1]);
+
+ t.join().unwrap();
+}
+
+#[test]
+fn send_recv_threads_no_capacity() {
+ let (mut tx, rx) = mpsc::channel::<i32>(0);
+
+ let t = thread::spawn(move || {
+ block_on(tx.send(1)).unwrap();
+ block_on(tx.send(2)).unwrap();
+ });
+
+ let v: Vec<_> = block_on(rx.collect());
+ assert_eq!(v, vec![1, 2]);
+
+ t.join().unwrap();
+}
+
+#[test]
+fn recv_close_gets_none() {
+ let (mut tx, mut rx) = mpsc::channel::<i32>(10);
+
+ // Run on a task context
+ block_on(poll_fn(move |cx| {
+ rx.close();
+
+ assert_eq!(rx.poll_next_unpin(cx), Poll::Ready(None));
+ match tx.poll_ready(cx) {
+ Poll::Pending | Poll::Ready(Ok(_)) => panic!(),
+ Poll::Ready(Err(e)) => assert!(e.is_disconnected()),
+ };
+
+ Poll::Ready(())
+ }));
+}
+
+#[test]
+fn tx_close_gets_none() {
+ let (_, mut rx) = mpsc::channel::<i32>(10);
+
+ // Run on a task context
+ block_on(poll_fn(move |cx| {
+ assert_eq!(rx.poll_next_unpin(cx), Poll::Ready(None));
+ Poll::Ready(())
+ }));
+}
+
+// #[test]
+// fn spawn_sends_items() {
+// let core = local_executor::Core::new();
+// let stream = unfold(0, |i| Some(ok::<_,u8>((i, i + 1))));
+// let rx = mpsc::spawn(stream, &core, 1);
+// assert_eq!(core.run(rx.take(4).collect()).unwrap(),
+// [0, 1, 2, 3]);
+// }
+
+// #[test]
+// fn spawn_kill_dead_stream() {
+// use std::thread;
+// use std::time::Duration;
+// use futures::future::Either;
+// use futures::sync::oneshot;
+//
+// // a stream which never returns anything (maybe a remote end isn't
+// // responding), but dropping it leads to observable side effects
+// // (like closing connections, releasing limited resources, ...)
+// #[derive(Debug)]
+// struct Dead {
+// // when dropped you should get Err(oneshot::Canceled) on the
+// // receiving end
+// done: oneshot::Sender<()>,
+// }
+// impl Stream for Dead {
+// type Item = ();
+// type Error = ();
+//
+// fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+// Ok(Poll::Pending)
+// }
+// }
+//
+// // need to implement a timeout for the test, as it would hang
+// // forever right now
+// let (timeout_tx, timeout_rx) = oneshot::channel();
+// thread::spawn(move || {
+// thread::sleep(Duration::from_millis(1000));
+// let _ = timeout_tx.send(());
+// });
+//
+// let core = local_executor::Core::new();
+// let (done_tx, done_rx) = oneshot::channel();
+// let stream = Dead{done: done_tx};
+// let rx = mpsc::spawn(stream, &core, 1);
+// let res = core.run(
+// Ok::<_, ()>(())
+// .into_future()
+// .then(move |_| {
+// // now drop the spawned stream: maybe some timeout exceeded,
+// // or some connection on this end was closed by the remote
+// // end.
+// drop(rx);
+// // and wait for the spawned stream to release its resources
+// done_rx
+// })
+// .select2(timeout_rx)
+// );
+// match res {
+// Err(Either::A((oneshot::Canceled, _))) => (),
+// _ => {
+// panic!("dead stream wasn't canceled");
+// },
+// }
+// }
+
+#[test]
+fn stress_shared_unbounded() {
+ const AMT: u32 = if cfg!(miri) { 100 } else { 10000 };
+ const NTHREADS: u32 = 8;
+ let (tx, rx) = mpsc::unbounded::<i32>();
+
+ let t = thread::spawn(move || {
+ let result: Vec<_> = block_on(rx.collect());
+ assert_eq!(result.len(), (AMT * NTHREADS) as usize);
+ for item in result {
+ assert_eq!(item, 1);
+ }
+ });
+
+ for _ in 0..NTHREADS {
+ let tx = tx.clone();
+
+ thread::spawn(move || {
+ for _ in 0..AMT {
+ tx.unbounded_send(1).unwrap();
+ }
+ });
+ }
+
+ drop(tx);
+
+ t.join().ok().unwrap();
+}
+
+#[test]
+fn stress_shared_bounded_hard() {
+ const AMT: u32 = if cfg!(miri) { 100 } else { 10000 };
+ const NTHREADS: u32 = 8;
+ let (tx, rx) = mpsc::channel::<i32>(0);
+
+ let t = thread::spawn(move || {
+ let result: Vec<_> = block_on(rx.collect());
+ assert_eq!(result.len(), (AMT * NTHREADS) as usize);
+ for item in result {
+ assert_eq!(item, 1);
+ }
+ });
+
+ for _ in 0..NTHREADS {
+ let mut tx = tx.clone();
+
+ thread::spawn(move || {
+ for _ in 0..AMT {
+ block_on(tx.send(1)).unwrap();
+ }
+ });
+ }
+
+ drop(tx);
+
+ t.join().unwrap();
+}
+
+#[allow(clippy::same_item_push)]
+#[test]
+fn stress_receiver_multi_task_bounded_hard() {
+ const AMT: usize = if cfg!(miri) { 100 } else { 10_000 };
+ const NTHREADS: u32 = 2;
+
+ let (mut tx, rx) = mpsc::channel::<usize>(0);
+ let rx = Arc::new(Mutex::new(Some(rx)));
+ let n = Arc::new(AtomicUsize::new(0));
+
+ let mut th = vec![];
+
+ for _ in 0..NTHREADS {
+ let rx = rx.clone();
+ let n = n.clone();
+
+ let t = thread::spawn(move || {
+ let mut i = 0;
+
+ loop {
+ i += 1;
+ let mut rx_opt = rx.lock().unwrap();
+ if let Some(rx) = &mut *rx_opt {
+ if i % 5 == 0 {
+ let item = block_on(rx.next());
+
+ if item.is_none() {
+ *rx_opt = None;
+ break;
+ }
+
+ n.fetch_add(1, Ordering::Relaxed);
+ } else {
+ // Just poll
+ let n = n.clone();
+ match rx.poll_next_unpin(&mut noop_context()) {
+ Poll::Ready(Some(_)) => {
+ n.fetch_add(1, Ordering::Relaxed);
+ }
+ Poll::Ready(None) => {
+ *rx_opt = None;
+ break;
+ }
+ Poll::Pending => {}
+ }
+ }
+ } else {
+ break;
+ }
+ }
+ });
+
+ th.push(t);
+ }
+
+ for i in 0..AMT {
+ block_on(tx.send(i)).unwrap();
+ }
+ drop(tx);
+
+ for t in th {
+ t.join().unwrap();
+ }
+
+ assert_eq!(AMT, n.load(Ordering::Relaxed));
+}
+
+/// Stress test that receiver properly receives all the messages
+/// after sender dropped.
+#[test]
+fn stress_drop_sender() {
+ const ITER: usize = if cfg!(miri) { 100 } else { 10000 };
+
+ fn list() -> impl Stream<Item = i32> {
+ let (tx, rx) = mpsc::channel(1);
+ thread::spawn(move || {
+ block_on(send_one_two_three(tx));
+ });
+ rx
+ }
+
+ for _ in 0..ITER {
+ let v: Vec<_> = block_on(list().collect());
+ assert_eq!(v, vec![1, 2, 3]);
+ }
+}
+
+async fn send_one_two_three(mut tx: mpsc::Sender<i32>) {
+ for i in 1..=3 {
+ tx.send(i).await.unwrap();
+ }
+}
+
+/// Stress test that after receiver dropped,
+/// no messages are lost.
+fn stress_close_receiver_iter() {
+ let (tx, rx) = mpsc::unbounded();
+ let mut rx = block_on_stream(rx);
+ let (unwritten_tx, unwritten_rx) = std::sync::mpsc::channel();
+ let th = thread::spawn(move || {
+ for i in 1.. {
+ if tx.unbounded_send(i).is_err() {
+ unwritten_tx.send(i).expect("unwritten_tx");
+ return;
+ }
+ }
+ });
+
+ // Read one message to make sure thread effectively started
+ assert_eq!(Some(1), rx.next());
+
+ rx.close();
+
+ for i in 2.. {
+ match rx.next() {
+ Some(r) => assert!(i == r),
+ None => {
+ let unwritten = unwritten_rx.recv().expect("unwritten_rx");
+ assert_eq!(unwritten, i);
+ th.join().unwrap();
+ return;
+ }
+ }
+ }
+}
+
+#[test]
+fn stress_close_receiver() {
+ const ITER: usize = if cfg!(miri) { 50 } else { 10000 };
+
+ for _ in 0..ITER {
+ stress_close_receiver_iter();
+ }
+}
+
+async fn stress_poll_ready_sender(mut sender: mpsc::Sender<u32>, count: u32) {
+ for i in (1..=count).rev() {
+ sender.send(i).await.unwrap();
+ }
+}
+
+/// Tests that after `poll_ready` indicates capacity a channel can always send without waiting.
+#[allow(clippy::same_item_push)]
+#[test]
+fn stress_poll_ready() {
+ const AMT: u32 = if cfg!(miri) { 100 } else { 1000 };
+ const NTHREADS: u32 = 8;
+
+ /// Run a stress test using the specified channel capacity.
+ fn stress(capacity: usize) {
+ let (tx, rx) = mpsc::channel(capacity);
+ let mut threads = Vec::new();
+ for _ in 0..NTHREADS {
+ let sender = tx.clone();
+ threads.push(thread::spawn(move || block_on(stress_poll_ready_sender(sender, AMT))));
+ }
+ drop(tx);
+
+ let result: Vec<_> = block_on(rx.collect());
+ assert_eq!(result.len() as u32, AMT * NTHREADS);
+
+ for thread in threads {
+ thread.join().unwrap();
+ }
+ }
+
+ stress(0);
+ stress(1);
+ stress(8);
+ stress(16);
+}
+
+#[test]
+fn try_send_1() {
+ const N: usize = if cfg!(miri) { 100 } else { 3000 };
+ let (mut tx, rx) = mpsc::channel(0);
+
+ let t = thread::spawn(move || {
+ for i in 0..N {
+ loop {
+ if tx.try_send(i).is_ok() {
+ break;
+ }
+ }
+ }
+ });
+
+ let result: Vec<_> = block_on(rx.collect());
+ for (i, j) in result.into_iter().enumerate() {
+ assert_eq!(i, j);
+ }
+
+ t.join().unwrap();
+}
+
+#[test]
+fn try_send_2() {
+ let (mut tx, rx) = mpsc::channel(0);
+ let mut rx = block_on_stream(rx);
+
+ tx.try_send("hello").unwrap();
+
+ let (readytx, readyrx) = oneshot::channel::<()>();
+
+ let th = thread::spawn(move || {
+ block_on(poll_fn(|cx| {
+ assert!(tx.poll_ready(cx).is_pending());
+ Poll::Ready(())
+ }));
+
+ drop(readytx);
+ block_on(tx.send("goodbye")).unwrap();
+ });
+
+ let _ = block_on(readyrx);
+ assert_eq!(rx.next(), Some("hello"));
+ assert_eq!(rx.next(), Some("goodbye"));
+ assert_eq!(rx.next(), None);
+
+ th.join().unwrap();
+}
+
+#[test]
+fn try_send_fail() {
+ let (mut tx, rx) = mpsc::channel(0);
+ let mut rx = block_on_stream(rx);
+
+ tx.try_send("hello").unwrap();
+
+ // This should fail
+ assert!(tx.try_send("fail").is_err());
+
+ assert_eq!(rx.next(), Some("hello"));
+
+ tx.try_send("goodbye").unwrap();
+ drop(tx);
+
+ assert_eq!(rx.next(), Some("goodbye"));
+ assert_eq!(rx.next(), None);
+}
+
+#[test]
+fn try_send_recv() {
+ let (mut tx, mut rx) = mpsc::channel(1);
+ tx.try_send("hello").unwrap();
+ tx.try_send("hello").unwrap();
+ tx.try_send("hello").unwrap_err(); // should be full
+ rx.try_next().unwrap();
+ rx.try_next().unwrap();
+ rx.try_next().unwrap_err(); // should be empty
+ tx.try_send("hello").unwrap();
+ rx.try_next().unwrap();
+ rx.try_next().unwrap_err(); // should be empty
+}
+
+#[test]
+fn same_receiver() {
+ let (mut txa1, _) = mpsc::channel::<i32>(1);
+ let txa2 = txa1.clone();
+
+ let (mut txb1, _) = mpsc::channel::<i32>(1);
+ let txb2 = txb1.clone();
+
+ assert!(txa1.same_receiver(&txa2));
+ assert!(txb1.same_receiver(&txb2));
+ assert!(!txa1.same_receiver(&txb1));
+
+ txa1.disconnect();
+ txb1.close_channel();
+
+ assert!(!txa1.same_receiver(&txa2));
+ assert!(txb1.same_receiver(&txb2));
+}
+
+#[test]
+fn is_connected_to() {
+ let (txa, rxa) = mpsc::channel::<i32>(1);
+ let (txb, rxb) = mpsc::channel::<i32>(1);
+
+ assert!(txa.is_connected_to(&rxa));
+ assert!(txb.is_connected_to(&rxb));
+ assert!(!txa.is_connected_to(&rxb));
+ assert!(!txb.is_connected_to(&rxa));
+}
+
+#[test]
+fn hash_receiver() {
+ use std::collections::hash_map::DefaultHasher;
+ use std::hash::Hasher;
+
+ let mut hasher_a1 = DefaultHasher::new();
+ let mut hasher_a2 = DefaultHasher::new();
+ let mut hasher_b1 = DefaultHasher::new();
+ let mut hasher_b2 = DefaultHasher::new();
+ let (mut txa1, _) = mpsc::channel::<i32>(1);
+ let txa2 = txa1.clone();
+
+ let (mut txb1, _) = mpsc::channel::<i32>(1);
+ let txb2 = txb1.clone();
+
+ txa1.hash_receiver(&mut hasher_a1);
+ let hash_a1 = hasher_a1.finish();
+ txa2.hash_receiver(&mut hasher_a2);
+ let hash_a2 = hasher_a2.finish();
+ txb1.hash_receiver(&mut hasher_b1);
+ let hash_b1 = hasher_b1.finish();
+ txb2.hash_receiver(&mut hasher_b2);
+ let hash_b2 = hasher_b2.finish();
+
+ assert_eq!(hash_a1, hash_a2);
+ assert_eq!(hash_b1, hash_b2);
+ assert!(hash_a1 != hash_b1);
+
+ txa1.disconnect();
+ txb1.close_channel();
+
+ let mut hasher_a1 = DefaultHasher::new();
+ let mut hasher_a2 = DefaultHasher::new();
+ let mut hasher_b1 = DefaultHasher::new();
+ let mut hasher_b2 = DefaultHasher::new();
+
+ txa1.hash_receiver(&mut hasher_a1);
+ let hash_a1 = hasher_a1.finish();
+ txa2.hash_receiver(&mut hasher_a2);
+ let hash_a2 = hasher_a2.finish();
+ txb1.hash_receiver(&mut hasher_b1);
+ let hash_b1 = hasher_b1.finish();
+ txb2.hash_receiver(&mut hasher_b2);
+ let hash_b2 = hasher_b2.finish();
+
+ assert!(hash_a1 != hash_a2);
+ assert_eq!(hash_b1, hash_b2);
+}
+
+#[test]
+fn send_backpressure() {
+ let (waker, counter) = new_count_waker();
+ let mut cx = Context::from_waker(&waker);
+
+ let (mut tx, mut rx) = mpsc::channel(1);
+ block_on(tx.send(1)).unwrap();
+
+ let mut task = tx.send(2);
+ assert_eq!(task.poll_unpin(&mut cx), Poll::Pending);
+ assert_eq!(counter, 0);
+
+ let item = block_on(rx.next()).unwrap();
+ assert_eq!(item, 1);
+ assert_eq!(counter, 1);
+ assert_eq!(task.poll_unpin(&mut cx), Poll::Ready(Ok(())));
+
+ let item = block_on(rx.next()).unwrap();
+ assert_eq!(item, 2);
+}
+
+#[test]
+fn send_backpressure_multi_senders() {
+ let (waker, counter) = new_count_waker();
+ let mut cx = Context::from_waker(&waker);
+
+ let (mut tx1, mut rx) = mpsc::channel(1);
+ let mut tx2 = tx1.clone();
+ block_on(tx1.send(1)).unwrap();
+
+ let mut task = tx2.send(2);
+ assert_eq!(task.poll_unpin(&mut cx), Poll::Pending);
+ assert_eq!(counter, 0);
+
+ let item = block_on(rx.next()).unwrap();
+ assert_eq!(item, 1);
+ assert_eq!(counter, 1);
+ assert_eq!(task.poll_unpin(&mut cx), Poll::Ready(Ok(())));
+
+ let item = block_on(rx.next()).unwrap();
+ assert_eq!(item, 2);
+}
diff --git a/third_party/rust/futures-channel/tests/oneshot.rs b/third_party/rust/futures-channel/tests/oneshot.rs
new file mode 100644
index 0000000000..6b48376dc0
--- /dev/null
+++ b/third_party/rust/futures-channel/tests/oneshot.rs
@@ -0,0 +1,256 @@
+use futures::channel::oneshot::{self, Sender};
+use futures::executor::block_on;
+use futures::future::{poll_fn, FutureExt};
+use futures::task::{Context, Poll};
+use futures_test::task::panic_waker_ref;
+use std::sync::mpsc;
+use std::thread;
+
+#[test]
+fn smoke_poll() {
+ let (mut tx, rx) = oneshot::channel::<u32>();
+ let mut rx = Some(rx);
+ let f = poll_fn(|cx| {
+ assert!(tx.poll_canceled(cx).is_pending());
+ assert!(tx.poll_canceled(cx).is_pending());
+ drop(rx.take());
+ assert!(tx.poll_canceled(cx).is_ready());
+ assert!(tx.poll_canceled(cx).is_ready());
+ Poll::Ready(())
+ });
+
+ block_on(f);
+}
+
+#[test]
+fn cancel_notifies() {
+ let (mut tx, rx) = oneshot::channel::<u32>();
+
+ let t = thread::spawn(move || {
+ block_on(tx.cancellation());
+ });
+ drop(rx);
+ t.join().unwrap();
+}
+
+#[test]
+fn cancel_lots() {
+ const N: usize = if cfg!(miri) { 100 } else { 20000 };
+
+ let (tx, rx) = mpsc::channel::<(Sender<_>, mpsc::Sender<_>)>();
+ let t = thread::spawn(move || {
+ for (mut tx, tx2) in rx {
+ block_on(tx.cancellation());
+ tx2.send(()).unwrap();
+ }
+ });
+
+ for _ in 0..N {
+ let (otx, orx) = oneshot::channel::<u32>();
+ let (tx2, rx2) = mpsc::channel();
+ tx.send((otx, tx2)).unwrap();
+ drop(orx);
+ rx2.recv().unwrap();
+ }
+ drop(tx);
+
+ t.join().unwrap();
+}
+
+#[test]
+fn cancel_after_sender_drop_doesnt_notify() {
+ let (mut tx, rx) = oneshot::channel::<u32>();
+ let mut cx = Context::from_waker(panic_waker_ref());
+ assert_eq!(tx.poll_canceled(&mut cx), Poll::Pending);
+ drop(tx);
+ drop(rx);
+}
+
+#[test]
+fn close() {
+ let (mut tx, mut rx) = oneshot::channel::<u32>();
+ rx.close();
+ block_on(poll_fn(|cx| {
+ match rx.poll_unpin(cx) {
+ Poll::Ready(Err(_)) => {}
+ _ => panic!(),
+ };
+ assert!(tx.poll_canceled(cx).is_ready());
+ Poll::Ready(())
+ }));
+}
+
+#[test]
+fn close_wakes() {
+ let (mut tx, mut rx) = oneshot::channel::<u32>();
+ let (tx2, rx2) = mpsc::channel();
+ let t = thread::spawn(move || {
+ rx.close();
+ rx2.recv().unwrap();
+ });
+ block_on(tx.cancellation());
+ tx2.send(()).unwrap();
+ t.join().unwrap();
+}
+
+#[test]
+fn is_canceled() {
+ let (tx, rx) = oneshot::channel::<u32>();
+ assert!(!tx.is_canceled());
+ drop(rx);
+ assert!(tx.is_canceled());
+}
+
+#[test]
+fn cancel_sends() {
+ const N: usize = if cfg!(miri) { 100 } else { 20000 };
+
+ let (tx, rx) = mpsc::channel::<Sender<_>>();
+ let t = thread::spawn(move || {
+ for otx in rx {
+ let _ = otx.send(42);
+ }
+ });
+
+ for _ in 0..N {
+ let (otx, mut orx) = oneshot::channel::<u32>();
+ tx.send(otx).unwrap();
+
+ orx.close();
+ let _ = block_on(orx);
+ }
+
+ drop(tx);
+ t.join().unwrap();
+}
+
+// #[test]
+// fn spawn_sends_items() {
+// let core = local_executor::Core::new();
+// let future = ok::<_, ()>(1);
+// let rx = spawn(future, &core);
+// assert_eq!(core.run(rx).unwrap(), 1);
+// }
+//
+// #[test]
+// fn spawn_kill_dead_stream() {
+// use std::thread;
+// use std::time::Duration;
+// use futures::future::Either;
+// use futures::sync::oneshot;
+//
+// // a future which never returns anything (forever accepting incoming
+// // connections), but dropping it leads to observable side effects
+// // (like closing listening sockets, releasing limited resources,
+// // ...)
+// #[derive(Debug)]
+// struct Dead {
+// // when dropped you should get Err(oneshot::Canceled) on the
+// // receiving end
+// done: oneshot::Sender<()>,
+// }
+// impl Future for Dead {
+// type Item = ();
+// type Error = ();
+//
+// fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+// Ok(Poll::Pending)
+// }
+// }
+//
+// // need to implement a timeout for the test, as it would hang
+// // forever right now
+// let (timeout_tx, timeout_rx) = oneshot::channel();
+// thread::spawn(move || {
+// thread::sleep(Duration::from_millis(1000));
+// let _ = timeout_tx.send(());
+// });
+//
+// let core = local_executor::Core::new();
+// let (done_tx, done_rx) = oneshot::channel();
+// let future = Dead{done: done_tx};
+// let rx = spawn(future, &core);
+// let res = core.run(
+// Ok::<_, ()>(())
+// .into_future()
+// .then(move |_| {
+// // now drop the spawned future: maybe some timeout exceeded,
+// // or some connection on this end was closed by the remote
+// // end.
+// drop(rx);
+// // and wait for the spawned future to release its resources
+// done_rx
+// })
+// .select2(timeout_rx)
+// );
+// match res {
+// Err(Either::A((oneshot::Canceled, _))) => (),
+// Ok(Either::B(((), _))) => {
+// panic!("dead future wasn't canceled (timeout)");
+// },
+// _ => {
+// panic!("dead future wasn't canceled (unexpected result)");
+// },
+// }
+// }
+//
+// #[test]
+// fn spawn_dont_kill_forgot_dead_stream() {
+// use std::thread;
+// use std::time::Duration;
+// use futures::future::Either;
+// use futures::sync::oneshot;
+//
+// // a future which never returns anything (forever accepting incoming
+// // connections), but dropping it leads to observable side effects
+// // (like closing listening sockets, releasing limited resources,
+// // ...)
+// #[derive(Debug)]
+// struct Dead {
+// // when dropped you should get Err(oneshot::Canceled) on the
+// // receiving end
+// done: oneshot::Sender<()>,
+// }
+// impl Future for Dead {
+// type Item = ();
+// type Error = ();
+//
+// fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+// Ok(Poll::Pending)
+// }
+// }
+//
+// // need to implement a timeout for the test, as it would hang
+// // forever right now
+// let (timeout_tx, timeout_rx) = oneshot::channel();
+// thread::spawn(move || {
+// thread::sleep(Duration::from_millis(1000));
+// let _ = timeout_tx.send(());
+// });
+//
+// let core = local_executor::Core::new();
+// let (done_tx, done_rx) = oneshot::channel();
+// let future = Dead{done: done_tx};
+// let rx = spawn(future, &core);
+// let res = core.run(
+// Ok::<_, ()>(())
+// .into_future()
+// .then(move |_| {
+// // forget the spawned future: should keep running, i.e. hit
+// // the timeout below.
+// rx.forget();
+// // and wait for the spawned future to release its resources
+// done_rx
+// })
+// .select2(timeout_rx)
+// );
+// match res {
+// Err(Either::A((oneshot::Canceled, _))) => {
+// panic!("forgotten dead future was canceled");
+// },
+// Ok(Either::B(((), _))) => (), // reached timeout
+// _ => {
+// panic!("forgotten dead future was canceled (unexpected result)");
+// },
+// }
+// }