diff options
Diffstat (limited to 'third_party/rust/tokio/tests/sync_mpsc.rs')
-rw-r--r-- | third_party/rust/tokio/tests/sync_mpsc.rs | 492 |
1 files changed, 492 insertions, 0 deletions
diff --git a/third_party/rust/tokio/tests/sync_mpsc.rs b/third_party/rust/tokio/tests/sync_mpsc.rs new file mode 100644 index 0000000000..f02d90aa56 --- /dev/null +++ b/third_party/rust/tokio/tests/sync_mpsc.rs @@ -0,0 +1,492 @@ +#![allow(clippy::redundant_clone)] +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::{TryRecvError, TrySendError}; +use tokio_test::task; +use tokio_test::{ + assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok, +}; + +use std::sync::Arc; + +trait AssertSend: Send {} +impl AssertSend for mpsc::Sender<i32> {} +impl AssertSend for mpsc::Receiver<i32> {} + +#[test] +fn send_recv_with_buffer() { + let (tx, rx) = mpsc::channel::<i32>(16); + let mut tx = task::spawn(tx); + let mut rx = task::spawn(rx); + + // Using poll_ready / try_send + assert_ready_ok!(tx.enter(|cx, mut tx| tx.poll_ready(cx))); + tx.try_send(1).unwrap(); + + // Without poll_ready + tx.try_send(2).unwrap(); + + drop(tx); + + let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))); + assert_eq!(val, Some(1)); + + let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))); + assert_eq!(val, Some(2)); + + let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))); + assert!(val.is_none()); +} + +#[test] +fn disarm() { + let (tx, rx) = mpsc::channel::<i32>(2); + let mut tx1 = task::spawn(tx.clone()); + let mut tx2 = task::spawn(tx.clone()); + let mut tx3 = task::spawn(tx.clone()); + let mut tx4 = task::spawn(tx); + let mut rx = task::spawn(rx); + + // We should be able to `poll_ready` two handles without problem + assert_ready_ok!(tx1.enter(|cx, mut tx| tx.poll_ready(cx))); + assert_ready_ok!(tx2.enter(|cx, mut tx| tx.poll_ready(cx))); + + // But a third should not be ready + assert_pending!(tx3.enter(|cx, mut tx| tx.poll_ready(cx))); + + // Using one of the reserved slots should allow a new handle to become ready + tx1.try_send(1).unwrap(); + // We also need to receive for the slot to be free + let _ = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))).unwrap(); + // Now there's a free slot! + assert_ready_ok!(tx3.enter(|cx, mut tx| tx.poll_ready(cx))); + assert_pending!(tx4.enter(|cx, mut tx| tx.poll_ready(cx))); + + // Dropping a ready handle should also open up a slot + drop(tx2); + assert_ready_ok!(tx4.enter(|cx, mut tx| tx.poll_ready(cx))); + assert_pending!(tx1.enter(|cx, mut tx| tx.poll_ready(cx))); + + // Explicitly disarming a handle should also open a slot + assert!(tx3.disarm()); + assert_ready_ok!(tx1.enter(|cx, mut tx| tx.poll_ready(cx))); + + // Disarming a non-armed sender does not free up a slot + assert!(!tx3.disarm()); + assert_pending!(tx3.enter(|cx, mut tx| tx.poll_ready(cx))); +} + +#[tokio::test] +async fn send_recv_stream_with_buffer() { + use tokio::stream::StreamExt; + + let (mut tx, mut rx) = mpsc::channel::<i32>(16); + + tokio::spawn(async move { + assert_ok!(tx.send(1).await); + assert_ok!(tx.send(2).await); + }); + + assert_eq!(Some(1), rx.next().await); + assert_eq!(Some(2), rx.next().await); + assert_eq!(None, rx.next().await); +} + +#[tokio::test] +async fn async_send_recv_with_buffer() { + let (mut tx, mut rx) = mpsc::channel(16); + + tokio::spawn(async move { + assert_ok!(tx.send(1).await); + assert_ok!(tx.send(2).await); + }); + + assert_eq!(Some(1), rx.recv().await); + assert_eq!(Some(2), rx.recv().await); + assert_eq!(None, rx.recv().await); +} + +#[test] +fn start_send_past_cap() { + let mut t1 = task::spawn(()); + let mut t2 = task::spawn(()); + let mut t3 = task::spawn(()); + + let (mut tx1, mut rx) = mpsc::channel(1); + let mut tx2 = tx1.clone(); + + assert_ok!(tx1.try_send(())); + + t1.enter(|cx, _| { + assert_pending!(tx1.poll_ready(cx)); + }); + + t2.enter(|cx, _| { + assert_pending!(tx2.poll_ready(cx)); + }); + + drop(tx1); + + let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx))); + assert!(val.is_some()); + + assert!(t2.is_woken()); + assert!(!t1.is_woken()); + + drop(tx2); + + let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx))); + assert!(val.is_none()); +} + +#[test] +#[should_panic] +fn buffer_gteq_one() { + mpsc::channel::<i32>(0); +} + +#[test] +fn send_recv_unbounded() { + let mut t1 = task::spawn(()); + + let (tx, mut rx) = mpsc::unbounded_channel::<i32>(); + + // Using `try_send` + assert_ok!(tx.send(1)); + assert_ok!(tx.send(2)); + + let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); + assert_eq!(val, Some(1)); + + let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); + assert_eq!(val, Some(2)); + + drop(tx); + + let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); + assert!(val.is_none()); +} + +#[tokio::test] +async fn async_send_recv_unbounded() { + let (tx, mut rx) = mpsc::unbounded_channel(); + + tokio::spawn(async move { + assert_ok!(tx.send(1)); + assert_ok!(tx.send(2)); + }); + + assert_eq!(Some(1), rx.recv().await); + assert_eq!(Some(2), rx.recv().await); + assert_eq!(None, rx.recv().await); +} + +#[tokio::test] +async fn send_recv_stream_unbounded() { + use tokio::stream::StreamExt; + + let (tx, mut rx) = mpsc::unbounded_channel::<i32>(); + + tokio::spawn(async move { + assert_ok!(tx.send(1)); + assert_ok!(tx.send(2)); + }); + + assert_eq!(Some(1), rx.next().await); + assert_eq!(Some(2), rx.next().await); + assert_eq!(None, rx.next().await); +} + +#[test] +fn no_t_bounds_buffer() { + struct NoImpls; + + let mut t1 = task::spawn(()); + let (tx, mut rx) = mpsc::channel(100); + + // sender should be Debug even though T isn't Debug + println!("{:?}", tx); + // same with Receiver + println!("{:?}", rx); + // and sender should be Clone even though T isn't Clone + assert!(tx.clone().try_send(NoImpls).is_ok()); + + let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); + assert!(val.is_some()); +} + +#[test] +fn no_t_bounds_unbounded() { + struct NoImpls; + + let mut t1 = task::spawn(()); + let (tx, mut rx) = mpsc::unbounded_channel(); + + // sender should be Debug even though T isn't Debug + println!("{:?}", tx); + // same with Receiver + println!("{:?}", rx); + // and sender should be Clone even though T isn't Clone + assert!(tx.clone().send(NoImpls).is_ok()); + + let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); + assert!(val.is_some()); +} + +#[test] +fn send_recv_buffer_limited() { + let mut t1 = task::spawn(()); + let mut t2 = task::spawn(()); + + let (mut tx, mut rx) = mpsc::channel::<i32>(1); + + // Run on a task context + t1.enter(|cx, _| { + assert_ready_ok!(tx.poll_ready(cx)); + + // Send first message + assert_ok!(tx.try_send(1)); + + // Not ready + assert_pending!(tx.poll_ready(cx)); + + // Send second message + assert_err!(tx.try_send(1337)); + }); + + t2.enter(|cx, _| { + // Take the value + let val = assert_ready!(rx.poll_recv(cx)); + assert_eq!(Some(1), val); + }); + + assert!(t1.is_woken()); + + t1.enter(|cx, _| { + assert_ready_ok!(tx.poll_ready(cx)); + + assert_ok!(tx.try_send(2)); + + // Not ready + assert_pending!(tx.poll_ready(cx)); + }); + + t2.enter(|cx, _| { + // Take the value + let val = assert_ready!(rx.poll_recv(cx)); + assert_eq!(Some(2), val); + }); + + t1.enter(|cx, _| { + assert_ready_ok!(tx.poll_ready(cx)); + }); +} + +#[test] +fn recv_close_gets_none_idle() { + let mut t1 = task::spawn(()); + + let (mut tx, mut rx) = mpsc::channel::<i32>(10); + + rx.close(); + + t1.enter(|cx, _| { + let val = assert_ready!(rx.poll_recv(cx)); + assert!(val.is_none()); + assert_ready_err!(tx.poll_ready(cx)); + }); +} + +#[test] +fn recv_close_gets_none_reserved() { + let mut t1 = task::spawn(()); + let mut t2 = task::spawn(()); + let mut t3 = task::spawn(()); + + let (mut tx1, mut rx) = mpsc::channel::<i32>(1); + let mut tx2 = tx1.clone(); + + assert_ready_ok!(t1.enter(|cx, _| tx1.poll_ready(cx))); + + t2.enter(|cx, _| { + assert_pending!(tx2.poll_ready(cx)); + }); + + rx.close(); + + assert!(t2.is_woken()); + + t2.enter(|cx, _| { + assert_ready_err!(tx2.poll_ready(cx)); + }); + + t3.enter(|cx, _| assert_pending!(rx.poll_recv(cx))); + + assert!(!t1.is_woken()); + assert!(!t2.is_woken()); + + assert_ok!(tx1.try_send(123)); + + assert!(t3.is_woken()); + + t3.enter(|cx, _| { + let v = assert_ready!(rx.poll_recv(cx)); + assert_eq!(v, Some(123)); + + let v = assert_ready!(rx.poll_recv(cx)); + assert!(v.is_none()); + }); +} + +#[test] +fn tx_close_gets_none() { + let mut t1 = task::spawn(()); + + let (_, mut rx) = mpsc::channel::<i32>(10); + + // Run on a task context + t1.enter(|cx, _| { + let v = assert_ready!(rx.poll_recv(cx)); + assert!(v.is_none()); + }); +} + +#[test] +fn try_send_fail() { + let mut t1 = task::spawn(()); + + let (mut tx, mut rx) = mpsc::channel(1); + + tx.try_send("hello").unwrap(); + + // This should fail + match assert_err!(tx.try_send("fail")) { + TrySendError::Full(..) => {} + _ => panic!(), + } + + let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); + assert_eq!(val, Some("hello")); + + assert_ok!(tx.try_send("goodbye")); + drop(tx); + + let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); + assert_eq!(val, Some("goodbye")); + + let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); + assert!(val.is_none()); +} + +#[test] +fn drop_tx_with_permit_releases_permit() { + let mut t1 = task::spawn(()); + let mut t2 = task::spawn(()); + + // poll_ready reserves capacity, ensure that the capacity is released if tx + // is dropped w/o sending a value. + let (mut tx1, _rx) = mpsc::channel::<i32>(1); + let mut tx2 = tx1.clone(); + + assert_ready_ok!(t1.enter(|cx, _| tx1.poll_ready(cx))); + + t2.enter(|cx, _| { + assert_pending!(tx2.poll_ready(cx)); + }); + + drop(tx1); + + assert!(t2.is_woken()); + + assert_ready_ok!(t2.enter(|cx, _| tx2.poll_ready(cx))); +} + +#[test] +fn dropping_rx_closes_channel() { + let mut t1 = task::spawn(()); + + let (mut tx, rx) = mpsc::channel(100); + + let msg = Arc::new(()); + assert_ok!(tx.try_send(msg.clone())); + + drop(rx); + assert_ready_err!(t1.enter(|cx, _| tx.poll_ready(cx))); + + assert_eq!(1, Arc::strong_count(&msg)); +} + +#[test] +fn dropping_rx_closes_channel_for_try() { + let (mut tx, rx) = mpsc::channel(100); + + let msg = Arc::new(()); + tx.try_send(msg.clone()).unwrap(); + + drop(rx); + + { + let err = assert_err!(tx.try_send(msg.clone())); + match err { + TrySendError::Closed(..) => {} + _ => panic!(), + } + } + + assert_eq!(1, Arc::strong_count(&msg)); +} + +#[test] +fn unconsumed_messages_are_dropped() { + let msg = Arc::new(()); + + let (mut tx, rx) = mpsc::channel(100); + + tx.try_send(msg.clone()).unwrap(); + + assert_eq!(2, Arc::strong_count(&msg)); + + drop((tx, rx)); + + assert_eq!(1, Arc::strong_count(&msg)); +} + +#[test] +fn try_recv() { + let (mut tx, mut rx) = mpsc::channel(1); + match rx.try_recv() { + Err(TryRecvError::Empty) => {} + _ => panic!(), + } + tx.try_send(42).unwrap(); + match rx.try_recv() { + Ok(42) => {} + _ => panic!(), + } + drop(tx); + match rx.try_recv() { + Err(TryRecvError::Closed) => {} + _ => panic!(), + } +} + +#[test] +fn try_recv_unbounded() { + let (tx, mut rx) = mpsc::unbounded_channel(); + match rx.try_recv() { + Err(TryRecvError::Empty) => {} + _ => panic!(), + } + tx.send(42).unwrap(); + match rx.try_recv() { + Ok(42) => {} + _ => panic!(), + } + drop(tx); + match rx.try_recv() { + Err(TryRecvError::Closed) => {} + _ => panic!(), + } +} |