#![allow(clippy::redundant_clone)] #![warn(rust_2018_idioms)] #![cfg(feature = "full")] use tokio::sync::mpsc; use tokio::sync::mpsc::error::{TryRecvError, TrySendError}; use tokio_test::task; use tokio_test::{ assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok, }; use std::sync::Arc; trait AssertSend: Send {} impl AssertSend for mpsc::Sender {} impl AssertSend for mpsc::Receiver {} #[test] fn send_recv_with_buffer() { let (tx, rx) = mpsc::channel::(16); let mut tx = task::spawn(tx); let mut rx = task::spawn(rx); // Using poll_ready / try_send assert_ready_ok!(tx.enter(|cx, mut tx| tx.poll_ready(cx))); tx.try_send(1).unwrap(); // Without poll_ready tx.try_send(2).unwrap(); drop(tx); let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))); assert_eq!(val, Some(1)); let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))); assert_eq!(val, Some(2)); let val = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))); assert!(val.is_none()); } #[test] fn disarm() { let (tx, rx) = mpsc::channel::(2); let mut tx1 = task::spawn(tx.clone()); let mut tx2 = task::spawn(tx.clone()); let mut tx3 = task::spawn(tx.clone()); let mut tx4 = task::spawn(tx); let mut rx = task::spawn(rx); // We should be able to `poll_ready` two handles without problem assert_ready_ok!(tx1.enter(|cx, mut tx| tx.poll_ready(cx))); assert_ready_ok!(tx2.enter(|cx, mut tx| tx.poll_ready(cx))); // But a third should not be ready assert_pending!(tx3.enter(|cx, mut tx| tx.poll_ready(cx))); // Using one of the reserved slots should allow a new handle to become ready tx1.try_send(1).unwrap(); // We also need to receive for the slot to be free let _ = assert_ready!(rx.enter(|cx, mut rx| rx.poll_recv(cx))).unwrap(); // Now there's a free slot! assert_ready_ok!(tx3.enter(|cx, mut tx| tx.poll_ready(cx))); assert_pending!(tx4.enter(|cx, mut tx| tx.poll_ready(cx))); // Dropping a ready handle should also open up a slot drop(tx2); assert_ready_ok!(tx4.enter(|cx, mut tx| tx.poll_ready(cx))); assert_pending!(tx1.enter(|cx, mut tx| tx.poll_ready(cx))); // Explicitly disarming a handle should also open a slot assert!(tx3.disarm()); assert_ready_ok!(tx1.enter(|cx, mut tx| tx.poll_ready(cx))); // Disarming a non-armed sender does not free up a slot assert!(!tx3.disarm()); assert_pending!(tx3.enter(|cx, mut tx| tx.poll_ready(cx))); } #[tokio::test] async fn send_recv_stream_with_buffer() { use tokio::stream::StreamExt; let (mut tx, mut rx) = mpsc::channel::(16); tokio::spawn(async move { assert_ok!(tx.send(1).await); assert_ok!(tx.send(2).await); }); assert_eq!(Some(1), rx.next().await); assert_eq!(Some(2), rx.next().await); assert_eq!(None, rx.next().await); } #[tokio::test] async fn async_send_recv_with_buffer() { let (mut tx, mut rx) = mpsc::channel(16); tokio::spawn(async move { assert_ok!(tx.send(1).await); assert_ok!(tx.send(2).await); }); assert_eq!(Some(1), rx.recv().await); assert_eq!(Some(2), rx.recv().await); assert_eq!(None, rx.recv().await); } #[test] fn start_send_past_cap() { let mut t1 = task::spawn(()); let mut t2 = task::spawn(()); let mut t3 = task::spawn(()); let (mut tx1, mut rx) = mpsc::channel(1); let mut tx2 = tx1.clone(); assert_ok!(tx1.try_send(())); t1.enter(|cx, _| { assert_pending!(tx1.poll_ready(cx)); }); t2.enter(|cx, _| { assert_pending!(tx2.poll_ready(cx)); }); drop(tx1); let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx))); assert!(val.is_some()); assert!(t2.is_woken()); assert!(!t1.is_woken()); drop(tx2); let val = t3.enter(|cx, _| assert_ready!(rx.poll_recv(cx))); assert!(val.is_none()); } #[test] #[should_panic] fn buffer_gteq_one() { mpsc::channel::(0); } #[test] fn send_recv_unbounded() { let mut t1 = task::spawn(()); let (tx, mut rx) = mpsc::unbounded_channel::(); // Using `try_send` assert_ok!(tx.send(1)); assert_ok!(tx.send(2)); let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); assert_eq!(val, Some(1)); let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); assert_eq!(val, Some(2)); drop(tx); let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); assert!(val.is_none()); } #[tokio::test] async fn async_send_recv_unbounded() { let (tx, mut rx) = mpsc::unbounded_channel(); tokio::spawn(async move { assert_ok!(tx.send(1)); assert_ok!(tx.send(2)); }); assert_eq!(Some(1), rx.recv().await); assert_eq!(Some(2), rx.recv().await); assert_eq!(None, rx.recv().await); } #[tokio::test] async fn send_recv_stream_unbounded() { use tokio::stream::StreamExt; let (tx, mut rx) = mpsc::unbounded_channel::(); tokio::spawn(async move { assert_ok!(tx.send(1)); assert_ok!(tx.send(2)); }); assert_eq!(Some(1), rx.next().await); assert_eq!(Some(2), rx.next().await); assert_eq!(None, rx.next().await); } #[test] fn no_t_bounds_buffer() { struct NoImpls; let mut t1 = task::spawn(()); let (tx, mut rx) = mpsc::channel(100); // sender should be Debug even though T isn't Debug println!("{:?}", tx); // same with Receiver println!("{:?}", rx); // and sender should be Clone even though T isn't Clone assert!(tx.clone().try_send(NoImpls).is_ok()); let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); assert!(val.is_some()); } #[test] fn no_t_bounds_unbounded() { struct NoImpls; let mut t1 = task::spawn(()); let (tx, mut rx) = mpsc::unbounded_channel(); // sender should be Debug even though T isn't Debug println!("{:?}", tx); // same with Receiver println!("{:?}", rx); // and sender should be Clone even though T isn't Clone assert!(tx.clone().send(NoImpls).is_ok()); let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); assert!(val.is_some()); } #[test] fn send_recv_buffer_limited() { let mut t1 = task::spawn(()); let mut t2 = task::spawn(()); let (mut tx, mut rx) = mpsc::channel::(1); // Run on a task context t1.enter(|cx, _| { assert_ready_ok!(tx.poll_ready(cx)); // Send first message assert_ok!(tx.try_send(1)); // Not ready assert_pending!(tx.poll_ready(cx)); // Send second message assert_err!(tx.try_send(1337)); }); t2.enter(|cx, _| { // Take the value let val = assert_ready!(rx.poll_recv(cx)); assert_eq!(Some(1), val); }); assert!(t1.is_woken()); t1.enter(|cx, _| { assert_ready_ok!(tx.poll_ready(cx)); assert_ok!(tx.try_send(2)); // Not ready assert_pending!(tx.poll_ready(cx)); }); t2.enter(|cx, _| { // Take the value let val = assert_ready!(rx.poll_recv(cx)); assert_eq!(Some(2), val); }); t1.enter(|cx, _| { assert_ready_ok!(tx.poll_ready(cx)); }); } #[test] fn recv_close_gets_none_idle() { let mut t1 = task::spawn(()); let (mut tx, mut rx) = mpsc::channel::(10); rx.close(); t1.enter(|cx, _| { let val = assert_ready!(rx.poll_recv(cx)); assert!(val.is_none()); assert_ready_err!(tx.poll_ready(cx)); }); } #[test] fn recv_close_gets_none_reserved() { let mut t1 = task::spawn(()); let mut t2 = task::spawn(()); let mut t3 = task::spawn(()); let (mut tx1, mut rx) = mpsc::channel::(1); let mut tx2 = tx1.clone(); assert_ready_ok!(t1.enter(|cx, _| tx1.poll_ready(cx))); t2.enter(|cx, _| { assert_pending!(tx2.poll_ready(cx)); }); rx.close(); assert!(t2.is_woken()); t2.enter(|cx, _| { assert_ready_err!(tx2.poll_ready(cx)); }); t3.enter(|cx, _| assert_pending!(rx.poll_recv(cx))); assert!(!t1.is_woken()); assert!(!t2.is_woken()); assert_ok!(tx1.try_send(123)); assert!(t3.is_woken()); t3.enter(|cx, _| { let v = assert_ready!(rx.poll_recv(cx)); assert_eq!(v, Some(123)); let v = assert_ready!(rx.poll_recv(cx)); assert!(v.is_none()); }); } #[test] fn tx_close_gets_none() { let mut t1 = task::spawn(()); let (_, mut rx) = mpsc::channel::(10); // Run on a task context t1.enter(|cx, _| { let v = assert_ready!(rx.poll_recv(cx)); assert!(v.is_none()); }); } #[test] fn try_send_fail() { let mut t1 = task::spawn(()); let (mut tx, mut rx) = mpsc::channel(1); tx.try_send("hello").unwrap(); // This should fail match assert_err!(tx.try_send("fail")) { TrySendError::Full(..) => {} _ => panic!(), } let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); assert_eq!(val, Some("hello")); assert_ok!(tx.try_send("goodbye")); drop(tx); let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); assert_eq!(val, Some("goodbye")); let val = assert_ready!(t1.enter(|cx, _| rx.poll_recv(cx))); assert!(val.is_none()); } #[test] fn drop_tx_with_permit_releases_permit() { let mut t1 = task::spawn(()); let mut t2 = task::spawn(()); // poll_ready reserves capacity, ensure that the capacity is released if tx // is dropped w/o sending a value. let (mut tx1, _rx) = mpsc::channel::(1); let mut tx2 = tx1.clone(); assert_ready_ok!(t1.enter(|cx, _| tx1.poll_ready(cx))); t2.enter(|cx, _| { assert_pending!(tx2.poll_ready(cx)); }); drop(tx1); assert!(t2.is_woken()); assert_ready_ok!(t2.enter(|cx, _| tx2.poll_ready(cx))); } #[test] fn dropping_rx_closes_channel() { let mut t1 = task::spawn(()); let (mut tx, rx) = mpsc::channel(100); let msg = Arc::new(()); assert_ok!(tx.try_send(msg.clone())); drop(rx); assert_ready_err!(t1.enter(|cx, _| tx.poll_ready(cx))); assert_eq!(1, Arc::strong_count(&msg)); } #[test] fn dropping_rx_closes_channel_for_try() { let (mut tx, rx) = mpsc::channel(100); let msg = Arc::new(()); tx.try_send(msg.clone()).unwrap(); drop(rx); { let err = assert_err!(tx.try_send(msg.clone())); match err { TrySendError::Closed(..) => {} _ => panic!(), } } assert_eq!(1, Arc::strong_count(&msg)); } #[test] fn unconsumed_messages_are_dropped() { let msg = Arc::new(()); let (mut tx, rx) = mpsc::channel(100); tx.try_send(msg.clone()).unwrap(); assert_eq!(2, Arc::strong_count(&msg)); drop((tx, rx)); assert_eq!(1, Arc::strong_count(&msg)); } #[test] fn try_recv() { let (mut tx, mut rx) = mpsc::channel(1); match rx.try_recv() { Err(TryRecvError::Empty) => {} _ => panic!(), } tx.try_send(42).unwrap(); match rx.try_recv() { Ok(42) => {} _ => panic!(), } drop(tx); match rx.try_recv() { Err(TryRecvError::Closed) => {} _ => panic!(), } } #[test] fn try_recv_unbounded() { let (tx, mut rx) = mpsc::unbounded_channel(); match rx.try_recv() { Err(TryRecvError::Empty) => {} _ => panic!(), } tx.send(42).unwrap(); match rx.try_recv() { Ok(42) => {} _ => panic!(), } drop(tx); match rx.try_recv() { Err(TryRecvError::Closed) => {} _ => panic!(), } }