summaryrefslogtreecommitdiffstats
path: root/third_party/rust/tokio/tests/sync_oneshot.rs
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/rust/tokio/tests/sync_oneshot.rs')
-rw-r--r--third_party/rust/tokio/tests/sync_oneshot.rs279
1 files changed, 279 insertions, 0 deletions
diff --git a/third_party/rust/tokio/tests/sync_oneshot.rs b/third_party/rust/tokio/tests/sync_oneshot.rs
new file mode 100644
index 0000000000..44d09e8ade
--- /dev/null
+++ b/third_party/rust/tokio/tests/sync_oneshot.rs
@@ -0,0 +1,279 @@
+#![warn(rust_2018_idioms)]
+#![cfg(feature = "sync")]
+
+#[cfg(target_arch = "wasm32")]
+use wasm_bindgen_test::wasm_bindgen_test as test;
+#[cfg(target_arch = "wasm32")]
+use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test;
+
+#[cfg(not(target_arch = "wasm32"))]
+use tokio::test as maybe_tokio_test;
+
+use tokio::sync::oneshot;
+use tokio::sync::oneshot::error::TryRecvError;
+use tokio_test::*;
+
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+trait AssertSend: Send {}
+impl AssertSend for oneshot::Sender<i32> {}
+impl AssertSend for oneshot::Receiver<i32> {}
+
+trait SenderExt {
+ fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()>;
+}
+impl<T> SenderExt for oneshot::Sender<T> {
+ fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
+ tokio::pin! {
+ let fut = self.closed();
+ }
+ fut.poll(cx)
+ }
+}
+
+#[test]
+fn send_recv() {
+ let (tx, rx) = oneshot::channel();
+ let mut rx = task::spawn(rx);
+
+ assert_pending!(rx.poll());
+
+ assert_ok!(tx.send(1));
+
+ assert!(rx.is_woken());
+
+ let val = assert_ready_ok!(rx.poll());
+ assert_eq!(val, 1);
+}
+
+#[maybe_tokio_test]
+async fn async_send_recv() {
+ let (tx, rx) = oneshot::channel();
+
+ assert_ok!(tx.send(1));
+ assert_eq!(1, assert_ok!(rx.await));
+}
+
+#[test]
+fn close_tx() {
+ let (tx, rx) = oneshot::channel::<i32>();
+ let mut rx = task::spawn(rx);
+
+ assert_pending!(rx.poll());
+
+ drop(tx);
+
+ assert!(rx.is_woken());
+ assert_ready_err!(rx.poll());
+}
+
+#[test]
+fn close_rx() {
+ // First, without checking poll_closed()
+ //
+ let (tx, _) = oneshot::channel();
+
+ assert_err!(tx.send(1));
+
+ // Second, via poll_closed();
+
+ let (tx, rx) = oneshot::channel();
+ let mut tx = task::spawn(tx);
+
+ assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
+
+ drop(rx);
+
+ assert!(tx.is_woken());
+ assert!(tx.is_closed());
+ assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
+
+ assert_err!(tx.into_inner().send(1));
+}
+
+#[tokio::test]
+#[cfg(feature = "full")]
+async fn async_rx_closed() {
+ let (mut tx, rx) = oneshot::channel::<()>();
+
+ tokio::spawn(async move {
+ drop(rx);
+ });
+
+ tx.closed().await;
+}
+
+#[test]
+fn explicit_close_poll() {
+ // First, with message sent
+ let (tx, rx) = oneshot::channel();
+ let mut rx = task::spawn(rx);
+
+ assert_ok!(tx.send(1));
+
+ rx.close();
+
+ let value = assert_ready_ok!(rx.poll());
+ assert_eq!(value, 1);
+
+ // Second, without the message sent
+ let (tx, rx) = oneshot::channel::<i32>();
+ let mut tx = task::spawn(tx);
+ let mut rx = task::spawn(rx);
+
+ assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
+
+ rx.close();
+
+ assert!(tx.is_woken());
+ assert!(tx.is_closed());
+ assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
+
+ assert_err!(tx.into_inner().send(1));
+ assert_ready_err!(rx.poll());
+
+ // Again, but without sending the value this time
+ let (tx, rx) = oneshot::channel::<i32>();
+ let mut tx = task::spawn(tx);
+ let mut rx = task::spawn(rx);
+
+ assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
+
+ rx.close();
+
+ assert!(tx.is_woken());
+ assert!(tx.is_closed());
+ assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
+
+ assert_ready_err!(rx.poll());
+}
+
+#[test]
+fn explicit_close_try_recv() {
+ // First, with message sent
+ let (tx, mut rx) = oneshot::channel();
+
+ assert_ok!(tx.send(1));
+
+ rx.close();
+
+ let val = assert_ok!(rx.try_recv());
+ assert_eq!(1, val);
+
+ // Second, without the message sent
+ let (tx, mut rx) = oneshot::channel::<i32>();
+ let mut tx = task::spawn(tx);
+
+ assert_pending!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
+
+ rx.close();
+
+ assert!(tx.is_woken());
+ assert!(tx.is_closed());
+ assert_ready!(tx.enter(|cx, mut tx| tx.poll_closed(cx)));
+
+ assert_err!(rx.try_recv());
+}
+
+#[test]
+#[should_panic]
+#[cfg(not(target_arch = "wasm32"))] // wasm currently doesn't support unwinding
+fn close_try_recv_poll() {
+ let (_tx, rx) = oneshot::channel::<i32>();
+ let mut rx = task::spawn(rx);
+
+ rx.close();
+
+ assert_err!(rx.try_recv());
+
+ let _ = rx.poll();
+}
+
+#[test]
+fn close_after_recv() {
+ let (tx, mut rx) = oneshot::channel::<i32>();
+
+ tx.send(17).unwrap();
+
+ assert_eq!(17, rx.try_recv().unwrap());
+ rx.close();
+}
+
+#[test]
+fn try_recv_after_completion() {
+ let (tx, mut rx) = oneshot::channel::<i32>();
+
+ tx.send(17).unwrap();
+
+ assert_eq!(17, rx.try_recv().unwrap());
+ assert_eq!(Err(TryRecvError::Closed), rx.try_recv());
+ rx.close();
+}
+
+#[test]
+fn drops_tasks() {
+ let (mut tx, mut rx) = oneshot::channel::<i32>();
+ let mut tx_task = task::spawn(());
+ let mut rx_task = task::spawn(());
+
+ assert_pending!(tx_task.enter(|cx, _| tx.poll_closed(cx)));
+ assert_pending!(rx_task.enter(|cx, _| Pin::new(&mut rx).poll(cx)));
+
+ drop(tx);
+ drop(rx);
+
+ assert_eq!(1, tx_task.waker_ref_count());
+ assert_eq!(1, rx_task.waker_ref_count());
+}
+
+#[test]
+fn receiver_changes_task() {
+ let (tx, mut rx) = oneshot::channel();
+
+ let mut task1 = task::spawn(());
+ let mut task2 = task::spawn(());
+
+ assert_pending!(task1.enter(|cx, _| Pin::new(&mut rx).poll(cx)));
+
+ assert_eq!(2, task1.waker_ref_count());
+ assert_eq!(1, task2.waker_ref_count());
+
+ assert_pending!(task2.enter(|cx, _| Pin::new(&mut rx).poll(cx)));
+
+ assert_eq!(1, task1.waker_ref_count());
+ assert_eq!(2, task2.waker_ref_count());
+
+ assert_ok!(tx.send(1));
+
+ assert!(!task1.is_woken());
+ assert!(task2.is_woken());
+
+ assert_ready_ok!(task2.enter(|cx, _| Pin::new(&mut rx).poll(cx)));
+}
+
+#[test]
+fn sender_changes_task() {
+ let (mut tx, rx) = oneshot::channel::<i32>();
+
+ let mut task1 = task::spawn(());
+ let mut task2 = task::spawn(());
+
+ assert_pending!(task1.enter(|cx, _| tx.poll_closed(cx)));
+
+ assert_eq!(2, task1.waker_ref_count());
+ assert_eq!(1, task2.waker_ref_count());
+
+ assert_pending!(task2.enter(|cx, _| tx.poll_closed(cx)));
+
+ assert_eq!(1, task1.waker_ref_count());
+ assert_eq!(2, task2.waker_ref_count());
+
+ drop(rx);
+
+ assert!(!task1.is_woken());
+ assert!(task2.is_woken());
+
+ assert_ready!(task2.enter(|cx, _| tx.poll_closed(cx)));
+}