diff options
Diffstat (limited to 'third_party/rust/tokio/tests/sync_mpsc.rs')
-rw-r--r-- | third_party/rust/tokio/tests/sync_mpsc.rs | 659 |
1 files changed, 659 insertions, 0 deletions
diff --git a/third_party/rust/tokio/tests/sync_mpsc.rs b/third_party/rust/tokio/tests/sync_mpsc.rs new file mode 100644 index 0000000000..abbfa9d7f4 --- /dev/null +++ b/third_party/rust/tokio/tests/sync_mpsc.rs @@ -0,0 +1,659 @@ +#![allow(clippy::redundant_clone)] +#![warn(rust_2018_idioms)] +#![cfg(feature = "sync")] + +#[cfg(target_arch = "wasm32")] +use wasm_bindgen_test::wasm_bindgen_test as test; +#[cfg(target_arch = "wasm32")] +use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test; + +#[cfg(not(target_arch = "wasm32"))] +use tokio::test as maybe_tokio_test; + +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::{TryRecvError, TrySendError}; +use tokio_test::*; + +use std::sync::Arc; + +#[cfg(not(target_arch = "wasm32"))] +mod support { + pub(crate) mod mpsc_stream; +} + +trait AssertSend: Send {} +impl AssertSend for mpsc::Sender<i32> {} +impl AssertSend for mpsc::Receiver<i32> {} + +#[maybe_tokio_test] +async fn send_recv_with_buffer() { + let (tx, mut rx) = mpsc::channel::<i32>(16); + + // Using poll_ready / try_send + // let permit assert_ready_ok!(tx.reserve()); + let permit = tx.reserve().await.unwrap(); + permit.send(1); + + // Without poll_ready + tx.try_send(2).unwrap(); + + drop(tx); + + let val = rx.recv().await; + assert_eq!(val, Some(1)); + + let val = rx.recv().await; + assert_eq!(val, Some(2)); + + let val = rx.recv().await; + assert!(val.is_none()); +} + +#[tokio::test] +#[cfg(feature = "full")] +async fn reserve_disarm() { + let (tx, mut rx) = mpsc::channel::<i32>(2); + let tx1 = tx.clone(); + let tx2 = tx.clone(); + let tx3 = tx.clone(); + let tx4 = tx; + + // We should be able to `poll_ready` two handles without problem + let permit1 = assert_ok!(tx1.reserve().await); + let permit2 = assert_ok!(tx2.reserve().await); + + // But a third should not be ready + let mut r3 = tokio_test::task::spawn(tx3.reserve()); + assert_pending!(r3.poll()); + + let mut r4 = tokio_test::task::spawn(tx4.reserve()); + assert_pending!(r4.poll()); + + // Using one of the reserved slots should allow a new handle to become ready + permit1.send(1); + + // We also need to receive for the slot to be free + assert!(!r3.is_woken()); + rx.recv().await.unwrap(); + // Now there's a free slot! + assert!(r3.is_woken()); + assert!(!r4.is_woken()); + + // Dropping a permit should also open up a slot + drop(permit2); + assert!(r4.is_woken()); + + let mut r1 = tokio_test::task::spawn(tx1.reserve()); + assert_pending!(r1.poll()); +} + +#[tokio::test] +#[cfg(feature = "full")] +async fn send_recv_stream_with_buffer() { + use tokio_stream::StreamExt; + + let (tx, rx) = support::mpsc_stream::channel_stream::<i32>(16); + let mut rx = Box::pin(rx); + + tokio::spawn(async move { + assert_ok!(tx.send(1).await); + assert_ok!(tx.send(2).await); + }); + + assert_eq!(Some(1), rx.next().await); + assert_eq!(Some(2), rx.next().await); + assert_eq!(None, rx.next().await); +} + +#[tokio::test] +#[cfg(feature = "full")] +async fn async_send_recv_with_buffer() { + let (tx, mut rx) = mpsc::channel(16); + + tokio::spawn(async move { + assert_ok!(tx.send(1).await); + assert_ok!(tx.send(2).await); + }); + + assert_eq!(Some(1), rx.recv().await); + assert_eq!(Some(2), rx.recv().await); + assert_eq!(None, rx.recv().await); +} + +#[tokio::test] +#[cfg(feature = "full")] +async fn start_send_past_cap() { + use std::future::Future; + + let mut t1 = tokio_test::task::spawn(()); + + let (tx1, mut rx) = mpsc::channel(1); + let tx2 = tx1.clone(); + + assert_ok!(tx1.try_send(())); + + let mut r1 = Box::pin(tx1.reserve()); + t1.enter(|cx, _| assert_pending!(r1.as_mut().poll(cx))); + + { + let mut r2 = tokio_test::task::spawn(tx2.reserve()); + assert_pending!(r2.poll()); + + drop(r1); + + assert!(rx.recv().await.is_some()); + + assert!(r2.is_woken()); + assert!(!t1.is_woken()); + } + + drop(tx1); + drop(tx2); + + assert!(rx.recv().await.is_none()); +} + +#[test] +#[should_panic] +#[cfg(not(target_arch = "wasm32"))] // wasm currently doesn't support unwinding +fn buffer_gteq_one() { + mpsc::channel::<i32>(0); +} + +#[maybe_tokio_test] +async fn send_recv_unbounded() { + let (tx, mut rx) = mpsc::unbounded_channel::<i32>(); + + // Using `try_send` + assert_ok!(tx.send(1)); + assert_ok!(tx.send(2)); + + assert_eq!(rx.recv().await, Some(1)); + assert_eq!(rx.recv().await, Some(2)); + + drop(tx); + + assert!(rx.recv().await.is_none()); +} + +#[tokio::test] +#[cfg(feature = "full")] +async fn async_send_recv_unbounded() { + let (tx, mut rx) = mpsc::unbounded_channel(); + + tokio::spawn(async move { + assert_ok!(tx.send(1)); + assert_ok!(tx.send(2)); + }); + + assert_eq!(Some(1), rx.recv().await); + assert_eq!(Some(2), rx.recv().await); + assert_eq!(None, rx.recv().await); +} + +#[tokio::test] +#[cfg(feature = "full")] +async fn send_recv_stream_unbounded() { + use tokio_stream::StreamExt; + + let (tx, rx) = support::mpsc_stream::unbounded_channel_stream::<i32>(); + + let mut rx = Box::pin(rx); + + tokio::spawn(async move { + assert_ok!(tx.send(1)); + assert_ok!(tx.send(2)); + }); + + assert_eq!(Some(1), rx.next().await); + assert_eq!(Some(2), rx.next().await); + assert_eq!(None, rx.next().await); +} + +#[maybe_tokio_test] +async fn no_t_bounds_buffer() { + struct NoImpls; + + let (tx, mut rx) = mpsc::channel(100); + + // sender should be Debug even though T isn't Debug + println!("{:?}", tx); + // same with Receiver + println!("{:?}", rx); + // and sender should be Clone even though T isn't Clone + assert!(tx.clone().try_send(NoImpls).is_ok()); + + assert!(rx.recv().await.is_some()); +} + +#[maybe_tokio_test] +async fn no_t_bounds_unbounded() { + struct NoImpls; + + let (tx, mut rx) = mpsc::unbounded_channel(); + + // sender should be Debug even though T isn't Debug + println!("{:?}", tx); + // same with Receiver + println!("{:?}", rx); + // and sender should be Clone even though T isn't Clone + assert!(tx.clone().send(NoImpls).is_ok()); + + assert!(rx.recv().await.is_some()); +} + +#[tokio::test] +#[cfg(feature = "full")] +async fn send_recv_buffer_limited() { + let (tx, mut rx) = mpsc::channel::<i32>(1); + + // Reserve capacity + let p1 = assert_ok!(tx.reserve().await); + + // Send first message + p1.send(1); + + // Not ready + let mut p2 = tokio_test::task::spawn(tx.reserve()); + assert_pending!(p2.poll()); + + // Take the value + assert!(rx.recv().await.is_some()); + + // Notified + assert!(p2.is_woken()); + + // Trying to send fails + assert_err!(tx.try_send(1337)); + + // Send second + let permit = assert_ready_ok!(p2.poll()); + permit.send(2); + + assert!(rx.recv().await.is_some()); +} + +#[maybe_tokio_test] +async fn recv_close_gets_none_idle() { + let (tx, mut rx) = mpsc::channel::<i32>(10); + + rx.close(); + + assert!(rx.recv().await.is_none()); + + assert_err!(tx.send(1).await); +} + +#[tokio::test] +#[cfg(feature = "full")] +async fn recv_close_gets_none_reserved() { + let (tx1, mut rx) = mpsc::channel::<i32>(1); + let tx2 = tx1.clone(); + + let permit1 = assert_ok!(tx1.reserve().await); + let mut permit2 = tokio_test::task::spawn(tx2.reserve()); + assert_pending!(permit2.poll()); + + rx.close(); + + assert!(permit2.is_woken()); + assert_ready_err!(permit2.poll()); + + { + let mut recv = tokio_test::task::spawn(rx.recv()); + assert_pending!(recv.poll()); + + permit1.send(123); + assert!(recv.is_woken()); + + let v = assert_ready!(recv.poll()); + assert_eq!(v, Some(123)); + } + + assert!(rx.recv().await.is_none()); +} + +#[maybe_tokio_test] +async fn tx_close_gets_none() { + let (_, mut rx) = mpsc::channel::<i32>(10); + assert!(rx.recv().await.is_none()); +} + +#[maybe_tokio_test] +async fn try_send_fail() { + let (tx, mut rx) = mpsc::channel(1); + + tx.try_send("hello").unwrap(); + + // This should fail + match assert_err!(tx.try_send("fail")) { + TrySendError::Full(..) => {} + _ => panic!(), + } + + assert_eq!(rx.recv().await, Some("hello")); + + assert_ok!(tx.try_send("goodbye")); + drop(tx); + + assert_eq!(rx.recv().await, Some("goodbye")); + assert!(rx.recv().await.is_none()); +} + +#[maybe_tokio_test] +async fn try_send_fail_with_try_recv() { + let (tx, mut rx) = mpsc::channel(1); + + tx.try_send("hello").unwrap(); + + // This should fail + match assert_err!(tx.try_send("fail")) { + TrySendError::Full(..) => {} + _ => panic!(), + } + + assert_eq!(rx.try_recv(), Ok("hello")); + + assert_ok!(tx.try_send("goodbye")); + drop(tx); + + assert_eq!(rx.try_recv(), Ok("goodbye")); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); +} + +#[maybe_tokio_test] +async fn try_reserve_fails() { + let (tx, mut rx) = mpsc::channel(1); + + let permit = tx.try_reserve().unwrap(); + + // This should fail + match assert_err!(tx.try_reserve()) { + TrySendError::Full(()) => {} + _ => panic!(), + } + + permit.send("foo"); + + assert_eq!(rx.recv().await, Some("foo")); + + // Dropping permit releases the slot. + let permit = tx.try_reserve().unwrap(); + drop(permit); + + let _permit = tx.try_reserve().unwrap(); +} + +#[tokio::test] +#[cfg(feature = "full")] +async fn drop_permit_releases_permit() { + // poll_ready reserves capacity, ensure that the capacity is released if tx + // is dropped w/o sending a value. + let (tx1, _rx) = mpsc::channel::<i32>(1); + let tx2 = tx1.clone(); + + let permit = assert_ok!(tx1.reserve().await); + + let mut reserve2 = tokio_test::task::spawn(tx2.reserve()); + assert_pending!(reserve2.poll()); + + drop(permit); + + assert!(reserve2.is_woken()); + assert_ready_ok!(reserve2.poll()); +} + +#[maybe_tokio_test] +async fn dropping_rx_closes_channel() { + let (tx, rx) = mpsc::channel(100); + + let msg = Arc::new(()); + assert_ok!(tx.try_send(msg.clone())); + + drop(rx); + assert_err!(tx.reserve().await); + assert_eq!(1, Arc::strong_count(&msg)); +} + +#[test] +fn dropping_rx_closes_channel_for_try() { + let (tx, rx) = mpsc::channel(100); + + let msg = Arc::new(()); + tx.try_send(msg.clone()).unwrap(); + + drop(rx); + + assert!(matches!( + tx.try_send(msg.clone()), + Err(TrySendError::Closed(_)) + )); + assert!(matches!(tx.try_reserve(), Err(TrySendError::Closed(_)))); + assert!(matches!( + tx.try_reserve_owned(), + Err(TrySendError::Closed(_)) + )); + + assert_eq!(1, Arc::strong_count(&msg)); +} + +#[test] +fn unconsumed_messages_are_dropped() { + let msg = Arc::new(()); + + let (tx, rx) = mpsc::channel(100); + + tx.try_send(msg.clone()).unwrap(); + + assert_eq!(2, Arc::strong_count(&msg)); + + drop((tx, rx)); + + assert_eq!(1, Arc::strong_count(&msg)); +} + +#[test] +#[cfg(feature = "full")] +fn blocking_recv() { + let (tx, mut rx) = mpsc::channel::<u8>(1); + + let sync_code = std::thread::spawn(move || { + assert_eq!(Some(10), rx.blocking_recv()); + }); + + tokio::runtime::Runtime::new() + .unwrap() + .block_on(async move { + let _ = tx.send(10).await; + }); + sync_code.join().unwrap() +} + +#[tokio::test] +#[should_panic] +#[cfg(not(target_arch = "wasm32"))] // wasm currently doesn't support unwinding +async fn blocking_recv_async() { + let (_tx, mut rx) = mpsc::channel::<()>(1); + let _ = rx.blocking_recv(); +} + +#[test] +#[cfg(feature = "full")] +fn blocking_send() { + let (tx, mut rx) = mpsc::channel::<u8>(1); + + let sync_code = std::thread::spawn(move || { + tx.blocking_send(10).unwrap(); + }); + + tokio::runtime::Runtime::new() + .unwrap() + .block_on(async move { + assert_eq!(Some(10), rx.recv().await); + }); + sync_code.join().unwrap() +} + +#[tokio::test] +#[should_panic] +#[cfg(not(target_arch = "wasm32"))] // wasm currently doesn't support unwinding +async fn blocking_send_async() { + let (tx, _rx) = mpsc::channel::<()>(1); + let _ = tx.blocking_send(()); +} + +#[tokio::test] +#[cfg(feature = "full")] +async fn ready_close_cancel_bounded() { + let (tx, mut rx) = mpsc::channel::<()>(100); + let _tx2 = tx.clone(); + + let permit = assert_ok!(tx.reserve().await); + + rx.close(); + + let mut recv = tokio_test::task::spawn(rx.recv()); + assert_pending!(recv.poll()); + + drop(permit); + + assert!(recv.is_woken()); + let val = assert_ready!(recv.poll()); + assert!(val.is_none()); +} + +#[tokio::test] +#[cfg(feature = "full")] +async fn permit_available_not_acquired_close() { + let (tx1, mut rx) = mpsc::channel::<()>(1); + let tx2 = tx1.clone(); + + let permit1 = assert_ok!(tx1.reserve().await); + + let mut permit2 = tokio_test::task::spawn(tx2.reserve()); + assert_pending!(permit2.poll()); + + rx.close(); + + drop(permit1); + assert!(permit2.is_woken()); + + drop(permit2); + assert!(rx.recv().await.is_none()); +} + +#[test] +fn try_recv_bounded() { + let (tx, mut rx) = mpsc::channel(5); + + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + assert!(tx.try_send("hello").is_err()); + + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Err(TryRecvError::Empty), rx.try_recv()); + + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + assert_eq!(Ok("hello"), rx.try_recv()); + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + assert!(tx.try_send("hello").is_err()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Err(TryRecvError::Empty), rx.try_recv()); + + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + tx.try_send("hello").unwrap(); + drop(tx); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Ok("hello"), rx.try_recv()); + assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv()); +} + +#[test] +fn try_recv_unbounded() { + for num in 0..100 { + let (tx, mut rx) = mpsc::unbounded_channel(); + + for i in 0..num { + tx.send(i).unwrap(); + } + + for i in 0..num { + assert_eq!(rx.try_recv(), Ok(i)); + } + + assert_eq!(rx.try_recv(), Err(TryRecvError::Empty)); + drop(tx); + assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); + } +} + +#[test] +fn try_recv_close_while_empty_bounded() { + let (tx, mut rx) = mpsc::channel::<()>(5); + + assert_eq!(Err(TryRecvError::Empty), rx.try_recv()); + drop(tx); + assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv()); +} + +#[test] +fn try_recv_close_while_empty_unbounded() { + let (tx, mut rx) = mpsc::unbounded_channel::<()>(); + + assert_eq!(Err(TryRecvError::Empty), rx.try_recv()); + drop(tx); + assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv()); +} + +#[tokio::test(start_paused = true)] +#[cfg(feature = "full")] +async fn recv_timeout() { + use tokio::sync::mpsc::error::SendTimeoutError::{Closed, Timeout}; + use tokio::time::Duration; + + let (tx, rx) = mpsc::channel(5); + + assert_eq!(tx.send_timeout(10, Duration::from_secs(1)).await, Ok(())); + assert_eq!(tx.send_timeout(20, Duration::from_secs(1)).await, Ok(())); + assert_eq!(tx.send_timeout(30, Duration::from_secs(1)).await, Ok(())); + assert_eq!(tx.send_timeout(40, Duration::from_secs(1)).await, Ok(())); + assert_eq!(tx.send_timeout(50, Duration::from_secs(1)).await, Ok(())); + assert_eq!( + tx.send_timeout(60, Duration::from_secs(1)).await, + Err(Timeout(60)) + ); + + drop(rx); + assert_eq!( + tx.send_timeout(70, Duration::from_secs(1)).await, + Err(Closed(70)) + ); +} + +#[test] +#[should_panic = "there is no reactor running, must be called from the context of a Tokio 1.x runtime"] +#[cfg(not(target_arch = "wasm32"))] // wasm currently doesn't support unwinding +fn recv_timeout_panic() { + use futures::future::FutureExt; + use tokio::time::Duration; + + let (tx, _rx) = mpsc::channel(5); + tx.send_timeout(10, Duration::from_secs(1)).now_or_never(); +} |