summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio/tests/sync_mpsc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio/tests/sync_mpsc.rs')
-rw-r--r--third_party/rust/tokio/tests/sync_mpsc.rs492
1 files changed, 492 insertions, 0 deletions
diff --git a/third_party/rust/tokio/tests/sync_mpsc.rs b/third_party/rust/tokio/tests/sync_mpsc.rs
new file mode 100644
index 0000000000..f02d90aa56
--- /dev/null
+++ b/third_party/rust/tokio/tests/sync_mpsc.rs
@@ -0,0 +1,492 @@
+#![allow(clippy::redundant_clone)]
+#![warn(rust_2018_idioms)]
+#![cfg(feature = "full")]
+
+use tokio::sync::mpsc;
+use tokio::sync::mpsc::error::{TryRecvError, TrySendError};
+use tokio_test::task;
+use tokio_test::{
+ assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
+};
+
+use std::sync::Arc;
+
+trait AssertSend: Send {}
+impl AssertSend for mpsc::Sender<i32> {}
+impl AssertSend for mpsc::Receiver<i32> {}
+
+#[test]
+fn send_recv_with_buffer() {
+ let (tx, rx) = mpsc::channel::<i32>(16);
+ let mut tx = task::spawn(tx);
+ let mut rx = task::spawn(rx);
+
+ // Using poll_ready / try_send
+ assert_ready_ok!(tx.enter(|cx, mut tx| tx.poll_ready(cx)));
+ tx.try_send(1).unwrap();
+
+ // Without poll_ready
+ tx.try_send(2).unwrap();
+
+ drop(tx);
+
+ let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx)));
+ assert_eq!(val, Some(1));
+
+ let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx)));
+ assert_eq!(val, Some(2));
+
+ let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx)));
+ assert!(val.is_none());
+}
+
+#[test]
+fn disarm() {
+ let (tx, rx) = mpsc::channel::<i32>(2);
+ let mut tx1 = task::spawn(tx.clone());
+ let mut tx2 = task::spawn(tx.clone());
+ let mut tx3 = task::spawn(tx.clone());
+ let mut tx4 = task::spawn(tx);
+ let mut rx = task::spawn(rx);
+
+ // We should be able to `poll_ready` two handles without problem
+ assert_ready_ok!(tx1.enter(|cx, mut tx| tx.poll_ready(cx)));
+ assert_ready_ok!(tx2.enter(|cx, mut tx| tx.poll_ready(cx)));
+
+ // But a third should not be ready
+ assert_pending!(tx3.enter(|cx, mut tx| tx.poll_ready(cx)));
+
+ // Using one of the reserved slots should allow a new handle to become ready
+ tx1.try_send(1).unwrap();
+ // We also need to receive for the slot to be free
+ let _ = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))).unwrap();
+ // Now there's a free slot!
+ assert_ready_ok!(tx3.enter(|cx, mut tx| tx.poll_ready(cx)));
+ assert_pending!(tx4.enter(|cx, mut tx| tx.poll_ready(cx)));
+
+ // Dropping a ready handle should also open up a slot
+ drop(tx2);
+ assert_ready_ok!(tx4.enter(|cx, mut tx| tx.poll_ready(cx)));
+ assert_pending!(tx1.enter(|cx, mut tx| tx.poll_ready(cx)));
+
+ // Explicitly disarming a handle should also open a slot
+ assert!(tx3.disarm());
+ assert_ready_ok!(tx1.enter(|cx, mut tx| tx.poll_ready(cx)));
+
+ // Disarming a non-armed sender does not free up a slot
+ assert!(!tx3.disarm());
+ assert_pending!(tx3.enter(|cx, mut tx| tx.poll_ready(cx)));
+}
+
+#[tokio::test]
+async fn send_recv_stream_with_buffer() {
+ use tokio::stream::StreamExt;
+
+ let (mut tx, mut rx) = mpsc::channel::<i32>(16);
+
+ tokio::spawn(async move {
+ assert_ok!(tx.send(1).await);
+ assert_ok!(tx.send(2).await);
+ });
+
+ assert_eq!(Some(1), rx.next().await);
+ assert_eq!(Some(2), rx.next().await);
+ assert_eq!(None, rx.next().await);
+}
+
+#[tokio::test]
+async fn async_send_recv_with_buffer() {
+ let (mut tx, mut rx) = mpsc::channel(16);
+
+ tokio::spawn(async move {
+ assert_ok!(tx.send(1).await);
+ assert_ok!(tx.send(2).await);
+ });
+
+ assert_eq!(Some(1), rx.recv().await);
+ assert_eq!(Some(2), rx.recv().await);
+ assert_eq!(None, rx.recv().await);
+}
+
+#[test]
+fn start_send_past_cap() {
+ let mut t1 = task::spawn(());
+ let mut t2 = task::spawn(());
+ let mut t3 = task::spawn(());
+
+ let (mut tx1, mut rx) = mpsc::channel(1);
+ let mut tx2 = tx1.clone();
+
+ assert_ok!(tx1.try_send(()));
+
+ t1.enter(|cx, _| {
+ assert_pending!(tx1.poll_ready(cx));
+ });
+
+ t2.enter(|cx, _| {
+ assert_pending!(tx2.poll_ready(cx));
+ });
+
+ drop(tx1);
+
+ let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx)));
+ assert!(val.is_some());
+
+ assert!(t2.is_woken());
+ assert!(!t1.is_woken());
+
+ drop(tx2);
+
+ let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx)));
+ assert!(val.is_none());
+}
+
+#[test]
+#[should_panic]
+fn buffer_gteq_one() {
+ mpsc::channel::<i32>(0);
+}
+
+#[test]
+fn send_recv_unbounded() {
+ let mut t1 = task::spawn(());
+
+ let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
+
+ // Using `try_send`
+ assert_ok!(tx.send(1));
+ assert_ok!(tx.send(2));
+
+ let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
+ assert_eq!(val, Some(1));
+
+ let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
+ assert_eq!(val, Some(2));
+
+ drop(tx);
+
+ let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
+ assert!(val.is_none());
+}
+
+#[tokio::test]
+async fn async_send_recv_unbounded() {
+ let (tx, mut rx) = mpsc::unbounded_channel();
+
+ tokio::spawn(async move {
+ assert_ok!(tx.send(1));
+ assert_ok!(tx.send(2));
+ });
+
+ assert_eq!(Some(1), rx.recv().await);
+ assert_eq!(Some(2), rx.recv().await);
+ assert_eq!(None, rx.recv().await);
+}
+
+#[tokio::test]
+async fn send_recv_stream_unbounded() {
+ use tokio::stream::StreamExt;
+
+ let (tx, mut rx) = mpsc::unbounded_channel::<i32>();
+
+ tokio::spawn(async move {
+ assert_ok!(tx.send(1));
+ assert_ok!(tx.send(2));
+ });
+
+ assert_eq!(Some(1), rx.next().await);
+ assert_eq!(Some(2), rx.next().await);
+ assert_eq!(None, rx.next().await);
+}
+
+#[test]
+fn no_t_bounds_buffer() {
+ struct NoImpls;
+
+ let mut t1 = task::spawn(());
+ let (tx, mut rx) = mpsc::channel(100);
+
+ // sender should be Debug even though T isn't Debug
+ println!("{:?}", tx);
+ // same with Receiver
+ println!("{:?}", rx);
+ // and sender should be Clone even though T isn't Clone
+ assert!(tx.clone().try_send(NoImpls).is_ok());
+
+ let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
+ assert!(val.is_some());
+}
+
+#[test]
+fn no_t_bounds_unbounded() {
+ struct NoImpls;
+
+ let mut t1 = task::spawn(());
+ let (tx, mut rx) = mpsc::unbounded_channel();
+
+ // sender should be Debug even though T isn't Debug
+ println!("{:?}", tx);
+ // same with Receiver
+ println!("{:?}", rx);
+ // and sender should be Clone even though T isn't Clone
+ assert!(tx.clone().send(NoImpls).is_ok());
+
+ let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
+ assert!(val.is_some());
+}
+
+#[test]
+fn send_recv_buffer_limited() {
+ let mut t1 = task::spawn(());
+ let mut t2 = task::spawn(());
+
+ let (mut tx, mut rx) = mpsc::channel::<i32>(1);
+
+ // Run on a task context
+ t1.enter(|cx, _| {
+ assert_ready_ok!(tx.poll_ready(cx));
+
+ // Send first message
+ assert_ok!(tx.try_send(1));
+
+ // Not ready
+ assert_pending!(tx.poll_ready(cx));
+
+ // Send second message
+ assert_err!(tx.try_send(1337));
+ });
+
+ t2.enter(|cx, _| {
+ // Take the value
+ let val = assert_ready!(rx.poll_recv(cx));
+ assert_eq!(Some(1), val);
+ });
+
+ assert!(t1.is_woken());
+
+ t1.enter(|cx, _| {
+ assert_ready_ok!(tx.poll_ready(cx));
+
+ assert_ok!(tx.try_send(2));
+
+ // Not ready
+ assert_pending!(tx.poll_ready(cx));
+ });
+
+ t2.enter(|cx, _| {
+ // Take the value
+ let val = assert_ready!(rx.poll_recv(cx));
+ assert_eq!(Some(2), val);
+ });
+
+ t1.enter(|cx, _| {
+ assert_ready_ok!(tx.poll_ready(cx));
+ });
+}
+
+#[test]
+fn recv_close_gets_none_idle() {
+ let mut t1 = task::spawn(());
+
+ let (mut tx, mut rx) = mpsc::channel::<i32>(10);
+
+ rx.close();
+
+ t1.enter(|cx, _| {
+ let val = assert_ready!(rx.poll_recv(cx));
+ assert!(val.is_none());
+ assert_ready_err!(tx.poll_ready(cx));
+ });
+}
+
+#[test]
+fn recv_close_gets_none_reserved() {
+ let mut t1 = task::spawn(());
+ let mut t2 = task::spawn(());
+ let mut t3 = task::spawn(());
+
+ let (mut tx1, mut rx) = mpsc::channel::<i32>(1);
+ let mut tx2 = tx1.clone();
+
+ assert_ready_ok!(t1.enter(|cx, _| tx1.poll_ready(cx)));
+
+ t2.enter(|cx, _| {
+ assert_pending!(tx2.poll_ready(cx));
+ });
+
+ rx.close();
+
+ assert!(t2.is_woken());
+
+ t2.enter(|cx, _| {
+ assert_ready_err!(tx2.poll_ready(cx));
+ });
+
+ t3.enter(|cx, _| assert_pending!(rx.poll_recv(cx)));
+
+ assert!(!t1.is_woken());
+ assert!(!t2.is_woken());
+
+ assert_ok!(tx1.try_send(123));
+
+ assert!(t3.is_woken());
+
+ t3.enter(|cx, _| {
+ let v = assert_ready!(rx.poll_recv(cx));
+ assert_eq!(v, Some(123));
+
+ let v = assert_ready!(rx.poll_recv(cx));
+ assert!(v.is_none());
+ });
+}
+
+#[test]
+fn tx_close_gets_none() {
+ let mut t1 = task::spawn(());
+
+ let (_, mut rx) = mpsc::channel::<i32>(10);
+
+ // Run on a task context
+ t1.enter(|cx, _| {
+ let v = assert_ready!(rx.poll_recv(cx));
+ assert!(v.is_none());
+ });
+}
+
+#[test]
+fn try_send_fail() {
+ let mut t1 = task::spawn(());
+
+ let (mut tx, mut rx) = mpsc::channel(1);
+
+ tx.try_send("hello").unwrap();
+
+ // This should fail
+ match assert_err!(tx.try_send("fail")) {
+ TrySendError::Full(..) => {}
+ _ => panic!(),
+ }
+
+ let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
+ assert_eq!(val, Some("hello"));
+
+ assert_ok!(tx.try_send("goodbye"));
+ drop(tx);
+
+ let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
+ assert_eq!(val, Some("goodbye"));
+
+ let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx)));
+ assert!(val.is_none());
+}
+
+#[test]
+fn drop_tx_with_permit_releases_permit() {
+ let mut t1 = task::spawn(());
+ let mut t2 = task::spawn(());
+
+ // poll_ready reserves capacity, ensure that the capacity is released if tx
+ // is dropped w/o sending a value.
+ let (mut tx1, _rx) = mpsc::channel::<i32>(1);
+ let mut tx2 = tx1.clone();
+
+ assert_ready_ok!(t1.enter(|cx, _| tx1.poll_ready(cx)));
+
+ t2.enter(|cx, _| {
+ assert_pending!(tx2.poll_ready(cx));
+ });
+
+ drop(tx1);
+
+ assert!(t2.is_woken());
+
+ assert_ready_ok!(t2.enter(|cx, _| tx2.poll_ready(cx)));
+}
+
+#[test]
+fn dropping_rx_closes_channel() {
+ let mut t1 = task::spawn(());
+
+ let (mut tx, rx) = mpsc::channel(100);
+
+ let msg = Arc::new(());
+ assert_ok!(tx.try_send(msg.clone()));
+
+ drop(rx);
+ assert_ready_err!(t1.enter(|cx, _| tx.poll_ready(cx)));
+
+ assert_eq!(1, Arc::strong_count(&msg));
+}
+
+#[test]
+fn dropping_rx_closes_channel_for_try() {
+ let (mut tx, rx) = mpsc::channel(100);
+
+ let msg = Arc::new(());
+ tx.try_send(msg.clone()).unwrap();
+
+ drop(rx);
+
+ {
+ let err = assert_err!(tx.try_send(msg.clone()));
+ match err {
+ TrySendError::Closed(..) => {}
+ _ => panic!(),
+ }
+ }
+
+ assert_eq!(1, Arc::strong_count(&msg));
+}
+
+#[test]
+fn unconsumed_messages_are_dropped() {
+ let msg = Arc::new(());
+
+ let (mut tx, rx) = mpsc::channel(100);
+
+ tx.try_send(msg.clone()).unwrap();
+
+ assert_eq!(2, Arc::strong_count(&msg));
+
+ drop((tx, rx));
+
+ assert_eq!(1, Arc::strong_count(&msg));
+}
+
+#[test]
+fn try_recv() {
+ let (mut tx, mut rx) = mpsc::channel(1);
+ match rx.try_recv() {
+ Err(TryRecvError::Empty) => {}
+ _ => panic!(),
+ }
+ tx.try_send(42).unwrap();
+ match rx.try_recv() {
+ Ok(42) => {}
+ _ => panic!(),
+ }
+ drop(tx);
+ match rx.try_recv() {
+ Err(TryRecvError::Closed) => {}
+ _ => panic!(),
+ }
+}
+
+#[test]
+fn try_recv_unbounded() {
+ let (tx, mut rx) = mpsc::unbounded_channel();
+ match rx.try_recv() {
+ Err(TryRecvError::Empty) => {}
+ _ => panic!(),
+ }
+ tx.send(42).unwrap();
+ match rx.try_recv() {
+ Ok(42) => {}
+ _ => panic!(),
+ }
+ drop(tx);
+ match rx.try_recv() {
+ Err(TryRecvError::Closed) => {}
+ _ => panic!(),
+ }
+}