summaryrefslogtreecommitdiffstats
path: root/vendor/tokio/src/sync/tests/loom_oneshot.rs
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/tokio/src/sync/tests/loom_oneshot.rs')
-rw-r--r--vendor/tokio/src/sync/tests/loom_oneshot.rs140
1 files changed, 140 insertions, 0 deletions
diff --git a/vendor/tokio/src/sync/tests/loom_oneshot.rs b/vendor/tokio/src/sync/tests/loom_oneshot.rs
new file mode 100644
index 000000000..c5f797207
--- /dev/null
+++ b/vendor/tokio/src/sync/tests/loom_oneshot.rs
@@ -0,0 +1,140 @@
+use crate::sync::oneshot;
+
+use futures::future::poll_fn;
+use loom::future::block_on;
+use loom::thread;
+use std::task::Poll::{Pending, Ready};
+
+#[test]
+fn smoke() {
+ loom::model(|| {
+ let (tx, rx) = oneshot::channel();
+
+ thread::spawn(move || {
+ tx.send(1).unwrap();
+ });
+
+ let value = block_on(rx).unwrap();
+ assert_eq!(1, value);
+ });
+}
+
+#[test]
+fn changing_rx_task() {
+ loom::model(|| {
+ let (tx, mut rx) = oneshot::channel();
+
+ thread::spawn(move || {
+ tx.send(1).unwrap();
+ });
+
+ let rx = thread::spawn(move || {
+ let ready = block_on(poll_fn(|cx| match Pin::new(&mut rx).poll(cx) {
+ Ready(Ok(value)) => {
+ assert_eq!(1, value);
+ Ready(true)
+ }
+ Ready(Err(_)) => unimplemented!(),
+ Pending => Ready(false),
+ }));
+
+ if ready {
+ None
+ } else {
+ Some(rx)
+ }
+ })
+ .join()
+ .unwrap();
+
+ if let Some(rx) = rx {
+ // Previous task parked, use a new task...
+ let value = block_on(rx).unwrap();
+ assert_eq!(1, value);
+ }
+ });
+}
+
+#[test]
+fn try_recv_close() {
+ // reproduces https://github.com/tokio-rs/tokio/issues/4225
+ loom::model(|| {
+ let (tx, mut rx) = oneshot::channel();
+ thread::spawn(move || {
+ let _ = tx.send(());
+ });
+
+ rx.close();
+ let _ = rx.try_recv();
+ })
+}
+
+#[test]
+fn recv_closed() {
+ // reproduces https://github.com/tokio-rs/tokio/issues/4225
+ loom::model(|| {
+ let (tx, mut rx) = oneshot::channel();
+
+ thread::spawn(move || {
+ let _ = tx.send(1);
+ });
+
+ rx.close();
+ let _ = block_on(rx);
+ });
+}
+
+// TODO: Move this into `oneshot` proper.
+
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+struct OnClose<'a> {
+ tx: &'a mut oneshot::Sender<i32>,
+}
+
+impl<'a> OnClose<'a> {
+ fn new(tx: &'a mut oneshot::Sender<i32>) -> Self {
+ OnClose { tx }
+ }
+}
+
+impl Future for OnClose<'_> {
+ type Output = bool;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> {
+ let fut = self.get_mut().tx.closed();
+ crate::pin!(fut);
+
+ Ready(fut.poll(cx).is_ready())
+ }
+}
+
+#[test]
+fn changing_tx_task() {
+ loom::model(|| {
+ let (mut tx, rx) = oneshot::channel::<i32>();
+
+ thread::spawn(move || {
+ drop(rx);
+ });
+
+ let tx = thread::spawn(move || {
+ let t1 = block_on(OnClose::new(&mut tx));
+
+ if t1 {
+ None
+ } else {
+ Some(tx)
+ }
+ })
+ .join()
+ .unwrap();
+
+ if let Some(mut tx) = tx {
+ // Previous task parked, use a new task...
+ block_on(OnClose::new(&mut tx));
+ }
+ });
+}